1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.activemq.transport.tcp;
18
19 import java.io.DataInputStream;
20 import java.io.DataOutputStream;
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.net.InetAddress;
24 import java.net.InetSocketAddress;
25 import java.net.Socket;
26 import java.net.SocketException;
27 import java.net.SocketTimeoutException;
28 import java.net.URI;
29 import java.net.UnknownHostException;
30 import java.util.HashMap;
31 import java.util.Map;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.SynchronousQueue;
34 import java.util.concurrent.ThreadFactory;
35 import java.util.concurrent.ThreadPoolExecutor;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicReference;
38
39 import javax.net.SocketFactory;
40
41 import org.apache.activemq.Service;
42 import org.apache.activemq.transport.Transport;
43 import org.apache.activemq.transport.TransportLoggerFactory;
44 import org.apache.activemq.transport.TransportThreadSupport;
45 import org.apache.activemq.util.IntrospectionSupport;
46 import org.apache.activemq.util.ServiceStopper;
47 import org.apache.activemq.wireformat.WireFormat;
48 import org.apache.commons.logging.Log;
49 import org.apache.commons.logging.LogFactory;
50
51 /**
52 * An implementation of the {@link Transport} interface using raw tcp/ip
53 *
54 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
55 * @version $Revision$
56 */
57 public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
58 private static final Log LOG = LogFactory.getLog(TcpTransport.class);
59 private static final ThreadPoolExecutor SOCKET_CLOSE;
60 protected final URI remoteLocation;
61 protected final URI localLocation;
62 protected final WireFormat wireFormat;
63
64 protected int connectionTimeout = 30000;
65 protected int soTimeout;
66 protected int socketBufferSize = 64 * 1024;
67 protected int ioBufferSize = 8 * 1024;
68 protected Socket socket;
69 protected DataOutputStream dataOut;
70 protected DataInputStream dataIn;
71 /**
72 * trace=true -> the Transport stack where this TcpTransport
73 * object will be, will have a TransportLogger layer
74 * trace=false -> the Transport stack where this TcpTransport
75 * object will be, will NOT have a TransportLogger layer, and therefore
76 * will never be able to print logging messages.
77 * This parameter is most probably set in Connection or TransportConnector URIs.
78 */
79 protected boolean trace = false;
80 /**
81 * Name of the LogWriter implementation to use.
82 * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
83 * This parameter is most probably set in Connection or TransportConnector URIs.
84 */
85 protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
86 /**
87 * Specifies if the TransportLogger will be manageable by JMX or not.
88 * Also, as long as there is at least 1 TransportLogger which is manageable,
89 * a TransportLoggerControl MBean will me created.
90 */
91 protected boolean dynamicManagement = false;
92 /**
93 * startLogging=true -> the TransportLogger object of the Transport stack
94 * will initially write messages to the log.
95 * startLogging=false -> the TransportLogger object of the Transport stack
96 * will initially NOT write messages to the log.
97 * This parameter only has an effect if trace == true.
98 * This parameter is most probably set in Connection or TransportConnector URIs.
99 */
100 protected boolean startLogging = true;
101 /**
102 * Specifies the port that will be used by the JMX server to manage
103 * the TransportLoggers.
104 * This should only be set in an URI by a client (producer or consumer) since
105 * a broker will already create a JMX server.
106 * It is useful for people who test a broker and clients in the same machine
107 * and want to control both via JMX; a different port will be needed.
108 */
109 protected int jmxPort = 1099;
110 protected boolean useLocalHost = true;
111 protected int minmumWireFormatVersion;
112 protected SocketFactory socketFactory;
113 protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
114
115 private Map<String, Object> socketOptions;
116 private Boolean keepAlive;
117 private Boolean tcpNoDelay;
118 private Thread runnerThread;
119
120 /**
121 * Connect to a remote Node - e.g. a Broker
122 *
123 * @param wireFormat
124 * @param socketFactory
125 * @param remoteLocation
126 * @param localLocation - e.g. local InetAddress and local port
127 * @throws IOException
128 * @throws UnknownHostException
129 */
130 public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
131 URI localLocation) throws UnknownHostException, IOException {
132 this.wireFormat = wireFormat;
133 this.socketFactory = socketFactory;
134 try {
135 this.socket = socketFactory.createSocket();
136 } catch (SocketException e) {
137 this.socket = null;
138 }
139 this.remoteLocation = remoteLocation;
140 this.localLocation = localLocation;
141 setDaemon(false);
142 }
143
144 /**
145 * Initialize from a server Socket
146 *
147 * @param wireFormat
148 * @param socket
149 * @throws IOException
150 */
151 public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
152 this.wireFormat = wireFormat;
153 this.socket = socket;
154 this.remoteLocation = null;
155 this.localLocation = null;
156 setDaemon(true);
157 }
158
159 /**
160 * A one way asynchronous send
161 */
162 public void oneway(Object command) throws IOException {
163 checkStarted();
164 wireFormat.marshal(command, dataOut);
165 dataOut.flush();
166 }
167
168 /**
169 * @return pretty print of 'this'
170 */
171 public String toString() {
172 return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
173 }
174
175 /**
176 * reads packets from a Socket
177 */
178 public void run() {
179 LOG.trace("TCP consumer thread starting");
180 this.runnerThread=Thread.currentThread();
181 try {
182 while (!isStopped()) {
183 doRun();
184 }
185 } catch (IOException e) {
186 stoppedLatch.get().countDown();
187 onException(e);
188 } finally {
189 stoppedLatch.get().countDown();
190 }
191 }
192
193 protected void doRun() throws IOException {
194 try {
195 Object command = readCommand();
196 doConsume(command);
197 } catch (SocketTimeoutException e) {
198 } catch (InterruptedIOException e) {
199 }
200 }
201
202 protected Object readCommand() throws IOException {
203 return wireFormat.unmarshal(dataIn);
204 }
205
206 // Properties
207 // -------------------------------------------------------------------------
208
209 public boolean isTrace() {
210 return trace;
211 }
212
213 public void setTrace(boolean trace) {
214 this.trace = trace;
215 }
216
217 public String getLogWriterName() {
218 return logWriterName;
219 }
220
221 public void setLogWriterName(String logFormat) {
222 this.logWriterName = logFormat;
223 }
224
225 public boolean isDynamicManagement() {
226 return dynamicManagement;
227 }
228
229 public void setDynamicManagement(boolean useJmx) {
230 this.dynamicManagement = useJmx;
231 }
232
233 public boolean isStartLogging() {
234 return startLogging;
235 }
236
237 public void setStartLogging(boolean startLogging) {
238 this.startLogging = startLogging;
239 }
240
241 public int getJmxPort() {
242 return jmxPort;
243 }
244
245 public void setJmxPort(int jmxPort) {
246 this.jmxPort = jmxPort;
247 }
248
249 public int getMinmumWireFormatVersion() {
250 return minmumWireFormatVersion;
251 }
252
253 public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
254 this.minmumWireFormatVersion = minmumWireFormatVersion;
255 }
256
257 public boolean isUseLocalHost() {
258 return useLocalHost;
259 }
260
261 /**
262 * Sets whether 'localhost' or the actual local host name should be used to
263 * make local connections. On some operating systems such as Macs its not
264 * possible to connect as the local host name so localhost is better.
265 */
266 public void setUseLocalHost(boolean useLocalHost) {
267 this.useLocalHost = useLocalHost;
268 }
269
270 public int getSocketBufferSize() {
271 return socketBufferSize;
272 }
273
274 /**
275 * Sets the buffer size to use on the socket
276 */
277 public void setSocketBufferSize(int socketBufferSize) {
278 this.socketBufferSize = socketBufferSize;
279 }
280
281 public int getSoTimeout() {
282 return soTimeout;
283 }
284
285 /**
286 * Sets the socket timeout
287 */
288 public void setSoTimeout(int soTimeout) {
289 this.soTimeout = soTimeout;
290 }
291
292 public int getConnectionTimeout() {
293 return connectionTimeout;
294 }
295
296 /**
297 * Sets the timeout used to connect to the socket
298 */
299 public void setConnectionTimeout(int connectionTimeout) {
300 this.connectionTimeout = connectionTimeout;
301 }
302
303 public Boolean getKeepAlive() {
304 return keepAlive;
305 }
306
307 /**
308 * Enable/disable TCP KEEP_ALIVE mode
309 */
310 public void setKeepAlive(Boolean keepAlive) {
311 this.keepAlive = keepAlive;
312 }
313
314 public Boolean getTcpNoDelay() {
315 return tcpNoDelay;
316 }
317
318 /**
319 * Enable/disable the TCP_NODELAY option on the socket
320 */
321 public void setTcpNoDelay(Boolean tcpNoDelay) {
322 this.tcpNoDelay = tcpNoDelay;
323 }
324
325 /**
326 * @return the ioBufferSize
327 */
328 public int getIoBufferSize() {
329 return this.ioBufferSize;
330 }
331
332 /**
333 * @param ioBufferSize the ioBufferSize to set
334 */
335 public void setIoBufferSize(int ioBufferSize) {
336 this.ioBufferSize = ioBufferSize;
337 }
338
339 // Implementation methods
340 // -------------------------------------------------------------------------
341 protected String resolveHostName(String host) throws UnknownHostException {
342 String localName = InetAddress.getLocalHost().getHostName();
343 if (localName != null && isUseLocalHost()) {
344 if (localName.equals(host)) {
345 return "localhost";
346 }
347 }
348 return host;
349 }
350
351 /**
352 * Configures the socket for use
353 *
354 * @param sock
355 * @throws SocketException
356 */
357 protected void initialiseSocket(Socket sock) throws SocketException {
358 if (socketOptions != null) {
359 IntrospectionSupport.setProperties(socket, socketOptions);
360 }
361
362 try {
363 sock.setReceiveBufferSize(socketBufferSize);
364 sock.setSendBufferSize(socketBufferSize);
365 } catch (SocketException se) {
366 LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
367 LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
368 }
369 sock.setSoTimeout(soTimeout);
370
371 if (keepAlive != null) {
372 sock.setKeepAlive(keepAlive.booleanValue());
373 }
374 if (tcpNoDelay != null) {
375 sock.setTcpNoDelay(tcpNoDelay.booleanValue());
376 }
377 }
378
379 protected void doStart() throws Exception {
380 connect();
381 stoppedLatch.set(new CountDownLatch(1));
382 super.doStart();
383 }
384
385 protected void connect() throws Exception {
386
387 if (socket == null && socketFactory == null) {
388 throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
389 }
390
391 InetSocketAddress localAddress = null;
392 InetSocketAddress remoteAddress = null;
393
394 if (localLocation != null) {
395 localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
396 localLocation.getPort());
397 }
398
399 if (remoteLocation != null) {
400 String host = resolveHostName(remoteLocation.getHost());
401 remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
402 }
403
404 if (socket != null) {
405
406 if (localAddress != null) {
407 socket.bind(localAddress);
408 }
409
410 // If it's a server accepted socket.. we don't need to connect it
411 // to a remote address.
412 if (remoteAddress != null) {
413 if (connectionTimeout >= 0) {
414 socket.connect(remoteAddress, connectionTimeout);
415 } else {
416 socket.connect(remoteAddress);
417 }
418 }
419
420 } else {
421 // For SSL sockets.. you can't create an unconnected socket :(
422 // This means the timout option are not supported either.
423 if (localAddress != null) {
424 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
425 localAddress.getAddress(), localAddress.getPort());
426 } else {
427 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
428 }
429 }
430
431 initialiseSocket(socket);
432 initializeStreams();
433 }
434
435 protected void doStop(ServiceStopper stopper) throws Exception {
436 if (LOG.isDebugEnabled()) {
437 LOG.debug("Stopping transport " + this);
438 }
439
440 // Closing the streams flush the sockets before closing.. if the socket
441 // is hung.. then this hangs the close.
442 // closeStreams();
443 if (socket != null) {
444 //closing the socket can hang also
445 final CountDownLatch latch = new CountDownLatch(1);
446 SOCKET_CLOSE.execute(new Runnable() {
447
448 public void run() {
449 try {
450 socket.close();
451 } catch (IOException e) {
452 LOG.debug("Caught exception closing socket",e);
453 }finally {
454 latch.countDown();
455 }
456 }
457
458 });
459 latch.await(1,TimeUnit.SECONDS);
460
461 }
462 }
463
464 /**
465 * Override so that stop() blocks until the run thread is no longer running.
466 */
467 @Override
468 public void stop() throws Exception {
469 super.stop();
470 CountDownLatch countDownLatch = stoppedLatch.get();
471 if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
472 countDownLatch.await(1,TimeUnit.SECONDS);
473 }
474 }
475
476 protected void initializeStreams() throws Exception {
477 TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
478 this.dataIn = new DataInputStream(buffIn);
479 TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
480 this.dataOut = new DataOutputStream(buffOut);
481 }
482
483 protected void closeStreams() throws IOException {
484 if (dataOut != null) {
485 dataOut.close();
486 }
487 if (dataIn != null) {
488 dataIn.close();
489 }
490 }
491
492 public void setSocketOptions(Map<String, Object> socketOptions) {
493 this.socketOptions = new HashMap<String, Object>(socketOptions);
494 }
495
496 public String getRemoteAddress() {
497 if (socket != null) {
498 return "" + socket.getRemoteSocketAddress();
499 }
500 return null;
501 }
502
503 @Override
504 public <T> T narrow(Class<T> target) {
505 if (target == Socket.class) {
506 return target.cast(socket);
507 }
508 return super.narrow(target);
509 }
510
511 static {
512 SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
513 public Thread newThread(Runnable runnable) {
514 Thread thread = new Thread(runnable, "TcpSocketClose: "+runnable);
515 thread.setDaemon(true);
516 return thread;
517 }
518 });
519 }
520 }