Source code: org/activemq/advisories/ConnectionAdvisor.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq.advisories;
20 import java.util.HashMap;
21 import java.util.HashSet;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26
27 import javax.jms.Connection;
28 import javax.jms.Destination;
29 import javax.jms.JMSException;
30 import javax.jms.Message;
31 import javax.jms.MessageConsumer;
32 import javax.jms.MessageListener;
33 import javax.jms.ObjectMessage;
34 import javax.jms.Session;
35
36 import org.activemq.message.ActiveMQDestination;
37 import org.activemq.message.ActiveMQTopic;
38 import org.activemq.message.ConnectionInfo;
39 import org.apache.commons.logging.Log;
40 import org.apache.commons.logging.LogFactory;
41
42 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
43 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
44
45 /**
46 * A helper class for listening for MessageConnection advisories *
47 *
48 * @version $Revision: 1.1.1.1 $
49 */
50 public class ConnectionAdvisor implements MessageListener {
51 private static final Log log = LogFactory.getLog(ConnectionAdvisor.class);
52 private Connection connection;
53 private Session session;
54 private List listeners = new CopyOnWriteArrayList();
55 private Map activeConnections = new HashMap();
56 private SynchronizedBoolean started = new SynchronizedBoolean(false);
57 private Object lock = new Object();
58
59 /**
60 * Construct a ConnectionAdvisor
61 *
62 * @param connection
63 * @throws JMSException
64 */
65 public ConnectionAdvisor(Connection connection) throws JMSException {
66 this.connection = connection;
67 }
68
69 /**
70 * start listening for advisories
71 *
72 * @throws JMSException
73 */
74 public void start() throws JMSException {
75 if (started.commit(false, true)) {
76
77 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
78
79 String advisoryName = ActiveMQDestination.CONNECTION_ADVISORY_PREFIX;
80 Destination advisoryDestination = new ActiveMQTopic(advisoryName);
81 MessageConsumer consumer = session.createConsumer(advisoryDestination);
82 consumer.setMessageListener(this);
83 }
84 }
85
86 /**
87 * stop listening for advisories
88 *
89 * @throws JMSException
90 */
91 public void stop() throws JMSException {
92 if (started.commit(true, false)) {
93 if (session != null) {
94 session.close();
95 }
96 synchronized (lock) {
97 lock.notifyAll();
98 }
99 }
100 }
101
102 /**
103 * Add a listener
104 *
105 * @param l
106 */
107 public void addListener(ConnectionAdvisoryEventListener l) {
108 listeners.add(l);
109 }
110
111 /**
112 * Remove a listener
113 *
114 * @param l
115 */
116 public void removeListener(ConnectionAdvisoryEventListener l) {
117 listeners.remove(l);
118 }
119
120 /**
121 * returns true if the connection is active
122 *
123 * @param clientId for the connection
124 * @return true if the connection is active
125 */
126 public boolean isActive(String clientId) {
127 return activeConnections.containsKey(clientId);
128 }
129
130 /**
131 * Retrive all current Connections
132 *
133 * @return
134 */
135 public Set getConnections() {
136 Set set = new HashSet();
137 set.addAll(activeConnections.values());
138 return set;
139 }
140
141 /**
142 * Waits until the number of active connections is equivalent to the number supplied, or the timeout is exceeded
143 *
144 * @param number
145 * @param timeout
146 * @return the number of activeConnections
147 */
148 public int waitForActiveConnections(int number, long timeout) {
149 int result = 0;
150 // if timeInMillis is less than zero assume nowait
151 long waitTime = timeout;
152 long start = (timeout <= 0) ? 0 : System.currentTimeMillis();
153 synchronized (lock) {
154 while (started.get()) {
155 result = numberOfActiveConnections();
156 if (result == number || waitTime <= 0) {
157 break;
158 }
159 else {
160 try {
161 lock.wait(waitTime);
162 }
163 catch (Throwable e) {
164 log.debug("Interrupted", e);
165 e.printStackTrace();
166 }
167 waitTime = timeout - (System.currentTimeMillis() - start);
168 }
169 }
170 }
171 return result;
172 }
173
174 /**
175 * return the current number of active connections
176 *
177 * @return
178 */
179 public int numberOfActiveConnections() {
180 return activeConnections.size();
181 }
182
183 /**
184 * OnMessage() implementation
185 *
186 * @param msg
187 */
188 public void onMessage(Message msg) {
189 if (msg instanceof ObjectMessage) {
190 try {
191 ConnectionInfo info = (ConnectionInfo) ((ObjectMessage) msg).getObject();
192
193 ConnectionAdvisoryEvent event = new ConnectionAdvisoryEvent(info);
194 if (!event.getInfo().isClosed()) {
195 activeConnections.put(event.getInfo().getClientId(), event.getInfo());
196 }
197 else {
198 activeConnections.remove(event.getInfo().getClientId());
199 }
200 synchronized (lock) {
201 lock.notify();
202 }
203 fireEvent(event);
204 }
205 catch (Throwable e) {
206 log.error("Failed to process message: " + msg);
207 }
208 }
209 }
210
211 private void fireEvent(ConnectionAdvisoryEvent event) {
212 for (Iterator i = listeners.iterator();i.hasNext();) {
213 ConnectionAdvisoryEventListener l = (ConnectionAdvisoryEventListener) i.next();
214 l.onEvent(event);
215 }
216 }
217 }