Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/advisories/ConnectionAdvisor.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq.advisories;
20  import java.util.HashMap;
21  import java.util.HashSet;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Set;
26  
27  import javax.jms.Connection;
28  import javax.jms.Destination;
29  import javax.jms.JMSException;
30  import javax.jms.Message;
31  import javax.jms.MessageConsumer;
32  import javax.jms.MessageListener;
33  import javax.jms.ObjectMessage;
34  import javax.jms.Session;
35  
36  import org.activemq.message.ActiveMQDestination;
37  import org.activemq.message.ActiveMQTopic;
38  import org.activemq.message.ConnectionInfo;
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  
42  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
43  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
44  
45  /**
46   * A helper class for listening for MessageConnection advisories *
47   * 
48   * @version $Revision: 1.1.1.1 $
49   */
50  public class ConnectionAdvisor implements MessageListener {
51      private static final Log log = LogFactory.getLog(ConnectionAdvisor.class);
52      private Connection connection;
53      private Session session;
54      private List listeners = new CopyOnWriteArrayList();
55      private Map activeConnections = new HashMap();
56      private SynchronizedBoolean started = new SynchronizedBoolean(false);
57      private Object lock = new Object();
58  
59      /**
60       * Construct a ConnectionAdvisor
61       * 
62       * @param connection
63       * @throws JMSException
64       */
65      public ConnectionAdvisor(Connection connection) throws JMSException {
66          this.connection = connection;
67      }
68  
69      /**
70       * start listening for advisories
71       * 
72       * @throws JMSException
73       */
74      public void start() throws JMSException {
75          if (started.commit(false, true)) {
76              
77              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
78              
79              String advisoryName = ActiveMQDestination.CONNECTION_ADVISORY_PREFIX;
80              Destination advisoryDestination = new ActiveMQTopic(advisoryName);
81              MessageConsumer consumer = session.createConsumer(advisoryDestination);
82              consumer.setMessageListener(this);
83          }
84      }
85  
86      /**
87       * stop listening for advisories
88       * 
89       * @throws JMSException
90       */
91      public void stop() throws JMSException {
92          if (started.commit(true, false)) {
93              if (session != null) {
94                  session.close();
95              }
96              synchronized (lock) {
97                  lock.notifyAll();
98              }
99          }
100     }
101 
102     /**
103      * Add a listener
104      * 
105      * @param l
106      */
107     public void addListener(ConnectionAdvisoryEventListener l) {
108         listeners.add(l);
109     }
110 
111     /**
112      * Remove a listener
113      * 
114      * @param l
115      */
116     public void removeListener(ConnectionAdvisoryEventListener l) {
117         listeners.remove(l);
118     }
119 
120     /**
121      * returns true if the connection is active
122      * 
123      * @param clientId for the connection
124      * @return true if the connection is active
125      */
126     public boolean isActive(String clientId) {
127         return activeConnections.containsKey(clientId);
128     }
129 
130     /**
131      * Retrive all current Connections
132      * 
133      * @return
134      */
135     public Set getConnections() {
136         Set set = new HashSet();
137         set.addAll(activeConnections.values());
138         return set;
139     }
140 
141     /**
142      * Waits until the number of active connections is equivalent to the number supplied, or the timeout is exceeded
143      * 
144      * @param number
145      * @param timeout
146      * @return the number of activeConnections
147      */
148     public int waitForActiveConnections(int number, long timeout) {
149         int result = 0;
150         // if timeInMillis is less than zero assume nowait
151         long waitTime = timeout;
152         long start = (timeout <= 0) ? 0 : System.currentTimeMillis();
153         synchronized (lock) {
154             while (started.get()) {
155                 result = numberOfActiveConnections();
156                 if (result == number || waitTime <= 0) {
157                     break;
158                 }
159                 else {
160                     try {
161                         lock.wait(waitTime);
162                     }
163                     catch (Throwable e) {
164                         log.debug("Interrupted", e);
165                         e.printStackTrace();
166                     }
167                     waitTime = timeout - (System.currentTimeMillis() - start);
168                 }
169             }
170         }
171         return result;
172     }
173 
174     /**
175      * return the current number of active connections
176      * 
177      * @return
178      */
179     public int numberOfActiveConnections() {
180         return activeConnections.size();
181     }
182 
183     /**
184      * OnMessage() implementation
185      * 
186      * @param msg
187      */
188     public void onMessage(Message msg) {
189         if (msg instanceof ObjectMessage) {
190             try {
191                 ConnectionInfo info = (ConnectionInfo) ((ObjectMessage) msg).getObject();
192                 
193                 ConnectionAdvisoryEvent event = new ConnectionAdvisoryEvent(info);
194                 if (!event.getInfo().isClosed()) {
195                     activeConnections.put(event.getInfo().getClientId(), event.getInfo());
196                 }
197                 else {
198                     activeConnections.remove(event.getInfo().getClientId());
199                 }
200                 synchronized (lock) {
201                     lock.notify();
202                 }
203                 fireEvent(event);
204             }
205             catch (Throwable e) {
206                 log.error("Failed to process message: " + msg);
207             }
208         }
209     }
210 
211     private void fireEvent(ConnectionAdvisoryEvent event) {
212         for (Iterator i = listeners.iterator();i.hasNext();) {
213             ConnectionAdvisoryEventListener l = (ConnectionAdvisoryEventListener) i.next();
214             l.onEvent(event);
215         }
216     }
217 }