1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */
22 package org.jboss.mq.il.uil2;
23
24 import java.io.Serializable;
25 import java.io.IOException;
26 import java.net.InetAddress;
27 import java.net.ConnectException;
28 import java.net.Socket;
29 import javax.jms.Destination;
30 import javax.jms.JMSException;
31 import javax.jms.Queue;
32 import javax.jms.TemporaryQueue;
33 import javax.jms.TemporaryTopic;
34 import javax.jms.Topic;
35 import javax.net.SocketFactory;
36 import javax.transaction.xa.Xid;
37
38 import org.jboss.logging.Logger;
39 import org.jboss.mq.AcknowledgementRequest;
40 import org.jboss.mq.Connection;
41 import org.jboss.mq.ConnectionToken;
42 import org.jboss.mq.DurableSubscriptionID;
43 import org.jboss.mq.Recoverable;
44 import org.jboss.mq.SpyDestination;
45 import org.jboss.mq.SpyMessage;
46 import org.jboss.mq.TransactionRequest;
47 import org.jboss.mq.il.ServerIL;
48 import org.jboss.mq.il.uil2.msgs.MsgTypes;
49 import org.jboss.mq.il.uil2.msgs.ConnectionTokenMsg;
50 import org.jboss.mq.il.uil2.msgs.EnableConnectionMsg;
51 import org.jboss.mq.il.uil2.msgs.GetIDMsg;
52 import org.jboss.mq.il.uil2.msgs.RecoverMsg;
53 import org.jboss.mq.il.uil2.msgs.TemporaryDestMsg;
54 import org.jboss.mq.il.uil2.msgs.AcknowledgementRequestMsg;
55 import org.jboss.mq.il.uil2.msgs.AddMsg;
56 import org.jboss.mq.il.uil2.msgs.BrowseMsg;
57 import org.jboss.mq.il.uil2.msgs.CheckIDMsg;
58 import org.jboss.mq.il.uil2.msgs.CheckUserMsg;
59 import org.jboss.mq.il.uil2.msgs.CloseMsg;
60 import org.jboss.mq.il.uil2.msgs.CreateDestMsg;
61 import org.jboss.mq.il.uil2.msgs.DeleteTemporaryDestMsg;
62 import org.jboss.mq.il.uil2.msgs.DeleteSubscriptionMsg;
63 import org.jboss.mq.il.uil2.msgs.PingMsg;
64 import org.jboss.mq.il.uil2.msgs.ReceiveMsg;
65 import org.jboss.mq.il.uil2.msgs.SubscribeMsg;
66 import org.jboss.mq.il.uil2.msgs.TransactMsg;
67 import org.jboss.mq.il.uil2.msgs.UnsubscribeMsg;
68
69 /** The UILServerIL is created on the server and copied to the client during
70 * connection factory lookups. It represents the transport interface to the
71 * JMS server.
72 *
73 * @author Scott.Stark@jboss.org
74 * @version $Revision: 45317 $
75 */
76 public class UILServerIL
77 implements Cloneable, MsgTypes, Serializable, ServerIL, Recoverable
78 {
79 /** @since 1.7, at least jboss-3.2.5, jboss-4.0.0 */
80 private static final long serialVersionUID = 853594001646066224L;
81 private static Logger log = Logger.getLogger(UILServerIL.class);
82
83 /** The org.jboss.mq.il.uil2.useServerHost system property allows a client to
84 * to connect to the host name rather than the ip address
85 */
86 private final static String USE_SERVER_HOST = "org.jboss.mq.il.uil2.useServerHost";
87
88 /** The org.jboss.mq.il.uil2.localAddr system property allows a client to
89 *define the local interface to which its sockets should be bound
90 */
91 private final static String LOCAL_ADDR = "org.jboss.mq.il.uil2.localAddr";
92 /** The org.jboss.mq.il.uil2.localPort system property allows a client to
93 *define the local port to which its sockets should be bound
94 */
95 private final static String LOCAL_PORT = "org.jboss.mq.il.uil2.localPort";
96 /** The org.jboss.mq.il.uil2.serverAddr system property allows a client to
97 * override the address to which it attempts to connect to. This is useful
98 * for networks where NAT is ocurring between the client and jms server.
99 */
100 private final static String SERVER_ADDR = "org.jboss.mq.il.uil2.serverAddr";
101 /** The org.jboss.mq.il.uil2.serverPort system property allows a client to
102 * override the port to which it attempts to connect. This is useful for
103 * for networks where port forwarding is ocurring between the client and jms
104 * server.
105 */
106 private final static String SERVER_PORT = "org.jboss.mq.il.uil2.serverPort";
107 /** The org.jboss.mq.il.uil2.retryCount controls the number of attempts to
108 * retry connecting to the jms server. Retries are only made for
109 * java.net.ConnectException failures. A value <= 0 means no retry atempts
110 * will be made.
111 */
112 private final static String RETRY_COUNT = "org.jboss.mq.il.uil2.retryCount";
113 /** The org.jboss.mq.il.uil2.retryDelay controls the delay in milliseconds
114 * between retries due to ConnectException failures.
115 */
116 private final static String RETRY_DELAY = "org.jboss.mq.il.uil2.retryDelay";
117
118 /** The server host name/IP to connect to as defined by the jms server.
119 */
120 private InetAddress addr;
121 /** The server port to connect to as defined by the jms server.
122 */
123 private int port;
124 /** The name of the class implementing the javax.net.SocketFactory to
125 * use for creating the client socket.
126 */
127 private String socketFactoryName;
128
129 /**
130 * If the TcpNoDelay option should be used on the socket.
131 */
132 private boolean enableTcpNoDelay = false;
133
134 /**
135 * The client side read timeout
136 */
137 private int soTimeout = 0;
138
139 /**
140 * The connect address
141 */
142 private String connectAddress;
143
144 /**
145 * The connect port
146 */
147 private int connectPort = 0;
148
149 /**
150 * The buffer size.
151 */
152 private int bufferSize;
153
154 /**
155 * The chunk size.
156 */
157 private int chunkSize;
158
159 /** The local interface name/IP to use for the client
160 */
161 private transient InetAddress localAddr;
162 /** The local port to use for the client
163 */
164 private transient int localPort;
165
166 /**
167 * Description of the Field
168 */
169 protected transient Socket socket;
170 /**
171 * Description of the Field
172 */
173 protected transient SocketManager socketMgr;
174
175 public UILServerIL(InetAddress addr, int port, String socketFactoryName,
176 boolean enableTcpNoDelay, int bufferSize, int chunkSize, int soTimeout, String connectAddress, int connectPort)
177 throws Exception
178 {
179 this.addr = addr;
180 this.port = port;
181 this.socketFactoryName = socketFactoryName;
182 this.enableTcpNoDelay = enableTcpNoDelay;
183 this.bufferSize = bufferSize;
184 this.chunkSize = chunkSize;
185 this.soTimeout = soTimeout;
186 this.connectAddress = connectAddress;
187 this.connectPort = connectPort;
188 }
189
190 public void setConnectionToken(ConnectionToken dest)
191 throws Exception
192 {
193 ConnectionTokenMsg msg = new ConnectionTokenMsg(dest);
194 getSocketMgr().sendMessage(msg);
195 }
196
197 public void setEnabled(ConnectionToken dc, boolean enabled)
198 throws JMSException, Exception
199 {
200 EnableConnectionMsg msg = new EnableConnectionMsg(enabled);
201 getSocketMgr().sendMessage(msg);
202 }
203
204 public String getID()
205 throws Exception
206 {
207 GetIDMsg msg = new GetIDMsg();
208 getSocketMgr().sendMessage(msg);
209 String id = msg.getID();
210 return id;
211 }
212
213 public TemporaryQueue getTemporaryQueue(ConnectionToken dc)
214 throws JMSException, Exception
215 {
216 TemporaryDestMsg msg = new TemporaryDestMsg(true);
217 getSocketMgr().sendMessage(msg);
218 TemporaryQueue dest = msg.getQueue();
219 return dest;
220 }
221
222 public TemporaryTopic getTemporaryTopic(ConnectionToken dc)
223 throws JMSException, Exception
224 {
225 TemporaryDestMsg msg = new TemporaryDestMsg(false);
226 getSocketMgr().sendMessage(msg);
227 TemporaryTopic dest = msg.getTopic();
228 return dest;
229 }
230
231 public void acknowledge(ConnectionToken dc, AcknowledgementRequest item)
232 throws JMSException, Exception
233 {
234 AcknowledgementRequestMsg msg = new AcknowledgementRequestMsg(item);
235 if (item.isAck())
236 getSocketMgr().sendMessage(msg);
237 else
238 getSocketMgr().sendOneWay(msg);
239 }
240
241 public void addMessage(ConnectionToken dc, SpyMessage val)
242 throws Exception
243 {
244 AddMsg msg = new AddMsg(val);
245 getSocketMgr().sendMessage(msg);
246 }
247
248 public SpyMessage[] browse(ConnectionToken dc, Destination dest, String selector)
249 throws JMSException, Exception
250 {
251 BrowseMsg msg = new BrowseMsg(dest, selector);
252 getSocketMgr().sendMessage(msg);
253 SpyMessage[] msgs = msg.getMessages();
254 return msgs;
255 }
256
257 public void checkID(String id)
258 throws JMSException, Exception
259 {
260 CheckIDMsg msg = new CheckIDMsg(id);
261 getSocketMgr().sendMessage(msg);
262 }
263
264 public String checkUser(String username, String password)
265 throws JMSException, Exception
266 {
267 CheckUserMsg msg = new CheckUserMsg(username, password, false);
268 getSocketMgr().sendMessage(msg);
269 String clientID = msg.getID();
270 return clientID;
271 }
272
273 public String authenticate(String username, String password)
274 throws JMSException, Exception
275 {
276 CheckUserMsg msg = new CheckUserMsg(username, password, true);
277 getSocketMgr().sendMessage(msg);
278 String sessionID = msg.getID();
279 return sessionID;
280 }
281
282 public Object clone()
283 throws CloneNotSupportedException
284 {
285 return super.clone();
286 }
287
288 public ServerIL cloneServerIL()
289 throws Exception
290 {
291 return (ServerIL)clone();
292 }
293
294 public void connectionClosing(ConnectionToken dc)
295 throws JMSException, Exception
296 {
297 CloseMsg msg = new CloseMsg();
298 try
299 {
300 getSocketMgr().sendMessage(msg);
301 }
302 catch (IOException ignored)
303 {
304 }
305 destroyConnection();
306 }
307
308 public Queue createQueue(ConnectionToken dc, String destName)
309 throws JMSException, Exception
310 {
311 CreateDestMsg msg = new CreateDestMsg(destName, true);
312 getSocketMgr().sendMessage(msg);
313 Queue dest = msg.getQueue();
314 return dest;
315 }
316
317 public Topic createTopic(ConnectionToken dc, String destName)
318 throws JMSException, Exception
319 {
320 CreateDestMsg msg = new CreateDestMsg(destName, false);
321 getSocketMgr().sendMessage(msg);
322 Topic dest = msg.getTopic();
323 return dest;
324 }
325
326 public void deleteTemporaryDestination(ConnectionToken dc, SpyDestination dest)
327 throws JMSException, Exception
328 {
329 DeleteTemporaryDestMsg msg = new DeleteTemporaryDestMsg(dest);
330 getSocketMgr().sendMessage(msg);
331 }
332
333 public void destroySubscription(ConnectionToken dc,DurableSubscriptionID id)
334 throws JMSException, Exception
335 {
336 DeleteSubscriptionMsg msg = new DeleteSubscriptionMsg(id);
337 getSocketMgr().sendMessage(msg);
338 }
339
340 public void ping(ConnectionToken dc, long clientTime)
341 throws Exception
342 {
343 PingMsg msg = new PingMsg(clientTime, true);
344 msg.getMsgID();
345 getSocketMgr().sendReply(msg);
346 }
347
348 public SpyMessage receive(ConnectionToken dc, int subscriberId, long wait)
349 throws Exception, Exception
350 {
351 ReceiveMsg msg = new ReceiveMsg(subscriberId, wait);
352 getSocketMgr().sendMessage(msg);
353 SpyMessage reply = msg.getMessage();
354 return reply;
355 }
356
357 public void subscribe(ConnectionToken dc, org.jboss.mq.Subscription s)
358 throws JMSException, Exception
359 {
360 SubscribeMsg msg = new SubscribeMsg(s);
361 getSocketMgr().sendMessage(msg);
362 }
363
364 public void transact(ConnectionToken dc, TransactionRequest t)
365 throws JMSException, Exception
366 {
367 TransactMsg msg = new TransactMsg(t);
368 getSocketMgr().sendMessage(msg);
369 }
370
371 public Xid[] recover(ConnectionToken dc, int flags) throws Exception
372 {
373 RecoverMsg msg = new RecoverMsg(flags);
374 getSocketMgr().sendMessage(msg);
375 Xid[] reply = msg.getXids();
376 return reply;
377 }
378
379 public void unsubscribe(ConnectionToken dc, int subscriptionID)
380 throws JMSException, Exception
381 {
382 UnsubscribeMsg msg = new UnsubscribeMsg(subscriptionID);
383 getSocketMgr().sendMessage(msg);
384 }
385
386 final SocketManager getSocketMgr()
387 throws Exception
388 {
389 if( socketMgr == null )
390 createConnection();
391 return socketMgr;
392 }
393
394 protected void checkConnection()
395 throws Exception
396 {
397 if (socketMgr == null)
398 {
399 createConnection();
400 }
401 }
402
403 /**
404 * Used to establish a new connection to the server
405 *
406 * @exception Exception Description of Exception
407 */
408 protected void createConnection()
409 throws Exception
410 {
411 boolean tracing = log.isTraceEnabled();
412
413 /** Attempt to load the socket factory and if this fails, use the
414 * default socket factory impl.
415 */
416 SocketFactory socketFactory = null;
417 if( socketFactoryName != null )
418 {
419 try
420 {
421 ClassLoader loader = Thread.currentThread().getContextClassLoader();
422 Class factoryClass = loader.loadClass(socketFactoryName);
423 socketFactory = (SocketFactory) factoryClass.newInstance();
424 }
425 catch(Exception e)
426 {
427 log.debug("Failed to load socket factory: "+socketFactoryName, e);
428 }
429 }
430 // Use the default socket factory
431 if( socketFactory == null )
432 {
433 socketFactory = SocketFactory.getDefault();
434 }
435
436 // Look for a local address and port as properties
437 String tmp = getProperty(LOCAL_ADDR);
438 if( tmp != null )
439 this.localAddr = InetAddress.getByName(tmp);
440 tmp = getProperty(LOCAL_PORT);
441 if( tmp != null )
442 this.localPort = Integer.parseInt(tmp);
443
444 // Look for client side overrides of the server address/port
445 InetAddress serverAddr = addr;
446 int serverPort = port;
447 tmp = getProperty(SERVER_ADDR);
448 if (tmp == null)
449 tmp = connectAddress;
450 if( tmp != null )
451 serverAddr = InetAddress.getByName(tmp);
452 tmp = getProperty(SERVER_PORT);
453 if( tmp != null )
454 serverPort = Integer.parseInt(tmp);
455 else if (connectPort != 0)
456 serverPort = connectPort;
457
458 String useHostNameProp = getProperty(USE_SERVER_HOST);
459 String serverHost = serverAddr.getHostAddress();
460 if (Boolean.valueOf(useHostNameProp).booleanValue())
461 serverHost = serverAddr.getHostName();
462
463 int retries = 0;
464 // Default to 10 retries, no delay in the absence of user override
465 int maxRetries = 10;
466 tmp = getProperty(RETRY_COUNT);
467 if( tmp != null )
468 maxRetries = Integer.parseInt(tmp);
469 long retryDelay = 0;
470 tmp = getProperty(RETRY_DELAY);
471 if( tmp != null )
472 {
473 retryDelay = Long.parseLong(tmp);
474 if( retryDelay < 0 )
475 retryDelay = 0;
476 }
477 if( tracing )
478 log.trace("Begin connect loop, maxRetries="+maxRetries+", delay="+retryDelay);
479
480 while (true)
481 {
482 try
483 {
484 if( tracing )
485 {
486 log.trace("Connecting with addr="+serverHost+", port="+serverPort
487 + ", localAddr="+localAddr+", localPort="+localPort
488 + ", socketFactory="+socketFactory
489 + ", enableTcpNoDelay="+enableTcpNoDelay
490 + ", bufferSize="+bufferSize
491 + ", chunkSize="+chunkSize
492 );
493 }
494 if( localAddr != null )
495 socket = socketFactory.createSocket(serverHost, serverPort, localAddr, localPort);
496 else
497 socket = socketFactory.createSocket(serverHost, serverPort);
498 break;
499 }
500 catch (ConnectException e)
501 {
502 if (++retries > maxRetries)
503 throw e;
504 if( tracing )
505 log.trace("Failed to connect, retries="+retries, e);
506 }
507 try
508 {
509 Thread.sleep(retryDelay);
510 }
511 catch(InterruptedException e)
512 {
513 break;
514 }
515 }
516
517 socket.setTcpNoDelay(enableTcpNoDelay);
518 if (soTimeout != 0)
519 socket.setSoTimeout(soTimeout);
520 socketMgr = new SocketManager(socket);
521 socketMgr.setBufferSize(bufferSize);
522 socketMgr.setChunkSize(chunkSize);
523 socketMgr.start(Connection.getThreadGroup());
524 }
525
526 /**
527 * Used to close the current connection with the server
528 *
529 */
530 protected void destroyConnection()
531 {
532 try
533 {
534 if( socket != null )
535 {
536 try
537 {
538 socketMgr.stop();
539 }
540 finally
541 {
542 socket.close();
543 }
544 }
545 }
546 catch(IOException ignore)
547 {
548 }
549 }
550
551 private String getProperty(String name)
552 {
553 String value = null;
554 try
555 {
556 value = System.getProperty(name);
557 }
558 catch (Throwable ignored)
559 {
560 log.trace("Cannot retrieve system property " + name);
561 }
562 return value;
563 }
564 }