1 /*
2 * JBoss, Home of Professional Open Source.
3 * Copyright 2006, Red Hat Middleware LLC, and individual contributors
4 * as indicated by the @author tags. See the copyright.txt file in the
5 * distribution for a full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */
22 package org.jboss.mq;
23
24 import java.io.IOException;
25 import java.io.Serializable;
26 import java.util.Arrays;
27 import java.util.HashMap;
28 import java.util.HashSet;
29 import java.util.LinkedList;
30
31 import javax.jms.ConnectionMetaData;
32 import javax.jms.Destination;
33 import javax.jms.ExceptionListener;
34 import javax.jms.IllegalStateException;
35 import javax.jms.JMSException;
36 import javax.jms.JMSSecurityException;
37 import javax.jms.Queue;
38 import javax.jms.TemporaryQueue;
39 import javax.jms.TemporaryTopic;
40 import javax.transaction.xa.Xid;
41
42 import org.jboss.logging.Logger;
43 import org.jboss.mq.il.ClientILService;
44 import org.jboss.mq.il.ServerIL;
45 import org.jboss.util.UnreachableStatementException;
46
47 import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
48 import EDU.oswego.cs.dl.util.concurrent.Semaphore;
49 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
50 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
51
52 /**
53 * This class implements javax.jms.Connection.
54 *
 * <p>
 * It is also the gateway through which all calls to the JMS server are made. To
 * do its work it needs a ServerIL to invoke (@see
 * org.jboss.mq.il.ServerIL).
 * </p>
60 *
 * <p>
 * The (new from February 2002) logic for clientID is the following: if logging
 * in with a user and password, a preconfigured clientID may be automatically
 * delivered from the server.
 * </p>
66 *
 * <p>
 * If the client wants to set its own clientID it must do so on a connection
 * which does not have a preconfigured clientID and it must do so before it
 * calls any other methods on the connection (even getClientID()). It is not
 * allowed to use a clientID that either looks like a JBossMQ internal one
 * (beginning with ID), a clientID that is already in use by someone, or a
 * clientID that is already preconfigured in the server.
 * </p>
75 *
 * <p>
 * If a preconfigured ID is not obtained, or a valid one is not set, the server
 * will set an internal ID. This ID can NEVER be used for durable
 * subscriptions. Whether a preconfigured ID or one manually set can be used
 * to create a durable subscription is governed by the security configuration
 * of JBossMQ. In the default setup, only preconfigured clientIDs can be
 * used. If using a SecurityManager, permission to create a durable
 * subscription is the result of a combination of the following:
 * </p>
85 * <p>- The clientID is not one of JBossMQ's internal.
86 * </p>
87 * <p>- The user is authenticated and has a role that has create set to true
88 * in the security config of the destination.
89 * </p>
90 *
 * <p>
 * Notes for JBossMQ developers: All calls, except close(), that can be made
 * on a connection must call checkClientID()
 * </p>
95 *
96 * @author Norbert Lataille (Norbert.Lataille@m4x.org)
97 * @author Hiram Chirino (Cojonudo14@hotmail.com)
98 * @author <a href="pra@tim.se">Peter Antman</a>
99 * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
100 * @version $Revision: 61741 $
101 */
102 public abstract class Connection implements Serializable, javax.jms.Connection
103 {
104 /** The serialVersionUID */
105 private static final long serialVersionUID = 87938199839407082L;
106
107 /** The threadGroup */
108 private static ThreadGroup threadGroup = new ThreadGroup("JBossMQ Client Threads");
109
110 /** The log */
111 static Logger log = Logger.getLogger(Connection.class);
112
113 /** Whether trace is enabled */
114 static boolean trace = log.isTraceEnabled();
115
116 /** Manages the thread that pings the connection to see if it is 'alive' */
117 static protected ClockDaemon clockDaemon = new ClockDaemon();
118
119 /** Maps a destination to a LinkedList of Subscriptions */
120 public HashMap destinationSubscriptions = new HashMap();
121
122 /** Maps a subscription id to a Subscription */
123 public HashMap subscriptions = new HashMap();
124
125 /** Is the connection stopped ? */
126 public boolean modeStop;
127
128 /** This is our connection to the JMS server */
129 protected ServerIL serverIL;
130
131 /** This is the clientID */
132 protected String clientID;
133
134 /** The connection token is used to identify our connection to the server. */
135 protected ConnectionToken connectionToken;
136
137 /** The object that sets up the client IL */
138 protected ClientILService clientILService;
139
140 /** How often to ping the connection */
141 protected long pingPeriod = 1000 * 60;
142
143 /** This field is reset when a ping is sent, set when ponged. */
144 protected boolean ponged = true;
145
146 /** This is used to know when the PingTask is running */
147 Semaphore pingTaskSemaphore = new Semaphore(1);
148
149 /** Identifies the PinkTask in the ClockDaemon */
150 Object pingTaskId;
151
152 /** Set a soon as close() is called on the connection. */
153 private SynchronizedBoolean closing = new SynchronizedBoolean(false);
154
155 /** Whether setClientId is Allowed */
156 private volatile boolean setClientIdAllowed = true;
157
158 /** LinkedList of all created sessions by this connection */
159 HashSet createdSessions;
160
161 /** Numbers subscriptions */
162 int subscriptionCounter = Integer.MIN_VALUE;
163
164 /** The lock for subscriptionCounter */
165 Object subCountLock = new Object();
166
167 /** Is the connection closed */
168 private SynchronizedBoolean closed = new SynchronizedBoolean(false);
169
170 /** Used to control tranactions */
171 SpyXAResourceManager spyXAResourceManager;
172
173 /** The class that created this connection */
174 GenericConnectionFactory genericConnectionFactory;
175
176 /** Last message ID returned */
177 private int lastMessageID;
178
179 /** the exceptionListener */
180 private ExceptionListener exceptionListener;
181
182 /** The exception listener lock */
183 private Object elLock = new Object();
184
185 /** The exception listener invocation thread */
186 private Thread elThread;
187
188 /** Used in message id generation */
189 private StringBuffer sb = new StringBuffer();
190
191 /** Used in message id generation */
192 private char[] charStack = new char[22];
193
194 /** The next session id */
195 String sessionId;
196
197 /** Temporary destinations created by this connection */
198 protected HashSet temps = new HashSet();
199
static
{
   log.debug("Setting the clockDaemon's thread factory");

   // Threads created for the clock daemon are daemon threads in the client thread group
   ThreadFactory monitorThreadFactory = new ThreadFactory()
   {
      public Thread newThread(Runnable r)
      {
         Thread monitor = new Thread(getThreadGroup(), r, "Connection Monitor Thread");
         monitor.setDaemon(true);
         return monitor;
      }
   };
   clockDaemon.setThreadFactory(monitorThreadFactory);
}
213
214 public static ThreadGroup getThreadGroup()
215 {
216 if (threadGroup.isDestroyed())
217 threadGroup = new ThreadGroup("JBossMQ Client Threads");
218 return threadGroup;
219 }
220
/**
 * Create a new Connection.
 * <p>
 * The connection is created in the stopped state. Construction authenticates
 * against the server, optionally asks for the clientID configured for the
 * user, starts the client IL, creates the XA resource manager and starts the
 * ping task. If any step fails, whatever was already set up is torn down
 * again on a best-effort basis before the failure is rethrown.
 *
 * @param userName the username, may be null
 * @param password the password
 * @param genericConnectionFactory the constructing class
 * @throws JMSException for any error
 */
Connection(String userName, String password, GenericConnectionFactory genericConnectionFactory) throws JMSException
{
   //Set the attributes
   createdSessions = new HashSet();
   connectionToken = null;
   lastMessageID = 0;
   modeStop = true;

   if (trace)
      log.trace("Connection Initializing userName=" + userName + " " + this);
   this.genericConnectionFactory = genericConnectionFactory;
   genericConnectionFactory.initialise(this);

   // Connect to the server
   if (trace)
      log.trace("Getting the serverIL " + this);
   serverIL = genericConnectionFactory.createServerIL();
   if (trace)
      log.trace("serverIL=" + serverIL + " " + this);

   // Register ourselves as a client
   try
   {
      authenticate(userName, password);

      if (userName != null)
         askForAnID(userName, password);

      startILService();
   }
   catch (Throwable t)
   {
      // Client registration failed, close the connection
      try
      {
         serverIL.connectionClosing(null);
      }
      catch (Throwable t2)
      {
         log.debug("Error closing the connection", t2);
      }

      // startILService() assigns clientILService before starting it, so a
      // failure after that point would otherwise leave the client IL running
      if (clientILService != null)
      {
         try
         {
            stopILService();
         }
         catch (Throwable t2)
         {
            log.debug("Error stopping the client IL", t2);
         }
      }

      SpyJMSException.rethrowAsJMSException("Failed to create connection", t);
   }

   // Finish constructing the connection
   try
   {
      if (trace)
         log.trace("Creating XAResourceManager " + this);

      // Setup the XA Resource manager,
      spyXAResourceManager = new SpyXAResourceManager(this);

      if (trace)
         log.trace("Starting the ping thread " + this);
      startPingThread();

      if (trace)
         log.trace("Connection establishment successful " + this);
   }
   catch (Throwable t)
   {
      // Could not complete the connection, tidy up
      // the server and client ILs.
      try
      {
         serverIL.connectionClosing(connectionToken);
      }
      catch (Throwable t2)
      {
         log.debug("Error closing the connection", t2);
      }
      try
      {
         stopILService();
      }
      catch (Throwable t2)
      {
         log.debug("Error stopping the client IL", t2);
      }

      SpyJMSException.rethrowAsJMSException("Failed to create connection", t);
   }
}
314
/**
 * Create a new Connection without a user; delegates to the three argument
 * constructor with a null user name and password.
 *
 * @param genericConnectionFactory the constructing class
 * @throws JMSException for any error
 */
Connection(GenericConnectionFactory genericConnectionFactory) throws JMSException
{
   this(null, null, genericConnectionFactory);
}
325
/**
 * Gets the ServerIL this connection uses to invoke the JMS server
 *
 * @return the ServerIL
 */
public ServerIL getServerIL()
{
   return serverIL;
}
335
336 /**
337 * Notification from the server that the connection is closed
338 */
339 public void asynchClose()
340 {
341 // If we receive a close and we did not initiate it, then fire the exception listener
342 if (closing.get() == false)
343 asynchFailure("Asynchronous close from server.", new IOException("Close request from the server or transport layer."));
344 }
345
/**
 * Called by a TemporaryDestination which is going to be deleted.
 * <p>
 * Unlike deleteTemporaryDestination() this never throws; any error is
 * reported through asynchFailure() instead.
 *
 * @param dest the temporary destination
 */
public void asynchDeleteTemporaryDestination(SpyDestination dest)
{
   if (trace)
      log.trace("Deleting temporary destination " + dest);
   try
   {
      deleteTemporaryDestination(dest);
   }
   catch (Throwable t)
   {
      asynchFailure("Error deleting temporary destination " + dest, t);
   }
}
364
365 /**
366 * Gets the first consumer that is listening to a destination.
367 *
368 * @param requests the receive requests
369 */
370 public void asynchDeliver(ReceiveRequest requests[])
371 {
372 // If we are closing the connection, the server will nack the messages
373 if (closing.get())
374 return;
375
376 if (trace)
377 log.trace("Async deliver requests=" + Arrays.asList(requests) + " " + this);
378
379 try
380 {
381 for (int i = 0; i < requests.length; i++)
382 {
383 ReceiveRequest r = requests[i];
384 if (trace)
385 log.trace("Processing request=" + r + " " + this);
386
387 SpyConsumer consumer = (SpyConsumer) subscriptions.get(r.subscriptionId);
388 r.message.createAcknowledgementRequest(r.subscriptionId.intValue());
389
390 if (consumer == null)
391 {
392 send(r.message.getAcknowledgementRequest(false));
393 log.debug("WARNING: NACK issued due to non existent subscription " + r.message.header.messageId);
394 continue;
395 }
396
397 if (trace)
398 log.trace("Delivering messageid=" + r.message.header.messageId + " to consumer=" + consumer);
399
400 consumer.addMessage(r.message);
401 }
402 }
403 catch (Throwable t)
404 {
405 asynchFailure("Error during async delivery", t);
406 }
407 }
408 /**
409 * Notification of a failure on this connection
410 *
411 * @param reason the reason for the failure
412 * @param t the throwable
413 */
414 public void asynchFailure(String reason, Throwable t)
415 {
416 if (trace)
417 log.trace("Notified of failure reason=" + reason + " " + this, t);
418
419 // Exceptions due to closing will be ignored.
420 if (closing.get())
421 return;
422
423 JMSException excep = SpyJMSException.getAsJMSException(reason, t);
424
425 synchronized (elLock)
426 {
427 ExceptionListener el = exceptionListener;
428 if (el != null && elThread == null)
429 {
430 try
431 {
432 Runnable run = new ExceptionListenerRunnable(el, excep);
433 elThread = new Thread(getThreadGroup(), run, "ExceptionListener " + this);
434 elThread.setDaemon(false);
435 elThread.start();
436 }
437 catch (Throwable t1)
438 {
439 log.warn("Connection failure: ", excep);
440 log.warn("Unable to start exception listener thread: ", t1);
441 }
442 }
443 else if (elThread != null)
444 log.warn("Connection failure, already in the exception listener", excep);
445 else
446 log.warn("Connection failure, use javax.jms.Connection.setExceptionListener() to handle this error and reconnect", excep);
447 }
448 }
449
/**
 * Invoked when the server pongs us. Records that the pong arrived so that
 * the next PingTask run does not treat the connection as dead.
 *
 * @param serverTime the server time, only used for trace logging
 */
public void asynchPong(long serverTime)
{
   if (trace)
      log.trace("PONG serverTime=" + serverTime + " " + this);
   ponged = true;
}
461
/**
 * Called by a TemporaryDestination which is going to be deleted.
 * <p>
 * The server is asked to delete the destination first; only when that
 * succeeds is the local bookkeeping for the destination removed.
 *
 * @param dest the temporary destination
 * @exception JMSException for any error, including a closed connection
 */
public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
{
   checkClosed();
   if (trace)
      log.trace("DeleteDestination dest=" + dest + " " + this);
   try
   {
      //Ask the broker to delete() this TemporaryDestination
      serverIL.deleteTemporaryDestination(connectionToken, dest);

      //Remove it from the destinations list
      // (destinationSubscriptions is guarded by the subscriptions lock)
      synchronized (subscriptions)
      {
         destinationSubscriptions.remove(dest);
      }

      // Remove it from the temps list
      synchronized (temps)
      {
         temps.remove(dest);
      }
   }
   catch (Throwable t)
   {

      SpyJMSException.rethrowAsJMSException("Cannot delete the TemporaryDestination", t);
   }
}
496
497 public void setClientID(String cID) throws JMSException
498 {
499 checkClosed();
500 if (clientID != null)
501 throw new IllegalStateException("The connection has already a clientID");
502 if (setClientIdAllowed == false)
503 throw new IllegalStateException("SetClientID was not called emediately after creation of connection");
504
505 if (trace)
506 log.trace("SetClientID clientID=" + clientID + " " + this);
507
508 try
509 {
510 serverIL.checkID(cID);
511 }
512 catch (Throwable t)
513 {
514 SpyJMSException.rethrowAsJMSException("Cannot connect to the JMSServer", t);
515 }
516
517 clientID = cID;
518 connectionToken.setClientID(clientID);
519 }
520
/**
 * Get the clientID of this connection. Unlike the other public methods this
 * does not call checkClientID(), so it never requests an id from the server.
 *
 * @return the clientID, may be null if none has been established yet
 * @throws JMSException if the connection is closed
 */
public String getClientID() throws JMSException
{
   checkClosed();
   return clientID;
}
526
/**
 * Get the exception listener
 *
 * @return the listener, or null if none is set
 * @throws JMSException if the connection is closed or no clientID could be obtained
 */
public ExceptionListener getExceptionListener() throws JMSException
{
   checkClosed();
   checkClientID();
   return exceptionListener;
}
533
/**
 * Set the exception listener that asynchFailure() notifies
 *
 * @param listener the listener, may be null
 * @throws JMSException if the connection is closed or no clientID could be obtained
 */
public void setExceptionListener(ExceptionListener listener) throws JMSException
{
   checkClosed();
   checkClientID();

   exceptionListener = listener;
}
541
/**
 * Get the connection meta data
 *
 * @return a new SpyConnectionMetaData
 * @throws JMSException if the connection is closed or no clientID could be obtained
 */
public ConnectionMetaData getMetaData() throws JMSException
{
   checkClosed();
   checkClientID();

   return new SpyConnectionMetaData();
}
549
550 public synchronized void close() throws JMSException
551 {
552 if (closed.get())
553 return;
554 if (trace)
555 log.trace("Closing connection " + this);
556
557 closing.set(true);
558
559 // We don't want to notify the exception listener
560 exceptionListener = null;
561
562 // The first exception
563 JMSException exception = null;
564
565 try
566 {
567 doStop();
568 }
569 catch (Throwable t)
570 {
571 log.trace("Error during stop", t);
572 }
573
574 if (trace)
575 log.trace("Closing sessions " + this);
576 Object[] vect = null;
577 synchronized (createdSessions)
578 {
579 vect = createdSessions.toArray();
580 }
581 for (int i = 0; i < vect.length; i++)
582 {
583 SpySession session = (SpySession) vect[i];
584 try
585 {
586 session.close();
587 }
588 catch (Throwable t)
589 {
590 if (trace)
591 log.trace("Error closing session " + session, t);
592 }
593 }
594 if (trace)
595 log.trace("Closed sessions " + this);
596
597 if (trace)
598 log.trace("Notifying the server of close " + this);
599 try
600 {
601 serverIL.connectionClosing(connectionToken);
602 }
603 catch (Throwable t)
604 {
605 log.trace("Cannot close properly the connection", t);
606 }
607
608 if (trace)
609 log.trace("Stopping ping thread " + this);
610 try
611 {
612 stopPingThread();
613 }
614 catch (Throwable t)
615 {
616 if (exception == null)
617 exception = SpyJMSException.getAsJMSException("Cannot stop the ping thread", t);
618 }
619
620 if (trace)
621 log.trace("Stopping the ClientIL service " + this);
622 try
623 {
624 stopILService();
625 }
626 catch (Throwable t)
627 {
628 log.trace("Cannot stop the client il service", t);
629 }
630
631 // Only set the closed flag after all the objects that depend
632 // on this connection have been closed.
633 closed.set(true);
634
635 if (trace)
636 log.trace("Disconnected from server " + this);
637
638 // Throw the first exception
639 if (exception != null)
640 throw exception;
641 }
642
643 public void start() throws JMSException
644 {
645 checkClosed();
646 checkClientID();
647
648 if (modeStop == false)
649 return;
650 modeStop = false;
651
652 if (trace)
653 log.trace("Starting connection " + this);
654
655 try
656 {
657 serverIL.setEnabled(connectionToken, true);
658 }
659 catch (Throwable t)
660 {
661 SpyJMSException.rethrowAsJMSException("Cannot enable the connection with the JMS server", t);
662 }
663 }
664
/**
 * Stop delivery of messages on this connection, see doStop()
 *
 * @throws JMSException if the connection is closed, no clientID could be
 *         obtained or the server could not disable the connection
 */
public void stop() throws JMSException
{
   checkClosed();
   checkClientID();
   doStop();
}
671
672 public String toString()
673 {
674 StringBuffer buffer = new StringBuffer();
675 buffer.append("Connection@").append(System.identityHashCode(this));
676 buffer.append('[');
677 if (connectionToken != null)
678 buffer.append("token=").append(connectionToken);
679 else
680 buffer.append("clientID=").append(clientID);
681 if (closed.get())
682 buffer.append(" CLOSED");
683 else if (closing.get())
684 buffer.append(" CLOSING");
685 buffer.append(" rcvstate=");
686 if (modeStop)
687 buffer.append("STOPPED");
688 else
689 buffer.append("STARTED");
690 buffer.append(']');
691 return buffer.toString();
692 }
693
694 /**
695 * Get the next message id
696 * <p>
697 *
698 * All longs are less than 22 digits long
699 * <p>
700 *
701 * Note that in this routine we assume that System.currentTimeMillis() is
702 * non-negative always be non-negative (so don't set lastMessageID to a
703 * positive for a start).
704 *
705 * @return the next message id
706 * @throws JMSException for any error
707 */
708 String getNewMessageID() throws JMSException
709 {
710 checkClosed();
711 synchronized (sb)
712 {
713 sb.setLength(0);
714 sb.append(clientID);
715 sb.append('-');
716 long time = System.currentTimeMillis();
717 int count = 0;
718 do
719 {
720 charStack[count] = (char) ('0' + (time % 10));
721 time = time / 10;
722 ++count;
723 }
724 while (time != 0);
725 --count;
726 for (; count >= 0; --count)
727 {
728 sb.append(charStack[count]);
729 }
730 ++lastMessageID;
731 //avoid having to deal with negative numbers.
732 if (lastMessageID < 0)
733 {
734 lastMessageID = 0;
735 }
736 int id = lastMessageID;
737 count = 0;
738 do
739 {
740 charStack[count] = (char) ('0' + (id % 10));
741 id = id / 10;
742 ++count;
743 }
744 while (id != 0);
745 --count;
746 for (; count >= 0; --count)
747 {
748 sb.append(charStack[count]);
749 }
750 return sb.toString();
751 }
752 }
753
754 /**
755 * A new Consumer has been created.
756 * <p>
757 * We have to handle security issues, a consumer may actually not be allowed
758 * to be created
759 *
760 * @param consumer the consumer added
761 * @throws JMSException for any error
762 */
763 void addConsumer(SpyConsumer consumer) throws JMSException
764 {
765 checkClosed();
766 Subscription req = consumer.getSubscription();
767 synchronized (subCountLock)
768 {
769 req.subscriptionId = subscriptionCounter++;
770 }
771 req.connectionToken = connectionToken;
772 if (trace)
773 log.trace("addConsumer sub=" + req);
774
775 try
776 {
777 synchronized (subscriptions)
778 {
779 subscriptions.put(new Integer(req.subscriptionId), consumer);
780
781 LinkedList ll = (LinkedList) destinationSubscriptions.get(req.destination);
782 if (ll == null)
783 {
784 ll = new LinkedList();
785 destinationSubscriptions.put(req.destination, ll);
786 }
787
788 ll.add(consumer);
789 }
790
791 serverIL.subscribe(connectionToken, req);
792 }
793 catch (JMSSecurityException ex)
794 {
795 removeConsumerInternal(consumer);
796 throw ex;
797 }
798 catch (Throwable t)
799 {
800 SpyJMSException.rethrowAsJMSException("Cannot subscribe to this Destination: ", t);
801 }
802 }
803
804 /**
805 * Browse a queue
806 *
807 * @param queue the queue
808 * @param selector the selector
809 * @return an array of messages
810 * @exception JMSException for any error
811 */
812 SpyMessage[] browse(Queue queue, String selector) throws JMSException
813 {
814 checkClosed();
815 if (trace)
816 log.trace("Browsing queue=" + queue + " selector=" + selector + " " + this);
817
818 try
819 {
820 return serverIL.browse(connectionToken, queue, selector);
821 }
822 catch (Throwable t)
823 {
824 SpyJMSException.rethrowAsJMSException("Cannot browse the Queue", t);
825 throw new UnreachableStatementException();
826 }
827 }
828
/**
 * Ping the server
 *
 * @param clientTime the start of the ping
 * @throws JMSException for any error, including a closed connection
 */
void pingServer(long clientTime) throws JMSException
{
   checkClosed();
   // Re-read the log level here so the cached trace flag is refreshed on every ping
   trace = log.isTraceEnabled();
   if (trace)
      log.trace("PING " + clientTime + " " + this);

   try
   {
      serverIL.ping(connectionToken, clientTime);
   }
   catch (Throwable t)
   {
      SpyJMSException.rethrowAsJMSException("Cannot ping the JMS server", t);
   }
}
851
852 /**
853 * Receive a message
854 *
855 * @param sub the subscription
856 * @param wait the wait time
857 * @return the message or null if there isn't one
858 * @throws JMSException for any error
859 */
860 SpyMessage receive(Subscription sub, long wait) throws JMSException
861 {
862 checkClosed();
863 if (trace)
864 log.trace("Receive subscription=" + sub + " wait=" + wait);
865
866 try
867 {
868 SpyMessage message = serverIL.receive(connectionToken, sub.subscriptionId, wait);
869 if (message != null)
870 message.createAcknowledgementRequest(sub.subscriptionId);
871 return message;
872 }
873 catch (Throwable t)
874 {
875 SpyJMSException.rethrowAsJMSException("Cannot receive ", t);
876 throw new UnreachableStatementException();
877 }
878 }
879
/**
 * Remove a consumer
 * <p>
 * The server is asked to unsubscribe first. The local registration is only
 * removed when that call succeeds.
 *
 * @param consumer the consumer
 * @throws JMSException for any error, including a closed connection
 */
void removeConsumer(SpyConsumer consumer) throws JMSException
{
   checkClosed();
   Subscription req = consumer.getSubscription();
   if (trace)
      log.trace("removeConsumer req=" + req);

   try
   {
      serverIL.unsubscribe(connectionToken, req.subscriptionId);

      removeConsumerInternal(consumer);
   }
   catch (Throwable t)
   {
      SpyJMSException.rethrowAsJMSException("Cannot unsubscribe to this destination", t);
   }

}
905
/**
 * Send a message to the server
 *
 * @param mes the message
 * @throws JMSException for any error, including a closed connection
 */
void sendToServer(SpyMessage mes) throws JMSException
{
   checkClosed();
   if (trace)
      log.trace("SendToServer message=" + mes.header.jmsMessageID + " " + this);

   try
   {
      serverIL.addMessage(connectionToken, mes);
   }
   catch (Throwable t)
   {
      SpyJMSException.rethrowAsJMSException("Cannot send a message to the JMS server", t);
   }
}
927
/**
 * Closing a session. Removes the session from the set of sessions created
 * by this connection.
 *
 * @param who the session
 */
void sessionClosing(SpySession who)
{
   if (trace)
      log.trace("Closing session " + who);

   synchronized (createdSessions)
   {
      createdSessions.remove(who);
   }

   //This session should not be in the "destinations" object anymore.
   //We could check this, though
}
946
947 void unsubscribe(DurableSubscriptionID id) throws JMSException
948 {
949 if (trace)
950 log.trace("Unsubscribe id=" + id + " " + this);
951
952 try
953 {
954 serverIL.destroySubscription(connectionToken, id);
955 }
956 catch (Throwable t)
957 {
958 SpyJMSException.rethrowAsJMSException("Cannot destroy durable subscription " + id, t);
959 }
960 }
961
962 /**
963 * Check a tempoary destination
964 *
965 * @param destination the destination
966 */
967 void checkTemporary(Destination destination) throws JMSException
968 {
969 if (destination instanceof TemporaryQueue || destination instanceof TemporaryTopic)
970 {
971 synchronized (temps)
972 {
973 if (temps.contains(destination) == false)
974 throw new JMSException("Cannot create a consumer for a temporary destination from a different session. " + destination);
975 }
976 }
977 }
978
979 /**
980 * Check that a clientID exists. If not get one from server.
981 *
982 * Also sets the setClientIdAllowed to false.
983 *
984 * Check clientId, must be called by all public methods on the
985 * jacax.jmx.Connection interface and its children.
986 *
987 * @exception JMSException if clientID is null as post condition
988 */
989 synchronized protected void checkClientID() throws JMSException
990 {
991 if (setClientIdAllowed == false)
992 return;
993
994 setClientIdAllowed = false;
995 if (trace)
996 log.trace("Checking clientID=" + clientID + " " + this);
997 if (clientID == null)
998 {
999 askForAnID();//Request a random one
1000 if (clientID == null)
1001 throw new JMSException("Could not get a clientID");
1002 connectionToken.setClientID(clientID);
1003
1004 if (trace)
1005 log.trace("ClientID established " + this);
1006 }
1007 }
1008
1009 /**
1010 * Ask the server for an id
1011 *
1012 * @exception JMSException for any error
1013 */
1014 protected void askForAnID() throws JMSException
1015 {
1016 if (trace)
1017 log.trace("Ask for an id " + this);
1018
1019 try
1020 {
1021 if (clientID == null)
1022 clientID = serverIL.getID();
1023 }
1024 catch (Throwable t)
1025 {
1026 SpyJMSException.rethrowAsJMSException("Cannot get a client ID", t);
1027 }
1028 }
1029
/**
 * Ask the server for the id configured for a user. The clientID is only
 * changed when the server returns a non-null id.
 *
 * @param userName the user
 * @param password the password
 * @exception JMSException for any error
 */
protected void askForAnID(String userName, String password) throws JMSException
{
   if (trace)
      log.trace("Ask for an id user=" + userName + " " + this);

   try
   {
      String configuredClientID = serverIL.checkUser(userName, password);
      if (configuredClientID != null)
         clientID = configuredClientID;
   }
   catch (Throwable t)
   {
      SpyJMSException.rethrowAsJMSException("Cannot get a client ID", t);
   }
}
1053
/**
 * Authenticate a user and remember the session id returned by the server
 *
 * @param userName the user
 * @param password the password
 * @throws JMSException for any error
 */
protected void authenticate(String userName, String password) throws JMSException
{
   if (trace)
      log.trace("Authenticating user " + userName + " " + this);
   try
   {
      sessionId = serverIL.authenticate(userName, password);
   }
   catch (Throwable t)
   {
      SpyJMSException.rethrowAsJMSException("Cannot authenticate user", t);
   }
}
1074
// used to acknowledge a message
/**
 * Acknowledge/Nack a message
 *
 * @param item the acknowledgement
 * @exception JMSException for any error, including a closed connection
 */
protected void send(AcknowledgementRequest item) throws JMSException
{
   checkClosed();
   if (trace)
      log.trace("Acknowledge item=" + item + " " + this);

   try
   {
      serverIL.acknowledge(connectionToken, item);
   }
   catch (Throwable t)
   {
      SpyJMSException.rethrowAsJMSException("Cannot acknowlege a message", t);
   }
}
1097
/**
 * Commit/rollback
 *
 * @param transaction the transaction request
 * @exception JMSException for any error, including a closed connection
 */
protected void send(TransactionRequest transaction) throws JMSException
{
   checkClosed();
   if (trace)
      log.trace("Transact request=" + transaction + " " + this);

   try
   {
      serverIL.transact(connectionToken, transaction);
   }
   catch (Throwable t)
   {
      SpyJMSException.rethrowAsJMSException("Cannot process a transaction", t);
   }
}
1119
1120 /**
1121 * Recover
1122 *
1123 * @param flags the flags
1124 * @throws JMSException for any error
1125 */
1126 protected Xid[] recover(int flags) throws JMSException
1127 {
1128 checkClosed();
1129 if (trace)
1130 log.trace("Recover flags=" + flags + " " + this);
1131
1132 try
1133 {
1134 if (serverIL instanceof Recoverable)
1135 {
1136 Recoverable recoverableIL = (Recoverable) serverIL;
1137 return recoverableIL.recover(connectionToken, flags);
1138 }
1139 }
1140 catch (Throwable t)
1141 {
1142 SpyJMSException.rethrowAsJMSException("Cannot recover", t);
1143 }
1144
1145 log.warn(serverIL + " does not implement " + Recoverable.class.getName());
1146 return new Xid[0];
1147 }
1148
/**
 * Start the il
 * <p>
 * Creates and starts the client IL service, then builds the connection
 * token from the clientID, the client IL and the session id and passes it
 * to the server IL.
 *
 * @exception JMSException for any error
 */
protected void startILService() throws JMSException
{
   if (trace)
      log.trace("Starting the client il " + this);
   try
   {
      clientILService = genericConnectionFactory.createClientILService(this);
      clientILService.start();
      if (trace)
         log.trace("Using client id " + clientILService + " " + this);
      connectionToken = new ConnectionToken(clientID, clientILService.getClientIL(), sessionId);
      serverIL.setConnectionToken(connectionToken);
   }
   catch (Throwable t)
   {
      SpyJMSException.rethrowAsJMSException("Cannot start a the client IL service", t);
   }
}
1172
/**
 * Stop the il
 *
 * @exception JMSException for any error, which includes the case where no
 *            client IL service has been created yet
 */
protected void stopILService() throws JMSException
{
   try
   {
      clientILService.stop();
   }
   catch (Throwable t)
   {
      SpyJMSException.rethrowAsJMSException("Cannot stop a the client IL service", t);
   }
}
1189
/**
 * Stop delivery
 * <p>
 * Does nothing when the connection is already stopped. Unlike stop() this
 * performs no closed or clientID checks, which is why close() uses it.
 *
 * @throws JMSException if the server could not disable the connection
 */
public void doStop() throws JMSException
{
   if (modeStop)
      return;
   modeStop = true;

   if (trace)
      log.trace("Stopping connection " + this);

   try
   {
      serverIL.setEnabled(connectionToken, false);
   }
   catch (Throwable t)
   {
      SpyJMSException.rethrowAsJMSException("Cannot disable the connection with the JMS server", t);
   }
}
1213
1214 /**
1215 * Remove a consumer
1216 *
1217 * @param consumer the consumer
1218 */
1219 private void removeConsumerInternal(SpyConsumer consumer)
1220 {
1221 synchronized (subscriptions)
1222 {
1223 Subscription req = consumer.getSubscription();
1224 subscriptions.remove(new Integer(req.subscriptionId));
1225
1226 LinkedList ll = (LinkedList) destinationSubscriptions.get(req.destination);
1227 if (ll != null)
1228 {
1229 ll.remove(consumer);
1230 if (ll.size() == 0)
1231 {
1232 destinationSubscriptions.remove(req.destination);
1233 }
1234 }
1235 }
1236 }
1237
/**
 * Check whether we are closed
 *
 * @throws IllegalStateException when the connection is closed
 */
protected void checkClosed() throws IllegalStateException
{
   if (closed.get())
      throw new IllegalStateException("The connection is closed");
}
1248
/**
 * Start the ping thread: schedules a PingTask with the clock daemon every
 * pingPeriod milliseconds and remembers its id for stopPingThread().
 */
private void startPingThread()
{
   // Ping thread does not need to be running if the ping period is 0.
   if (pingPeriod == 0)
      return;
   pingTaskId = clockDaemon.executePeriodically(pingPeriod, new PingTask(), true);
}
1259
/**
 * Stop the ping thread: cancels the scheduled PingTask and then waits up to
 * ten seconds for a run that is in progress to finish.
 */
private void stopPingThread()
{
   // Ping thread was not running if ping period is 0.
   if (pingPeriod == 0)
      return;

   ClockDaemon.cancel(pingTaskId);

   //Aquire the Semaphore to make sure the ping task is not running.
   // NOTE(review): the permit is not released again in this method and the
   // result of attempt() is ignored - presumably intended so that no later
   // PingTask run can proceed; confirm.
   try
   {
      pingTaskSemaphore.attempt(1000 * 10);
   }
   catch (InterruptedException e)
   {
      // Keep the interrupt for the caller
      Thread.currentThread().interrupt();
   }
}
1281
/**
 * The ping task
 * <p>
 * Each run first checks that the previous ping was answered and then sends
 * the next one. A missing pong or a failing ping is reported through
 * asynchFailure(). The ping task semaphore is held for the duration of a
 * run.
 */
class PingTask implements Runnable
{
   public void run()
   {
      // Don't bother if we are closing
      if (closing.get())
         return;

      try
      {
         // If we can't aquire the semaphore then it
         // almost certainly means the close has got it
         // Try for 10 seconds to make sure the problem
         // is not just a long garbage collection that has suspended threads
         if (pingTaskSemaphore.attempt(1000 * 10) == false)
            return;
      }
      catch (InterruptedException e)
      {
         log.debug("Interrupted requesting ping semaphore");
         return;
      }
      try
      {
         if (ponged == false)
         {
            // Server did not pong use with in the timeout
            // period.. Assuming the connection is dead.
            throw new SpyJMSException("No pong received", new IOException("ping timeout."));
         }

         // Cleared here, set again by asynchPong() when the server answers
         ponged = false;
         pingServer(System.currentTimeMillis());
      }
      catch (Throwable t)
      {
         asynchFailure("Unexpected ping failure", t);
      }
      finally
      {
         // Only reached when the permit was acquired above
         pingTaskSemaphore.release();
      }
   }
}
1329
/**
 * The Exception listener runnable
 * <p>
 * Invokes the exception listener with the failure and afterwards clears
 * elThread so that asynchFailure() can start a new notification.
 */
class ExceptionListenerRunnable implements Runnable
{
   /** The listener to notify */
   ExceptionListener el;
   /** The failure to pass to the listener */
   JMSException excep;

   /**
    * Create a new ExceptionListener runnable
    *
    * @param el the exception listener
    * @param excep the jms exception
    */
   public ExceptionListenerRunnable(ExceptionListener el, JMSException excep)
   {
      this.el = el;
      this.excep = excep;
   }

   public void run()
   {
      try
      {
         // The listener runs with elLock held, the same lock asynchFailure()
         // takes before deciding whether to start a listener thread
         synchronized (elLock)
         {
            el.onException(excep);
         }
      }
      catch (Throwable t)
      {
         log.warn("Connection failure: ", excep);
         log.warn("Exception listener ended abnormally: ", t);
      }

      synchronized (elLock)
      {
         elThread = null;
      }
   }
}
1371 }