Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/broker/impl/AdvisorySupport.java


1   /**
2    *
3    * Copyright 2004 Protique Ltd
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  
19  package org.activemq.broker.impl;
20  import java.io.Serializable;
21  import java.util.Iterator;
22  import java.util.Map;
23  import java.util.Set;
24  import javax.jms.DeliveryMode;
25  import javax.jms.JMSException;
26  import javax.jms.ObjectMessage;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.activemq.advisories.TempDestinationAdvisoryEvent;
30  import org.activemq.broker.BrokerClient;
31  import org.activemq.message.ActiveMQDestination;
32  import org.activemq.message.ActiveMQMessage;
33  import org.activemq.message.ActiveMQObjectMessage;
34  import org.activemq.message.ConnectionInfo;
35  import org.activemq.message.ConsumerInfo;
36  import org.activemq.message.Packet;
37  import org.activemq.message.ProducerInfo;
38  import org.activemq.util.IdGenerator;
39  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
40  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
41  
42  /**
43   * Manages advisory subscriptions and messages
44   * 
45   * @version $Revision: 1.1.1.1 $
46   */
47  class AdvisorySupport {
48      private static final Log log = LogFactory.getLog(AdvisorySupport.class);
49      private Set advisoryConsumers = new CopyOnWriteArraySet();
50      private Set consumers = new CopyOnWriteArraySet();
51      private Set producers = new CopyOnWriteArraySet();
52      private Set connections = new CopyOnWriteArraySet();
53      private IdGenerator idGen = new IdGenerator();
54      private Map tempDestinations = new ConcurrentHashMap();//client ids = keys, Set of TempDestinationAdvisoryEvents =
55      // values
56      private DefaultBroker broker;
57  
58      AdvisorySupport(DefaultBroker broker) {
59          this.broker = broker;
60      }
61  
62      /**
63       * Add an advisory Consumer
64       * 
65       * @param advisory
66       * @param sender
67       */
68      void addAdvisory(BrokerClient sender, ConsumerInfo advisory) {
69          if (advisory != null && advisory.isAdvisory()) {
70              advisoryConsumers.add(advisory);
71              for (Iterator i = consumers.iterator();i.hasNext();) {
72                  ConsumerInfo info = (ConsumerInfo) i.next();
73                  dispatchToTarget(sender, generateAdvisory(advisory, info));
74              }
75              for (Iterator i = producers.iterator();i.hasNext();) {
76                  ProducerInfo info = (ProducerInfo) i.next();
77                  dispatchToTarget(sender, generateAdvisory(advisory, info));
78              }
79              for (Iterator i = connections.iterator();i.hasNext();) {
80                  ConnectionInfo info = (ConnectionInfo) i.next();
81                  dispatchToTarget(sender, generateAdvisory(advisory, info));
82              }
83              for (Iterator i = tempDestinations.values().iterator();i.hasNext();) {
84                  Set set = (Set) i.next();
85                  for (Iterator si = set.iterator();si.hasNext();) {
86                      TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) si.next();
87                      dispatchToTarget(sender, generateAdvisory(advisory, event));
88                  }
89              }
90          }
91          addConsumer(sender, advisory);
92      }
93  
94      /**
95       * remove an advisory Consumer
96       * 
97       * @param sender
98       * @param info
99       */
100     void removeAdvisory(BrokerClient sender, ConsumerInfo info) {
101         advisoryConsumers.remove(info);
102         removeConsumer(sender, info);
103     }
104 
105     /**
106      * Add a Consumer
107      * 
108      * @param sender
109      * @param info
110      */
111     private void addConsumer(BrokerClient sender, ConsumerInfo info) {
112         consumers.remove(info);
113         consumers.add(info);
114         dispatchToBroker(sender, generateAdvisoryMessage(info, info.getDestination().getTopicForConsumerAdvisory()));
115     }
116 
117     /**
118      * Remove a Consumer
119      * 
120      * @param sender
121      * @param info
122      */
123     private void removeConsumer(BrokerClient sender, ConsumerInfo info) {
124         consumers.remove(info);
125         dispatchToBroker(sender, generateAdvisoryMessage(info, info.getDestination().getTopicForConsumerAdvisory()));
126     }
127 
128     /**
129      * Add a Producer
130      * 
131      * @param sender
132      * @param info
133      */
134     void addProducer(BrokerClient sender, ProducerInfo info) {
135         producers.remove(info);
136         producers.add(info);
137         if (info.getDestination() != null) {
138             dispatchToBroker(sender, generateAdvisoryMessage(info, info.getDestination().getTopicForProducerAdvisory()));
139         }
140     }
141 
142     /**
143      * Remove a Producer
144      * 
145      * @param sender
146      * @param info
147      */
148     void removeProducer(BrokerClient sender, ProducerInfo info) {
149         producers.remove(info);
150         if (info.getDestination() != null) {
151             dispatchToBroker(sender, generateAdvisoryMessage(info, info.getDestination().getTopicForProducerAdvisory()));
152         }
153     }
154 
155     /**
156      * Add a Connection
157      * 
158      * @param sender
159      * @param info
160      */
161     void addConnection(BrokerClient sender, ConnectionInfo info) {
162         connections.remove(info);
163         connections.add(info);
164         ActiveMQDestination dest = ActiveMQDestination.createDestination(ActiveMQDestination.ACTIVEMQ_TOPIC,
165                 ActiveMQDestination.CONNECTION_ADVISORY_PREFIX);
166         dispatchToBroker(sender, generateAdvisoryMessage(info, dest));
167     }
168 
169     /**
170      * Remove a Connection
171      * 
172      * @param sender
173      * @param info
174      */
175     void removeConnection(BrokerClient sender, ConnectionInfo info) {
176         connections.remove(info);
177         removeAllTempDestinations(sender, info.getClientId());
178         ActiveMQDestination dest = ActiveMQDestination.createDestination(ActiveMQDestination.ACTIVEMQ_TOPIC,
179                 ActiveMQDestination.CONNECTION_ADVISORY_PREFIX);
180         dispatchToBroker(sender, generateAdvisoryMessage(info, dest));
181     }
182 
183     /**
184      * @param sender
185      * @param message
186      * @throws JMSException
187      */
188     void processTempDestinationAdvisory(BrokerClient sender, ActiveMQMessage message) throws JMSException {
189         TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) ((ObjectMessage) message).getObject();
190         processTempDestinationAdvisory(event);
191     }
192 
193     /**
194      * @param advisory
195      * @param info
196      * @return an advisory message or null
197      */
198     private ActiveMQMessage generateAdvisory(ConsumerInfo advisory, ConsumerInfo info) {
199         if (matchConsumer(advisory, info)) {
200             return generateAdvisoryMessage(advisory, info, info.getDestination().getTopicForConsumerAdvisory());
201         }
202         return null;
203     }
204 
205     /**
206      * @param advisory
207      * @param info
208      * @return an advisory message or null
209      */
210     private ActiveMQMessage generateAdvisory(ConsumerInfo advisory, ProducerInfo info) {
211         if (matchProducer(advisory, info)) {
212             return generateAdvisoryMessage(advisory, info, info.getDestination().getTopicForProducerAdvisory());
213         }
214         return null;
215     }
216 
217     /**
218      * @param advisory
219      * @param info
220      * @return an advisory message or null
221      */
222     private ActiveMQMessage generateAdvisory(ConsumerInfo advisory, ConnectionInfo info) {
223         if (matchConnection(advisory, info)) {
224             String destName = advisory.getDestination().getPhysicalName();
225             ActiveMQDestination dest = ActiveMQDestination.createDestination(advisory.getDestination()
226                     .getDestinationType(), destName);
227             return generateAdvisoryMessage(advisory, info, dest);
228         }
229         return null;
230     }
231 
232     /**
233      * Generate a TempDestinationAdvisoryEvent if the advisory is a match
234      * 
235      * @param advisory
236      * @param event
237      * @return an advisory message or null
238      */
239     private ActiveMQMessage generateAdvisory(ConsumerInfo advisory, TempDestinationAdvisoryEvent event) {
240         if (matchTempDestinationAdvisory(advisory, event.getDestination())) {
241             return generateAdvisoryMessage(advisory, event, event.getDestination().getTopicForTempAdvisory());
242         }
243         return null;
244     }
245 
246     boolean matchConsumer(ConsumerInfo advisory, ConsumerInfo info) {
247         boolean result = false;
248         if (advisory != null && advisory.getDestination() != null && info != null && info.getDestination() != null) {
249             ActiveMQDestination advisoryDestination = advisory.getDestination();
250             ActiveMQDestination destination = info.getDestination();
251             if (advisoryDestination.isConsumerAdvisory()) {
252                 ActiveMQDestination match = advisoryDestination.getDestinationBeingAdvised();
253                 return match.matches(destination) || matchGeneralAdvisory(advisory, destination);
254             }
255         }
256         return result;
257     }
258 
259     boolean matchProducer(ConsumerInfo advisory, ProducerInfo info) {
260         boolean result = false;
261         if (advisory != null && advisory.getDestination() != null && info != null && info.getDestination() != null) {
262             ActiveMQDestination advisoryDestination = advisory.getDestination();
263             ActiveMQDestination destination = info.getDestination();
264             if (advisoryDestination.isProducerAdvisory()) {
265                 ActiveMQDestination match = advisoryDestination.getDestinationBeingAdvised();
266                 return match.matches(destination) || matchGeneralAdvisory(advisory, destination);
267             }
268         }
269         return result;
270     }
271 
272     boolean matchConnection(ConsumerInfo advisory, ConnectionInfo info) {
273         boolean result = false;
274         if (advisory != null && advisory.getDestination() != null && info != null) {
275             result = (advisory.getDestination().isConnectionAdvisory() && advisory.getDestination().matches(
276                     ActiveMQDestination.createDestination(advisory.getDestination().getDestinationType(),
277                             ActiveMQDestination.CONNECTION_ADVISORY_PREFIX)))
278                     || matchGeneralAdvisory(advisory, advisory.getDestination());
279         }
280         return result;
281     }
282 
283     /**
284      * A consumer could listen for all advisories
285      * 
286      * @param advisory
287      * @param destination
288      * @return true if a general 'catch-all' advisory subscriber
289      */
290     private boolean matchGeneralAdvisory(ConsumerInfo advisory, ActiveMQDestination destination) {
291         boolean result = advisory.getDestination() != null && advisory.getDestination().isAdvisory();
292         if (result) {
293             ActiveMQDestination match = advisory.getDestination().getDestinationBeingAdvised();
294             result = match != null && match.matches(destination);
295         }
296         return result;
297     }
298 
299     boolean matchTempDestinationAdvisory(ConsumerInfo advisory, ActiveMQDestination destination) {
300         boolean result = false;
301         if (advisory != null && advisory.getDestination() != null) {
302             ActiveMQDestination advisoryDestination = advisory.getDestination();
303             if (advisoryDestination.isTempDestinationAdvisory()) {
304                 ActiveMQDestination match = advisoryDestination.getDestinationBeingAdvised();
305                 return match.matches(destination) || matchGeneralAdvisory(advisory, destination);
306             }
307         }
308         return result;
309     }
310 
311     private void processTempDestinationAdvisory(TempDestinationAdvisoryEvent event) {
312         String clientId = ActiveMQDestination.getClientId(event.getDestination());
313         Set set = (Set) tempDestinations.get(clientId);
314         if (event.isStarted()) {
315             if (set == null) {
316                 set = new CopyOnWriteArraySet();
317                 tempDestinations.put(clientId, set);
318             }
319             set.add(event);
320         }
321         else {
322             if (set != null) {
323                 set.remove(event);
324                 if (set.isEmpty()) {
325                     tempDestinations.remove(clientId);
326                 }
327             }
328         }
329     }
330 
331     private void removeAllTempDestinations(BrokerClient sender, String clientId) {
332         Set set = (Set) tempDestinations.remove(clientId);
333         if (set != null) {
334             for (Iterator i = set.iterator();i.hasNext();) {
335                 TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) i.next();
336                 event.setStarted(false);
337                 processTempDestinationAdvisory(event);
338                 for (Iterator it = advisoryConsumers.iterator();it.hasNext();) {
339                     ConsumerInfo advisory = (ConsumerInfo) it.next();
340                     dispatchToTarget(sender, generateAdvisory(advisory, event));
341                 }
342             }
343         }
344     }
345 
346     /**
347      * Generate an advisory message
348      * 
349      * @param payload
350      * @param destination
351      * @return create ActiveMQMessage
352      */
353     private ActiveMQMessage generateAdvisoryMessage(Packet payload, ActiveMQDestination destination) {
354         return generateAdvisoryMessage(null, payload, destination);
355     }
356 
357     /**
358      * Generate an advisory message
359      * 
360      * @param advisoryTarget
361      * @param payload
362      * @param destination
363      * @return create ActiveMQMessage
364      */
365     private ActiveMQMessage generateAdvisoryMessage(final ConsumerInfo advisoryTarget, final Packet payload,
366             final ActiveMQDestination destination) {
367         ActiveMQObjectMessage advisoryMsg = null;
368         try {
369             advisoryMsg = new ActiveMQObjectMessage();
370             advisoryMsg.setJMSMessageID(idGen.generateId());
371             advisoryMsg.setJMSDestination(destination);
372             advisoryMsg.setExternalMessageId(true);
373             advisoryMsg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
374             advisoryMsg.setObject((Serializable) payload);
375             if (advisoryTarget != null) {
376                 advisoryMsg.setConsumerNos(new int[]{advisoryTarget.getConsumerNo()});
377             }
378         }
379         catch (JMSException e) {
380             advisoryMsg = null;
381             log.warn("caught an exception generating an advisory", e);
382         }
383         return advisoryMsg;
384     }
385 
386     private void dispatchToTarget(BrokerClient target, ActiveMQMessage message) {
387         if (target != null && message != null) {
388             target.dispatch(message);
389         }
390     }
391 
392     private void dispatchToBroker(BrokerClient sender, ActiveMQMessage message) {
393         if (sender != null && message != null) {
394             try {
395                 broker.sendMessage(sender, message);
396             }
397             catch (JMSException e) {
398                 log.warn("caught an exception sending an advisory", e);
399             }
400         }
401     }
402 }