1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.catalina.tribes.transport.nio;
19
20 import java.io.IOException;
21 import java.net.ServerSocket;
22 import java.nio.channels.SelectableChannel;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.Selector;
25 import java.nio.channels.ServerSocketChannel;
26 import java.nio.channels.SocketChannel;
27 import java.util.Iterator;
28
29 import org.apache.catalina.tribes.ChannelReceiver;
30 import org.apache.catalina.tribes.io.ListenCallback;
31 import org.apache.catalina.tribes.io.ObjectReader;
32 import org.apache.catalina.tribes.transport.Constants;
33 import org.apache.catalina.tribes.transport.ReceiverBase;
34 import org.apache.catalina.tribes.transport.RxTaskPool;
35 import org.apache.catalina.tribes.transport.AbstractRxTask;
36 import org.apache.catalina.tribes.util.StringManager;
37 import java.util.LinkedList;
38 import java.util.Set;
39 import java.nio.channels.CancelledKeyException;
40
41 /**
42 * @author Filip Hanik
43 * @version $Revision: 538977 $ $Date: 2007-05-17 17:43:49 +0200 (jeu., 17 mai 2007) $
44 */
45 public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback {
46
47 protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(NioReceiver.class);
48
49 /**
50 * The string manager for this package.
51 */
52 protected StringManager sm = StringManager.getManager(Constants.Package);
53
54 /**
55 * The descriptive information about this implementation.
56 */
57 private static final String info = "NioReceiver/1.0";
58
59 private Selector selector = null;
60 private ServerSocketChannel serverChannel = null;
61
62 protected LinkedList events = new LinkedList();
63 // private Object interestOpsMutex = new Object();
64
65 public NioReceiver() {
66 }
67
68 /**
69 * Return descriptive information about this implementation and the
70 * corresponding version number, in the format
71 * <code><description>/<version></code>.
72 */
73 public String getInfo() {
74 return (info);
75 }
76
77 // public Object getInterestOpsMutex() {
78 // return interestOpsMutex;
79 // }
80
81 public void stop() {
82 this.stopListening();
83 super.stop();
84 }
85
86 /**
87 * start cluster receiver
88 * @throws Exception
89 * @see org.apache.catalina.tribes.ClusterReceiver#start()
90 */
91 public void start() throws IOException {
92 super.start();
93 try {
94 setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this));
95 } catch (Exception x) {
96 log.fatal("ThreadPool can initilzed. Listener not started", x);
97 if ( x instanceof IOException ) throw (IOException)x;
98 else throw new IOException(x.getMessage());
99 }
100 try {
101 getBind();
102 bind();
103 Thread t = new Thread(this, "NioReceiver");
104 t.setDaemon(true);
105 t.start();
106 } catch (Exception x) {
107 log.fatal("Unable to start cluster receiver", x);
108 if ( x instanceof IOException ) throw (IOException)x;
109 else throw new IOException(x.getMessage());
110 }
111 }
112
113 public AbstractRxTask createRxTask() {
114 NioReplicationTask thread = new NioReplicationTask(this,this);
115 thread.setUseBufferPool(this.getUseBufferPool());
116 thread.setRxBufSize(getRxBufSize());
117 thread.setOptions(getWorkerThreadOptions());
118 return thread;
119 }
120
121
122
123 protected void bind() throws IOException {
124 // allocate an unbound server socket channel
125 serverChannel = ServerSocketChannel.open();
126 // Get the associated ServerSocket to bind it with
127 ServerSocket serverSocket = serverChannel.socket();
128 // create a new Selector for use below
129 selector = Selector.open();
130 // set the port the server channel will listen to
131 //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort()));
132 bind(serverSocket,getTcpListenPort(),getAutoBind());
133 // set non-blocking mode for the listening socket
134 serverChannel.configureBlocking(false);
135 // register the ServerSocketChannel with the Selector
136 serverChannel.register(selector, SelectionKey.OP_ACCEPT);
137
138 }
139
140 public void addEvent(Runnable event) {
141 if ( selector != null ) {
142 synchronized (events) {
143 events.add(event);
144 }
145 if ( log.isTraceEnabled() ) log.trace("Adding event to selector:"+event);
146 if ( isListening() && selector!=null ) selector.wakeup();
147 }
148 }
149
150 public void events() {
151 if ( events.size() == 0 ) return;
152 synchronized (events) {
153 Runnable r = null;
154 while ( (events.size() > 0) && (r = (Runnable)events.removeFirst()) != null ) {
155 try {
156 if ( log.isTraceEnabled() ) log.trace("Processing event in selector:"+r);
157 r.run();
158 } catch ( Exception x ) {
159 log.error("",x);
160 }
161 }
162 events.clear();
163 }
164 }
165
166 public static void cancelledKey(SelectionKey key) {
167 ObjectReader reader = (ObjectReader)key.attachment();
168 if ( reader != null ) {
169 reader.setCancelled(true);
170 reader.finish();
171 }
172 key.cancel();
173 key.attach(null);
174 try { ((SocketChannel)key.channel()).socket().close(); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("", e); }
175 try { key.channel().close(); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("", e); }
176
177 }
178 protected long lastCheck = System.currentTimeMillis();
179 protected void socketTimeouts() {
180 long now = System.currentTimeMillis();
181 if ( (now-lastCheck) < getSelectorTimeout() ) return;
182 //timeout
183 Selector tmpsel = selector;
184 Set keys = (isListening()&&tmpsel!=null)?tmpsel.keys():null;
185 if ( keys == null ) return;
186 for (Iterator iter = keys.iterator(); iter.hasNext(); ) {
187 SelectionKey key = (SelectionKey) iter.next();
188 try {
189 // if (key.interestOps() == SelectionKey.OP_READ) {
190 // //only timeout sockets that we are waiting for a read from
191 // ObjectReader ka = (ObjectReader) key.attachment();
192 // long delta = now - ka.getLastAccess();
193 // if (delta > (long) getTimeout()) {
194 // cancelledKey(key);
195 // }
196 // }
197 // else
198 if ( key.interestOps() == 0 ) {
199 //check for keys that didn't make it in.
200 ObjectReader ka = (ObjectReader) key.attachment();
201 if ( ka != null ) {
202 long delta = now - ka.getLastAccess();
203 if (delta > (long) getTimeout() && (!ka.isAccessed())) {
204 log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+ka.isCancelled()+"):"+key+" last access:"+new java.sql.Timestamp(ka.getLastAccess()));
205 // System.out.println("Interest:"+key.interestOps());
206 // System.out.println("Ready Ops:"+key.readyOps());
207 // System.out.println("Valid:"+key.isValid());
208 ka.setLastAccess(now);
209 //key.interestOps(SelectionKey.OP_READ);
210 }//end if
211 } else {
212 cancelledKey(key);
213 }//end if
214 }//end if
215 }catch ( CancelledKeyException ckx ) {
216 cancelledKey(key);
217 }
218 }
219 lastCheck = System.currentTimeMillis();
220 }
221
222
223 /**
224 * get data from channel and store in byte array
225 * send it to cluster
226 * @throws IOException
227 * @throws java.nio.channels.ClosedChannelException
228 */
229 protected void listen() throws Exception {
230 if (doListen()) {
231 log.warn("ServerSocketChannel already started");
232 return;
233 }
234
235 setListen(true);
236
237 while (doListen() && selector != null) {
238 // this may block for a long time, upon return the
239 // selected set contains keys of the ready channels
240 try {
241 events();
242 socketTimeouts();
243 int n = selector.select(getTcpSelectorTimeout());
244 if (n == 0) {
245 //there is a good chance that we got here
246 //because the TcpReplicationThread called
247 //selector wakeup().
248 //if that happens, we must ensure that that
249 //thread has enough time to call interestOps
250 // synchronized (interestOpsMutex) {
251 //if we got the lock, means there are no
252 //keys trying to register for the
253 //interestOps method
254 // }
255 continue; // nothing to do
256 }
257 // get an iterator over the set of selected keys
258 Iterator it = selector.selectedKeys().iterator();
259 // look at each key in the selected set
260 while (it.hasNext()) {
261 SelectionKey key = (SelectionKey) it.next();
262 // Is a new connection coming in?
263 if (key.isAcceptable()) {
264 ServerSocketChannel server = (ServerSocketChannel) key.channel();
265 SocketChannel channel = server.accept();
266 channel.socket().setReceiveBufferSize(getRxBufSize());
267 channel.socket().setSendBufferSize(getTxBufSize());
268 channel.socket().setTcpNoDelay(getTcpNoDelay());
269 channel.socket().setKeepAlive(getSoKeepAlive());
270 channel.socket().setOOBInline(getOoBInline());
271 channel.socket().setReuseAddress(getSoReuseAddress());
272 channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
273 channel.socket().setTrafficClass(getSoTrafficClass());
274 channel.socket().setSoTimeout(getTimeout());
275 Object attach = new ObjectReader(channel);
276 registerChannel(selector,
277 channel,
278 SelectionKey.OP_READ,
279 attach);
280 }
281 // is there data to read on this channel?
282 if (key.isReadable()) {
283 readDataFromSocket(key);
284 } else {
285 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
286 }
287
288 // remove key from selected set, it's been handled
289 it.remove();
290 }
291 } catch (java.nio.channels.ClosedSelectorException cse) {
292 // ignore is normal at shutdown or stop listen socket
293 } catch (java.nio.channels.CancelledKeyException nx) {
294 log.warn("Replication client disconnected, error when polling key. Ignoring client.");
295 } catch (Throwable x) {
296 try {
297 log.error("Unable to process request in NioReceiver", x);
298 }catch ( Throwable tx ) {
299 //in case an out of memory error, will affect the logging framework as well
300 tx.printStackTrace();
301 }
302 }
303
304 }
305 serverChannel.close();
306 if (selector != null)
307 selector.close();
308 }
309
310
311
312 /**
313 * Close Selector.
314 *
315 * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#stopListening()
316 */
317 protected void stopListening() {
318 setListen(false);
319 if (selector != null) {
320 try {
321 selector.wakeup();
322 selector.close();
323 } catch (Exception x) {
324 log.error("Unable to close cluster receiver selector.", x);
325 } finally {
326 selector = null;
327 }
328 }
329 }
330
331 // ----------------------------------------------------------
332
333 /**
334 * Register the given channel with the given selector for
335 * the given operations of interest
336 */
337 protected void registerChannel(Selector selector,
338 SelectableChannel channel,
339 int ops,
340 Object attach) throws Exception {
341 if (channel == null)return; // could happen
342 // set the new channel non-blocking
343 channel.configureBlocking(false);
344 // register it with the selector
345 channel.register(selector, ops, attach);
346 }
347
348 /**
349 * Start thread and listen
350 */
351 public void run() {
352 try {
353 listen();
354 } catch (Exception x) {
355 log.error("Unable to run replication listener.", x);
356 }
357 }
358
359 // ----------------------------------------------------------
360
361 /**
362 * Sample data handler method for a channel with data ready to read.
363 * @param key A SelectionKey object associated with a channel
364 * determined by the selector to be ready for reading. If the
365 * channel returns an EOF condition, it is closed here, which
366 * automatically invalidates the associated key. The selector
367 * will then de-register the channel on the next select call.
368 */
369 protected void readDataFromSocket(SelectionKey key) throws Exception {
370 NioReplicationTask task = (NioReplicationTask) getTaskPool().getRxTask();
371 if (task == null) {
372 // No threads/tasks available, do nothing, the selection
373 // loop will keep calling this method until a
374 // thread becomes available, the thread pool itself has a waiting mechanism
375 // so we will not wait here.
376 if (log.isDebugEnabled()) log.debug("No TcpReplicationThread available");
377 } else {
378 // invoking this wakes up the worker thread then returns
379 //add task to thread pool
380 task.serviceChannel(key);
381 getExecutor().execute(task);
382 }
383 }
384
385
386 }