| Method from org.apache.catalina.cluster.tcp.ReplicationListener Detail: |
/**
 * Returns the descriptive information string for this implementation.
 * The accompanying documentation describes its format as
 * {@code <description>/<version>}.
 *
 * @return the current value of the {@code info} field
 */
public String getInfo() {
    return info;
}
Return descriptive information about this implementation and the
corresponding version number, in the format
<description>/<version>. |
/**
 * Returns the monitor object that {@code listen()} synchronizes on after
 * an empty select, and that {@code start()} hands to the worker thread pool.
 *
 * @return the shared {@code interestOpsMutex} object
 */
public Object getInterestOpsMutex() {
    final Object mutex = interestOpsMutex;
    return mutex;
}
|
/**
 * Returns the timeout that {@code listen()} passes to
 * {@code Selector.select(long)}.
 *
 * @return the configured selector timeout
 */
public long getTcpSelectorTimeout() {
    final long timeout = tcpSelectorTimeout;
    return timeout;
}
|
/**
 * Returns the thread count that {@code start()} passes to the
 * {@code ThreadPool} constructor; {@code stopListening()} also uses it as
 * the number of selector wake-ups to issue.
 *
 * @return the configured thread count
 */
public int getTcpThreadCount() {
    final int count = tcpThreadCount;
    return count;
}
|
/**
 * Opens the replication server socket, registers it with a new selector and
 * runs the select loop on the calling thread until {@code doListen} is
 * cleared or the {@code selector} field is set to {@code null}.
 * <p>
 * Accepted connections are registered for {@code OP_READ} with an
 * {@code ObjectReader} attachment; readable keys are passed to
 * {@link #readDataFromSocket(SelectionKey)}.
 * <p>
 * The server channel and the selector opened here are always closed before
 * this method returns, including when start-up fails. If start-up fails,
 * {@code doListen} is reset so that a later call can try again.
 *
 * @throws Exception if the channel or selector cannot be opened, bound,
 *                   configured or registered, or if closing them fails
 */
protected void listen() throws Exception {
    if (doListen) {
        log.warn("ServerSocketChannel allready started");
        return;
    }
    doListen = true;

    ServerSocketChannel serverChannel = null;
    Selector openedSelector = null;
    boolean started = false;
    try {
        // allocate an unbound server socket channel
        serverChannel = ServerSocketChannel.open();
        // get the associated ServerSocket to bind it with
        ServerSocket serverSocket = serverChannel.socket();
        // create a new Selector and publish it for stopListening()
        openedSelector = Selector.open();
        selector = openedSelector;
        // set the address and port the server channel will listen on
        serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort()));
        // set non-blocking mode for the listening socket
        serverChannel.configureBlocking(false);
        // register the ServerSocketChannel with the Selector
        serverChannel.register(openedSelector, SelectionKey.OP_ACCEPT);
        started = true;

        while (doListen) {
            // Snapshot the field once per iteration: stopListening() may set
            // it to null from another thread between the check and the use.
            Selector current = selector;
            if (current == null) {
                break;
            }
            try {
                // this may block for up to tcpSelectorTimeout; upon return the
                // selected set contains keys of the ready channels
                int n = current.select(tcpSelectorTimeout);
                if (n == 0) {
                    // There is a good chance that we got here because the
                    // TcpReplicationThread called selector wakeup(). If that
                    // happened, we must give that thread enough time to call
                    // interestOps: acquiring the lock means no key is
                    // currently being re-registered.
                    synchronized (interestOpsMutex) {
                        // intentionally empty; acquiring the lock is the point
                    }
                    continue; // nothing to do
                }
                // look at each key in the selected set
                Iterator it = current.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = (SelectionKey) it.next();
                    // Is a new connection coming in?
                    if (key.isAcceptable()) {
                        ServerSocketChannel server =
                            (ServerSocketChannel) key.channel();
                        SocketChannel channel = server.accept();
                        // accept() returns null on a non-blocking channel when
                        // no connection is pending; skip building a reader.
                        if (channel != null) {
                            Object attach = new ObjectReader(channel, current, this);
                            registerChannel(current,
                                            channel,
                                            SelectionKey.OP_READ,
                                            attach);
                        }
                    }
                    // is there data to read on this channel?
                    if (key.isReadable()) {
                        readDataFromSocket(key);
                    } else {
                        // not readable: make sure OP_WRITE interest is cleared
                        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
                    }
                    // remove key from selected set, it's been handled
                    it.remove();
                }
            } catch (java.nio.channels.ClosedSelectorException cse) {
                // ignore is normal at shutdown or stop listen socket
            } catch (java.nio.channels.CancelledKeyException nx) {
                log.warn(
                    "Replication client disconnected, error when polling key. Ignoring client.");
            } catch (Exception x) {
                log.error("Unable to process request in ReplicationListener", x);
            }
        }
    } finally {
        if (!started) {
            // start-up failed: allow a later listen() call to retry
            doListen = false;
        }
        try {
            if (serverChannel != null) {
                serverChannel.close();
            }
        } finally {
            // closing an already-closed selector has no effect
            if (openedSelector != null) {
                openedSelector.close();
            }
        }
    }
}
Gets data from the channel, stores it in a byte array,
and sends it to the cluster. |
/**
 * Hands a readable selection key to a worker taken from the thread pool.
 * If the pool returns no worker, nothing is done apart from an optional
 * debug log entry.
 *
 * @param key the selection key whose channel has data ready to read
 * @throws Exception if obtaining the worker or dispatching to it fails
 */
protected void readDataFromSocket(SelectionKey key) throws Exception {
    TcpReplicationThread replicationWorker = (TcpReplicationThread) pool.getWorker();
    if (replicationWorker != null) {
        // invoking this wakes up the worker thread then returns
        replicationWorker.serviceChannel(key, isSendAck());
        return;
    }
    // No threads available, do nothing; the selection loop is expected to
    // keep calling this method until a thread becomes available.
    // FIXME: This design could be improved.
    if (log.isDebugEnabled()) {
        log.debug("No TcpReplicationThread available");
    }
}
Sample data handler method for a channel with data ready to read. |
/**
 * Switches the given channel to non-blocking mode and registers it with the
 * selector. A {@code null} channel is silently ignored.
 *
 * @param selector the selector to register with
 * @param channel  the channel to register; may be {@code null}
 * @param ops      the interest set for the resulting key
 * @param attach   the attachment for the resulting key
 * @throws Exception if configuring or registering the channel fails
 */
protected void registerChannel(Selector selector,
                               SelectableChannel channel,
                               int ops,
                               Object attach) throws Exception {
    if (channel != null) {
        // a channel must be non-blocking before it can be registered
        channel.configureBlocking(false);
        channel.register(selector, ops, attach);
    }
}
Registers the given channel with the given selector for
the given operations of interest. |
/**
 * Sets the timeout that {@code listen()} passes to
 * {@code Selector.select(long)}.
 *
 * @param tcpSelectorTimeout the new selector timeout
 */
public void setTcpSelectorTimeout(long tcpSelectorTimeout) {
    this.tcpSelectorTimeout = tcpSelectorTimeout;
}
|
/**
 * Sets the thread count that {@code start()} passes to the
 * {@code ThreadPool} constructor.
 *
 * @param tcpThreadCount the new thread count
 */
public void setTcpThreadCount(int tcpThreadCount) {
    this.tcpThreadCount = tcpThreadCount;
}
|
/**
 * Creates the {@code TcpReplicationThread} worker pool and then delegates to
 * the superclass {@code start()}. If the pool cannot be created, the failure
 * is logged and the superclass {@code start()} is not invoked.
 */
public void start() {
    try {
        pool = new ThreadPool(tcpThreadCount, TcpReplicationThread.class, interestOpsMutex);
    } catch (Exception e) {
        // The previous message read "can initilzed", which stated the
        // opposite of what happened.
        log.error("ThreadPool could not be initialized. Listener not started", e);
        return;
    }
    super.start();
}
|
/**
 * Signals the listen loop to stop, wakes the selector once per configured
 * thread, closes it, and finally clears the {@code selector} field. Any
 * failure while waking or closing is logged rather than propagated.
 */
protected void stopListening() {
    // Bugzilla 37529: http://issues.apache.org/bugzilla/show_bug.cgi?id=37529
    doListen = false;
    if (selector == null) {
        return;
    }
    try {
        int wakeups = 0;
        while (wakeups < getTcpThreadCount()) {
            selector.wakeup();
            wakeups++;
        }
        selector.close();
    } catch (Exception x) {
        log.error("Unable to close cluster receiver selector.", x);
    } finally {
        // drop the reference whether or not the close succeeded
        selector = null;
    }
}
|