Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/advisories/ConsumerAdvisor.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq.advisories;
20  import java.util.Iterator;
21  import java.util.List;
22  import java.util.Map;
23  import java.util.Set;
24  import javax.jms.Connection;
25  import javax.jms.Destination;
26  import javax.jms.JMSException;
27  import javax.jms.Message;
28  import javax.jms.MessageConsumer;
29  import javax.jms.MessageListener;
30  import javax.jms.ObjectMessage;
31  import javax.jms.Session;
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.activemq.message.ActiveMQDestination;
35  import org.activemq.message.ConsumerInfo;
36  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
37  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
38  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
39  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
40  
41  /**
42   * A helper class for listening for MessageConsumer advisories
43   * 
44   * @version $Revision: 1.1.1.1 $
45   */
46  public class ConsumerAdvisor implements MessageListener {
47      private static final Log log = LogFactory.getLog(ConsumerAdvisor.class);
48      private Connection connection;
49      private ActiveMQDestination destination;
50      private Session session;
51      private List listeners = new CopyOnWriteArrayList();
52      private SynchronizedBoolean started = new SynchronizedBoolean(false);
53      private Map activeSubscribers = new ConcurrentHashMap();
54  
55      /**
56       * Construct a ConsumerAdvisor
57       * 
58       * @param connection
59       * @param destination the destination to listen for Consumer events
60       * @throws JMSException
61       */
62      public ConsumerAdvisor(Connection connection, Destination destination) throws JMSException {
63          this.connection = connection;
64          this.destination = ActiveMQDestination.transformDestination(destination);
65      }
66  
67      /**
68       * start listening for advisories
69       * 
70       * @throws JMSException
71       */
72      public void start() throws JMSException {
73          if (started.commit(false, true)) {
74              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
75              MessageConsumer consumer = session.createConsumer(destination.getTopicForConsumerAdvisory());
76              consumer.setMessageListener(this);
77          }
78      }
79  
80      /**
81       * stop listening for advisories
82       * 
83       * @throws JMSException
84       */
85      public void stop() throws JMSException {
86          if (started.commit(true, false)) {
87              if (session != null) {
88                  session.close();
89              }
90          }
91      }
92  
93      /**
94       * Add a listener
95       * 
96       * @param l
97       */
98      public void addListener(ConsumerAdvisoryEventListener l) {
99          listeners.add(l);
100     }
101 
102     /**
103      * Remove a listener
104      * 
105      * @param l
106      */
107     public void removeListener(ConsumerAdvisoryEventListener l) {
108         listeners.remove(l);
109     }
110 
111     /**
112      * returns true if there is an active subscriber for the destination
113      * 
114      * @param destination
115      * @return true if a subscriber for the destination
116      */
117     public boolean isActive(Destination destination) {
118         return activeSubscribers.containsKey(destination);
119     }
120 
121     /**
122      * return a set of active ConsumerInfo's for a particular destination
123      * @param destination
124      * @return the set of currently active ConsumerInfo objects
125      */
126     public Set activeConsumers(Destination destination) {
127         Set set = (Set) activeSubscribers.get(destination);
128         return set != null ? set : new CopyOnWriteArraySet();
129     }
130 
131     /**
132      * OnMessage() implementation
133      * 
134      * @param msg
135      */
136     public void onMessage(Message msg) {
137         if (msg instanceof ObjectMessage) {
138             try {
139                 ConsumerInfo info = (ConsumerInfo) ((ObjectMessage) msg).getObject();
140                 updateActiveConsumers(info);
141                 ConsumerAdvisoryEvent event = new ConsumerAdvisoryEvent(info);
142                 fireEvent(event);
143             }
144             catch (JMSException e) {
145                 log.error("Failed to process message: " + msg);
146             }
147         }
148     }
149 
150     private void fireEvent(ConsumerAdvisoryEvent event) {
151         for (Iterator i = listeners.iterator();i.hasNext();) {
152             ConsumerAdvisoryEventListener l = (ConsumerAdvisoryEventListener) i.next();
153             l.onEvent(event);
154         }
155     }
156 
157     private void updateActiveConsumers(ConsumerInfo info) {
158         Set set = (Set) activeSubscribers.get(info.getDestination());
159         if (info.isStarted()) {
160             if (set == null) {
161                 set = new CopyOnWriteArraySet();
162                 activeSubscribers.put(info.getDestination(), set);
163             }
164             set.add(info);
165         }
166         else {
167             if (set != null) {
168                 set.remove(info);
169                 if (set.isEmpty()) {
170                     activeSubscribers.remove(info.getDestination());
171                 }
172             }
173         }
174     }
175 }