1 /*
2 * Copyright 1999,2004-2005 The Apache Software Foundation.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.apache.catalina.cluster.tcp;
18
19
20 import java.net.InetSocketAddress;
21 import java.net.ServerSocket;
22 import java.nio.channels.SelectableChannel;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.Selector;
25 import java.nio.channels.ServerSocketChannel;
26 import java.nio.channels.SocketChannel;
27 import java.util.Iterator;
28
29 import org.apache.catalina.cluster.io.ObjectReader;
30
31 /**
32 * FIXME i18n log messages
33 * FIXME jmx support
34 * @author Peter Rossbach
35 * @author Filip Hanik
36 * @version $Revision: 349504 $ $Date: 2005-11-28 16:05:40 -0500 (Mon, 28 Nov 2005) $
37 */
38 public class ReplicationListener extends ClusterReceiverBase
39 {
40
41 /**
42 * The descriptive information about this implementation.
43 */
44 private static final String info = "ReplicationListener/1.2";
45
46 private ThreadPool pool = null;
47 private int tcpThreadCount;
48 private long tcpSelectorTimeout;
49 private Selector selector = null;
50
51 private Object interestOpsMutex = new Object();
52
53 public ReplicationListener() {
54 }
55
56 /**
57 * Return descriptive information about this implementation and the
58 * corresponding version number, in the format
59 * <code><description>/<version></code>.
60 */
61 public String getInfo() {
62
63 return (info);
64
65 }
66
67 public long getTcpSelectorTimeout() {
68 return tcpSelectorTimeout;
69 }
70 public void setTcpSelectorTimeout(long tcpSelectorTimeout) {
71 this.tcpSelectorTimeout = tcpSelectorTimeout;
72 }
73 public int getTcpThreadCount() {
74 return tcpThreadCount;
75 }
76 public void setTcpThreadCount(int tcpThreadCount) {
77 this.tcpThreadCount = tcpThreadCount;
78 }
79 public Object getInterestOpsMutex() {
80 return interestOpsMutex;
81 }
82
83 /**
84 * start cluster receiver
85 * @throws Exception
86 * @see org.apache.catalina.cluster.ClusterReceiver#start()
87 */
88 public void start() {
89 try {
90 pool = new ThreadPool(tcpThreadCount, TcpReplicationThread.class, interestOpsMutex);
91 } catch (Exception e) {
92 log.error("ThreadPool can initilzed. Listener not started",e);
93 return ;
94 }
95 super.start() ;
96 }
97
98
99 /**
100 * get data from channel and store in byte array
101 * send it to cluster
102 * @throws IOException
103 * @throws java.nio.channels.ClosedChannelException
104 */
105 protected void listen ()
106 throws Exception
107 {
108 if (doListen) {
109 log.warn("ServerSocketChannel allready started");
110 return;
111 }
112 doListen=true;
113 // allocate an unbound server socket channel
114 ServerSocketChannel serverChannel = ServerSocketChannel.open();
115 // Get the associated ServerSocket to bind it with
116 ServerSocket serverSocket = serverChannel.socket();
117 // create a new Selector for use below
118 selector = Selector.open();
119 // set the port the server channel will listen to
120 serverSocket.bind (new InetSocketAddress (getBind(),getTcpListenPort()));
121 // set non-blocking mode for the listening socket
122 serverChannel.configureBlocking (false);
123 // register the ServerSocketChannel with the Selector
124 serverChannel.register (selector, SelectionKey.OP_ACCEPT);
125 while (doListen && selector != null) {
126 // this may block for a long time, upon return the
127 // selected set contains keys of the ready channels
128 try {
129
130 int n = selector.select(tcpSelectorTimeout);
131 if (n == 0) {
132 //there is a good chance that we got here
133 //because the TcpReplicationThread called
134 //selector wakeup().
135 //if that happens, we must ensure that that
136 //thread has enough time to call interestOps
137 synchronized (interestOpsMutex) {
138 //if we got the lock, means there are no
139 //keys trying to register for the
140 //interestOps method
141 }
142 continue; // nothing to do
143 }
144 // get an iterator over the set of selected keys
145 Iterator it = selector.selectedKeys().iterator();
146 // look at each key in the selected set
147 while (it.hasNext()) {
148 SelectionKey key = (SelectionKey) it.next();
149 // Is a new connection coming in?
150 if (key.isAcceptable()) {
151 ServerSocketChannel server =
152 (ServerSocketChannel) key.channel();
153 SocketChannel channel = server.accept();
154 Object attach = new ObjectReader(channel, selector,
155 this) ;
156 registerChannel(selector,
157 channel,
158 SelectionKey.OP_READ,
159 attach);
160 }
161 // is there data to read on this channel?
162 if (key.isReadable()) {
163 readDataFromSocket(key);
164 } else {
165 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
166 }
167
168 // remove key from selected set, it's been handled
169 it.remove();
170 }
171 } catch (java.nio.channels.ClosedSelectorException cse) {
172 // ignore is normal at shutdown or stop listen socket
173 } catch (java.nio.channels.CancelledKeyException nx) {
174 log.warn(
175 "Replication client disconnected, error when polling key. Ignoring client.");
176 } catch (Exception x) {
177 log.error("Unable to process request in ReplicationListener", x);
178 }
179
180 }
181 serverChannel.close();
182 if(selector != null)
183 selector.close();
184 }
185
186 /**
187 * Close Selector.
188 *
189 * @see org.apache.catalina.cluster.tcp.ClusterReceiverBase#stopListening()
190 */
191 protected void stopListening() {
192 // Bugzilla 37529: http://issues.apache.org/bugzilla/show_bug.cgi?id=37529
193 doListen = false;
194 if ( selector != null ) {
195 try {
196 for(int i = 0; i < getTcpThreadCount(); i++) {
197 selector.wakeup();
198 }
199 selector.close();
200 } catch ( Exception x ) {
201 log.error("Unable to close cluster receiver selector.",x);
202 } finally {
203 selector = null;
204 }
205 }
206 }
207
208 // ----------------------------------------------------------
209
210 /**
211 * Register the given channel with the given selector for
212 * the given operations of interest
213 */
214 protected void registerChannel (Selector selector,
215 SelectableChannel channel,
216 int ops,
217 Object attach)
218 throws Exception {
219 if (channel == null) return; // could happen
220 // set the new channel non-blocking
221 channel.configureBlocking (false);
222 // register it with the selector
223 channel.register (selector, ops, attach);
224 }
225
226 // ----------------------------------------------------------
227
228 /**
229 * Sample data handler method for a channel with data ready to read.
230 * @param key A SelectionKey object associated with a channel
231 * determined by the selector to be ready for reading. If the
232 * channel returns an EOF condition, it is closed here, which
233 * automatically invalidates the associated key. The selector
234 * will then de-register the channel on the next select call.
235 */
236 protected void readDataFromSocket (SelectionKey key)
237 throws Exception
238 {
239 TcpReplicationThread worker = (TcpReplicationThread)pool.getWorker();
240 if (worker == null) {
241 // No threads available, do nothing, the selection
242 // loop will keep calling this method until a
243 // thread becomes available.
244 // FIXME: This design could be improved.
245 if(log.isDebugEnabled())
246 log.debug("No TcpReplicationThread available");
247 } else {
248 // invoking this wakes up the worker thread then returns
249 worker.serviceChannel(key, isSendAck());
250 }
251 }
252
253
254 }