Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/advisories/TempDestinationAdvisor.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq.advisories;
20  import java.util.Iterator;
21  import java.util.List;
22  import java.util.Set;
23  import javax.jms.Connection;
24  import javax.jms.Destination;
25  import javax.jms.JMSException;
26  import javax.jms.Message;
27  import javax.jms.MessageConsumer;
28  import javax.jms.MessageListener;
29  import javax.jms.ObjectMessage;
30  import javax.jms.Session;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.activemq.ActiveMQConnection;
34  import org.activemq.ActiveMQSession;
35  import org.activemq.message.ActiveMQDestination;
36  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
37  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
38  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
39  
40  /**
41   * A helper class for listening for TempDestination advisories
42   * 
43   * @version $Revision: 1.1.1.1 $
44   */
45  public class TempDestinationAdvisor implements MessageListener {
46      private static final Log log = LogFactory.getLog(TempDestinationAdvisor.class);
47      private Connection connection;
48      private ActiveMQDestination destination;
49      private Session session;
50      private List listeners = new CopyOnWriteArrayList();
51      private Set activeDestinations = new CopyOnWriteArraySet();
52      private SynchronizedBoolean started = new SynchronizedBoolean(false);
53      private long startedAt;
54  
55      /**
56       * Construct a TempDestinationAdvisor
57       * 
58       * @param connection
59       * @param destination the destination to listen for TempDestination events
60       * @throws JMSException
61       */
62      public TempDestinationAdvisor(Connection connection, Destination destination) throws JMSException {
63          this.connection = connection;
64          this.destination = ActiveMQDestination.transformDestination(destination);
65      }
66  
67      /**
68       * start listening for advisories
69       * 
70       * @throws JMSException
71       */
72      public void start() throws JMSException {
73          if (started.commit(false, true)) {
74              if (connection instanceof ActiveMQConnection) {
75                  session = ((ActiveMQConnection) connection).createSession(false, Session.AUTO_ACKNOWLEDGE, true);
76                  ((ActiveMQSession) session).setInternalSession(true);
77              }
78              else {
79                  session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
80              }
81              MessageConsumer consumer = session.createConsumer(destination.getTopicForTempAdvisory());
82              consumer.setMessageListener(this);
83              startedAt = System.currentTimeMillis();
84          }
85      }
86  
87      /**
88       * stop listening for advisories
89       * 
90       * @throws JMSException
91       */
92      public void stop() throws JMSException {
93          if (started.commit(true, false)) {
94              if (session != null) {
95                  session.close();
96              }
97          }
98      }
99  
100     /**
101      * returns true if the temporary destination is active
102      * 
103      * @param destination
104      * @return true if a subscriber for the destination
105      */
106     public boolean isActive(Destination destination) {
107         boolean rtnval = false;
108         synchronized(this)
109         {
110             rtnval = activeDestinations.contains(destination);
111             if (rtnval == false && startedAt > 0)
112             {
113                 // wait a while to see if the advisory event arrives (no longer than 5 seconds)
114                 long waittime = 5000 - (System.currentTimeMillis() - startedAt);
115                 startedAt = 0;
116                 try {
117                     wait(waittime);
118                 } catch (Exception e) {}
119                 rtnval = activeDestinations.contains(destination);
120             }
121         }
122         return rtnval;
123     }
124 
125     /**
126      * Add a listener
127      * 
128      * @param l
129      */
130     public void addListener(TempDestinationAdvisoryEventListener l) {
131         listeners.add(l);
132     }
133 
134     /**
135      * Remove a listener
136      * 
137      * @param l
138      */
139     public void removeListener(TempDestinationAdvisoryEventListener l) {
140         listeners.remove(l);
141     }
142 
143     /**
144      * OnMessage() implementation
145      * 
146      * @param msg
147      */
148     public void onMessage(Message msg) {
149         if (msg instanceof ObjectMessage) {
150             try {
151                 TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) ((ObjectMessage) msg).getObject();
152                 if (event.isStarted()) {
153                     activeDestinations.add(event.getDestination());
154                     synchronized (this) {
155                         notifyAll();
156                     }
157         }
158                 else {
159                     activeDestinations.remove(event.getDestination());
160                 }
161                 fireEvent(event);
162             }
163             catch (JMSException e) {
164                 log.error("Failed to process message: " + msg);
165             }
166         }
167     }
168 
169     private void fireEvent(TempDestinationAdvisoryEvent event) {
170         for (Iterator i = listeners.iterator();i.hasNext();) {
171             TempDestinationAdvisoryEventListener l = (TempDestinationAdvisoryEventListener) i.next();
172             l.onEvent(event);
173         }
174     }
175 }