1 /*
2 * Copyright 1999,2004 The Apache Software Foundation.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.apache.catalina.cluster.tcp;
18 import java.io.IOException;
19 import java.nio.ByteBuffer;
20 import java.nio.channels.SelectionKey;
21 import java.nio.channels.SocketChannel;
22
23 import org.apache.catalina.cluster.io.ObjectReader;
24
/**
 * A worker thread class which drains replication data from a channel and
 * acknowledges it. Each instance is associated with an owning thread pool
 * object. When started, the thread loops waiting to be awakened to service
 * the channel associated with a SelectionKey object. The worker is tasked by
 * calling its serviceChannel() method with a SelectionKey object. The
 * serviceChannel() method stores the key reference in the thread object and
 * then calls notify() to wake it up. The bytes read from the channel are
 * appended to the ObjectReader attached to the key and, if requested, one
 * ACK is written back for every package the reader reports. When the channel
 * has been drained, the worker thread returns itself to its parent pool.
 *
 * @author Filip Hanik
 *
 * @version $Revision: 303987 $, $Date: 2005-07-08 16:50:30 -0400 (Fri, 08 Jul 2005) $
 */
39 public class TcpReplicationThread extends WorkerThread {
40 public static final byte[] ACK_COMMAND = new byte[] {6, 2, 3};
41 private static org.apache.commons.logging.Log log =
42 org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class );
43 private ByteBuffer buffer = ByteBuffer.allocate (1024);
44 private SelectionKey key;
45 private boolean sendAck=true;
46
47
48 TcpReplicationThread ()
49 {
50 }
51
52 // loop forever waiting for work to do
53 public synchronized void run()
54 {
55 while (doRun) {
56 try {
57 // sleep and release object lock
58 this.wait();
59 } catch (InterruptedException e) {
60 if(log.isInfoEnabled())
61 log.info("TCP worker thread interrupted in cluster",e);
62 // clear interrupt status
63 Thread.interrupted();
64 }
65 if (key == null) {
66 continue; // just in case
67 }
68 try {
69 drainChannel (key);
70 } catch (Exception e) {
71 log.error ("TCP Worker thread in cluster caught '"
72 + e + "' closing channel", e);
73
74 // close channel and nudge selector
75 try {
76 key.channel().close();
77 } catch (IOException ex) {
78 log.error("Unable to close channel.",ex);
79 }
80 key.selector().wakeup();
81 }
82 key = null;
83 // done, ready for more, return to pool
84 this.pool.returnWorker (this);
85 }
86 }
87
88 /**
89 * Called to initiate a unit of work by this worker thread
90 * on the provided SelectionKey object. This method is
91 * synchronized, as is the run() method, so only one key
92 * can be serviced at a given time.
93 * Before waking the worker thread, and before returning
94 * to the main selection loop, this key's interest set is
95 * updated to remove OP_READ. This will cause the selector
96 * to ignore read-readiness for this channel while the
97 * worker thread is servicing it.
98 */
99 synchronized void serviceChannel (SelectionKey key, boolean sendAck)
100 {
101 this.key = key;
102 this.sendAck=sendAck;
103 key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
104 key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
105 this.notify(); // awaken the thread
106 }
107
108 /**
109 * The actual code which drains the channel associated with
110 * the given key. This method assumes the key has been
111 * modified prior to invocation to turn off selection
112 * interest in OP_READ. When this method completes it
113 * re-enables OP_READ and calls wakeup() on the selector
114 * so the selector will resume watching this channel.
115 */
116 protected void drainChannel (SelectionKey key)
117 throws Exception
118 {
119 boolean packetReceived=false;
120 SocketChannel channel = (SocketChannel) key.channel();
121 int count;
122 buffer.clear(); // make buffer empty
123 ObjectReader reader = (ObjectReader)key.attachment();
124 // loop while data available, channel is non-blocking
125 while ((count = channel.read (buffer)) > 0) {
126 buffer.flip(); // make buffer readable
127 reader.append(buffer.array(),0,count);
128 buffer.clear(); // make buffer empty
129 }
130 //check to see if any data is available
131 int pkgcnt = reader.execute();
132 if (log.isTraceEnabled()) {
133 log.trace("sending " + pkgcnt + " ack packages to " + channel.socket().getLocalPort() );
134 }
135
136 if (sendAck) {
137 while ( pkgcnt > 0 ) {
138 sendAck(key,channel);
139 pkgcnt--;
140 }
141 }
142
143 if (count < 0) {
144 // close channel on EOF, invalidates the key
145 channel.close();
146 return;
147 }
148
149 //acquire the interestOps mutex
150 Object mutex = this.getPool().getInterestOpsMutex();
151 synchronized (mutex) {
152 // cycle the selector so this key is active again
153 key.selector().wakeup();
154 // resume interest in OP_READ, OP_WRITE
155 int resumeOps = key.interestOps() | SelectionKey.OP_READ;
156 key.interestOps(resumeOps);
157 }
158
159 }
160
161 /**
162 * send a reply-acknowledgement (6,2,3)
163 * @param key
164 * @param channel
165 */
166 protected void sendAck(SelectionKey key, SocketChannel channel) {
167
168 try {
169 channel.write(ByteBuffer.wrap(ACK_COMMAND));
170 if (log.isTraceEnabled()) {
171 log.trace("ACK sent to " + channel.socket().getPort());
172 }
173 } catch ( java.io.IOException x ) {
174 log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage());
175 }
176 }
177 }