Source code: org/activemq/broker/impl/AdvisorySupport.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq.broker.impl;
20 import java.io.Serializable;
21 import java.util.Iterator;
22 import java.util.Map;
23 import java.util.Set;
24 import javax.jms.DeliveryMode;
25 import javax.jms.JMSException;
26 import javax.jms.ObjectMessage;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.activemq.advisories.TempDestinationAdvisoryEvent;
30 import org.activemq.broker.BrokerClient;
31 import org.activemq.message.ActiveMQDestination;
32 import org.activemq.message.ActiveMQMessage;
33 import org.activemq.message.ActiveMQObjectMessage;
34 import org.activemq.message.ConnectionInfo;
35 import org.activemq.message.ConsumerInfo;
36 import org.activemq.message.Packet;
37 import org.activemq.message.ProducerInfo;
38 import org.activemq.util.IdGenerator;
39 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
40 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
41
42 /**
43 * Manages advisory subscriptions and messages
44 *
45 * @version $Revision: 1.1.1.1 $
46 */
47 class AdvisorySupport {
48 private static final Log log = LogFactory.getLog(AdvisorySupport.class);
49 private Set advisoryConsumers = new CopyOnWriteArraySet();
50 private Set consumers = new CopyOnWriteArraySet();
51 private Set producers = new CopyOnWriteArraySet();
52 private Set connections = new CopyOnWriteArraySet();
53 private IdGenerator idGen = new IdGenerator();
54 private Map tempDestinations = new ConcurrentHashMap();//client ids = keys, Set of TempDestinationAdvisoryEvents =
55 // values
56 private DefaultBroker broker;
57
58 AdvisorySupport(DefaultBroker broker) {
59 this.broker = broker;
60 }
61
62 /**
63 * Add an advisory Consumer
64 *
65 * @param advisory
66 * @param sender
67 */
68 void addAdvisory(BrokerClient sender, ConsumerInfo advisory) {
69 if (advisory != null && advisory.isAdvisory()) {
70 advisoryConsumers.add(advisory);
71 for (Iterator i = consumers.iterator();i.hasNext();) {
72 ConsumerInfo info = (ConsumerInfo) i.next();
73 dispatchToTarget(sender, generateAdvisory(advisory, info));
74 }
75 for (Iterator i = producers.iterator();i.hasNext();) {
76 ProducerInfo info = (ProducerInfo) i.next();
77 dispatchToTarget(sender, generateAdvisory(advisory, info));
78 }
79 for (Iterator i = connections.iterator();i.hasNext();) {
80 ConnectionInfo info = (ConnectionInfo) i.next();
81 dispatchToTarget(sender, generateAdvisory(advisory, info));
82 }
83 for (Iterator i = tempDestinations.values().iterator();i.hasNext();) {
84 Set set = (Set) i.next();
85 for (Iterator si = set.iterator();si.hasNext();) {
86 TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) si.next();
87 dispatchToTarget(sender, generateAdvisory(advisory, event));
88 }
89 }
90 }
91 addConsumer(sender, advisory);
92 }
93
94 /**
95 * remove an advisory Consumer
96 *
97 * @param sender
98 * @param info
99 */
100 void removeAdvisory(BrokerClient sender, ConsumerInfo info) {
101 advisoryConsumers.remove(info);
102 removeConsumer(sender, info);
103 }
104
105 /**
106 * Add a Consumer
107 *
108 * @param sender
109 * @param info
110 */
111 private void addConsumer(BrokerClient sender, ConsumerInfo info) {
112 consumers.remove(info);
113 consumers.add(info);
114 dispatchToBroker(sender, generateAdvisoryMessage(info, info.getDestination().getTopicForConsumerAdvisory()));
115 }
116
117 /**
118 * Remove a Consumer
119 *
120 * @param sender
121 * @param info
122 */
123 private void removeConsumer(BrokerClient sender, ConsumerInfo info) {
124 consumers.remove(info);
125 dispatchToBroker(sender, generateAdvisoryMessage(info, info.getDestination().getTopicForConsumerAdvisory()));
126 }
127
128 /**
129 * Add a Producer
130 *
131 * @param sender
132 * @param info
133 */
134 void addProducer(BrokerClient sender, ProducerInfo info) {
135 producers.remove(info);
136 producers.add(info);
137 if (info.getDestination() != null) {
138 dispatchToBroker(sender, generateAdvisoryMessage(info, info.getDestination().getTopicForProducerAdvisory()));
139 }
140 }
141
142 /**
143 * Remove a Producer
144 *
145 * @param sender
146 * @param info
147 */
148 void removeProducer(BrokerClient sender, ProducerInfo info) {
149 producers.remove(info);
150 if (info.getDestination() != null) {
151 dispatchToBroker(sender, generateAdvisoryMessage(info, info.getDestination().getTopicForProducerAdvisory()));
152 }
153 }
154
155 /**
156 * Add a Connection
157 *
158 * @param sender
159 * @param info
160 */
161 void addConnection(BrokerClient sender, ConnectionInfo info) {
162 connections.remove(info);
163 connections.add(info);
164 ActiveMQDestination dest = ActiveMQDestination.createDestination(ActiveMQDestination.ACTIVEMQ_TOPIC,
165 ActiveMQDestination.CONNECTION_ADVISORY_PREFIX);
166 dispatchToBroker(sender, generateAdvisoryMessage(info, dest));
167 }
168
169 /**
170 * Remove a Connection
171 *
172 * @param sender
173 * @param info
174 */
175 void removeConnection(BrokerClient sender, ConnectionInfo info) {
176 connections.remove(info);
177 removeAllTempDestinations(sender, info.getClientId());
178 ActiveMQDestination dest = ActiveMQDestination.createDestination(ActiveMQDestination.ACTIVEMQ_TOPIC,
179 ActiveMQDestination.CONNECTION_ADVISORY_PREFIX);
180 dispatchToBroker(sender, generateAdvisoryMessage(info, dest));
181 }
182
183 /**
184 * @param sender
185 * @param message
186 * @throws JMSException
187 */
188 void processTempDestinationAdvisory(BrokerClient sender, ActiveMQMessage message) throws JMSException {
189 TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) ((ObjectMessage) message).getObject();
190 processTempDestinationAdvisory(event);
191 }
192
193 /**
194 * @param advisory
195 * @param info
196 * @return an advisory message or null
197 */
198 private ActiveMQMessage generateAdvisory(ConsumerInfo advisory, ConsumerInfo info) {
199 if (matchConsumer(advisory, info)) {
200 return generateAdvisoryMessage(advisory, info, info.getDestination().getTopicForConsumerAdvisory());
201 }
202 return null;
203 }
204
205 /**
206 * @param advisory
207 * @param info
208 * @return an advisory message or null
209 */
210 private ActiveMQMessage generateAdvisory(ConsumerInfo advisory, ProducerInfo info) {
211 if (matchProducer(advisory, info)) {
212 return generateAdvisoryMessage(advisory, info, info.getDestination().getTopicForProducerAdvisory());
213 }
214 return null;
215 }
216
217 /**
218 * @param advisory
219 * @param info
220 * @return an advisory message or null
221 */
222 private ActiveMQMessage generateAdvisory(ConsumerInfo advisory, ConnectionInfo info) {
223 if (matchConnection(advisory, info)) {
224 String destName = advisory.getDestination().getPhysicalName();
225 ActiveMQDestination dest = ActiveMQDestination.createDestination(advisory.getDestination()
226 .getDestinationType(), destName);
227 return generateAdvisoryMessage(advisory, info, dest);
228 }
229 return null;
230 }
231
232 /**
233 * Generate a TempDestinationAdvisoryEvent if the advisory is a match
234 *
235 * @param advisory
236 * @param event
237 * @return an advisory message or null
238 */
239 private ActiveMQMessage generateAdvisory(ConsumerInfo advisory, TempDestinationAdvisoryEvent event) {
240 if (matchTempDestinationAdvisory(advisory, event.getDestination())) {
241 return generateAdvisoryMessage(advisory, event, event.getDestination().getTopicForTempAdvisory());
242 }
243 return null;
244 }
245
246 boolean matchConsumer(ConsumerInfo advisory, ConsumerInfo info) {
247 boolean result = false;
248 if (advisory != null && advisory.getDestination() != null && info != null && info.getDestination() != null) {
249 ActiveMQDestination advisoryDestination = advisory.getDestination();
250 ActiveMQDestination destination = info.getDestination();
251 if (advisoryDestination.isConsumerAdvisory()) {
252 ActiveMQDestination match = advisoryDestination.getDestinationBeingAdvised();
253 return match.matches(destination) || matchGeneralAdvisory(advisory, destination);
254 }
255 }
256 return result;
257 }
258
259 boolean matchProducer(ConsumerInfo advisory, ProducerInfo info) {
260 boolean result = false;
261 if (advisory != null && advisory.getDestination() != null && info != null && info.getDestination() != null) {
262 ActiveMQDestination advisoryDestination = advisory.getDestination();
263 ActiveMQDestination destination = info.getDestination();
264 if (advisoryDestination.isProducerAdvisory()) {
265 ActiveMQDestination match = advisoryDestination.getDestinationBeingAdvised();
266 return match.matches(destination) || matchGeneralAdvisory(advisory, destination);
267 }
268 }
269 return result;
270 }
271
272 boolean matchConnection(ConsumerInfo advisory, ConnectionInfo info) {
273 boolean result = false;
274 if (advisory != null && advisory.getDestination() != null && info != null) {
275 result = (advisory.getDestination().isConnectionAdvisory() && advisory.getDestination().matches(
276 ActiveMQDestination.createDestination(advisory.getDestination().getDestinationType(),
277 ActiveMQDestination.CONNECTION_ADVISORY_PREFIX)))
278 || matchGeneralAdvisory(advisory, advisory.getDestination());
279 }
280 return result;
281 }
282
283 /**
284 * A consumer could listen for all advisories
285 *
286 * @param advisory
287 * @param destination
288 * @return true if a general 'catch-all' advisory subscriber
289 */
290 private boolean matchGeneralAdvisory(ConsumerInfo advisory, ActiveMQDestination destination) {
291 boolean result = advisory.getDestination() != null && advisory.getDestination().isAdvisory();
292 if (result) {
293 ActiveMQDestination match = advisory.getDestination().getDestinationBeingAdvised();
294 result = match != null && match.matches(destination);
295 }
296 return result;
297 }
298
299 boolean matchTempDestinationAdvisory(ConsumerInfo advisory, ActiveMQDestination destination) {
300 boolean result = false;
301 if (advisory != null && advisory.getDestination() != null) {
302 ActiveMQDestination advisoryDestination = advisory.getDestination();
303 if (advisoryDestination.isTempDestinationAdvisory()) {
304 ActiveMQDestination match = advisoryDestination.getDestinationBeingAdvised();
305 return match.matches(destination) || matchGeneralAdvisory(advisory, destination);
306 }
307 }
308 return result;
309 }
310
311 private void processTempDestinationAdvisory(TempDestinationAdvisoryEvent event) {
312 String clientId = ActiveMQDestination.getClientId(event.getDestination());
313 Set set = (Set) tempDestinations.get(clientId);
314 if (event.isStarted()) {
315 if (set == null) {
316 set = new CopyOnWriteArraySet();
317 tempDestinations.put(clientId, set);
318 }
319 set.add(event);
320 }
321 else {
322 if (set != null) {
323 set.remove(event);
324 if (set.isEmpty()) {
325 tempDestinations.remove(clientId);
326 }
327 }
328 }
329 }
330
331 private void removeAllTempDestinations(BrokerClient sender, String clientId) {
332 Set set = (Set) tempDestinations.remove(clientId);
333 if (set != null) {
334 for (Iterator i = set.iterator();i.hasNext();) {
335 TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) i.next();
336 event.setStarted(false);
337 processTempDestinationAdvisory(event);
338 for (Iterator it = advisoryConsumers.iterator();it.hasNext();) {
339 ConsumerInfo advisory = (ConsumerInfo) it.next();
340 dispatchToTarget(sender, generateAdvisory(advisory, event));
341 }
342 }
343 }
344 }
345
346 /**
347 * Generate an advisory message
348 *
349 * @param payload
350 * @param destination
351 * @return create ActiveMQMessage
352 */
353 private ActiveMQMessage generateAdvisoryMessage(Packet payload, ActiveMQDestination destination) {
354 return generateAdvisoryMessage(null, payload, destination);
355 }
356
357 /**
358 * Generate an advisory message
359 *
360 * @param advisoryTarget
361 * @param payload
362 * @param destination
363 * @return create ActiveMQMessage
364 */
365 private ActiveMQMessage generateAdvisoryMessage(final ConsumerInfo advisoryTarget, final Packet payload,
366 final ActiveMQDestination destination) {
367 ActiveMQObjectMessage advisoryMsg = null;
368 try {
369 advisoryMsg = new ActiveMQObjectMessage();
370 advisoryMsg.setJMSMessageID(idGen.generateId());
371 advisoryMsg.setJMSDestination(destination);
372 advisoryMsg.setExternalMessageId(true);
373 advisoryMsg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
374 advisoryMsg.setObject((Serializable) payload);
375 if (advisoryTarget != null) {
376 advisoryMsg.setConsumerNos(new int[]{advisoryTarget.getConsumerNo()});
377 }
378 }
379 catch (JMSException e) {
380 advisoryMsg = null;
381 log.warn("caught an exception generating an advisory", e);
382 }
383 return advisoryMsg;
384 }
385
386 private void dispatchToTarget(BrokerClient target, ActiveMQMessage message) {
387 if (target != null && message != null) {
388 target.dispatch(message);
389 }
390 }
391
392 private void dispatchToBroker(BrokerClient sender, ActiveMQMessage message) {
393 if (sender != null && message != null) {
394 try {
395 broker.sendMessage(sender, message);
396 }
397 catch (JMSException e) {
398 log.warn("caught an exception sending an advisory", e);
399 }
400 }
401 }
402 }