Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/service/impl/DurableTopicMessageContainerManager.java


1   /**
2    * Copyright 2004 Protique Ltd
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License"); 
5    * you may not use this file except in compliance with the License. 
6    * You may obtain a copy of the License at 
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, 
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
13   * See the License for the specific language governing permissions and 
14   * limitations under the License. 
15   * 
16   **/
17  
18  package org.activemq.service.impl;
19  import java.util.Collections;
20  import java.util.HashMap;
21  import java.util.Iterator;
22  import java.util.Map;
23  import java.util.Set;
24  
25  import javax.jms.DeliveryMode;
26  import javax.jms.Destination;
27  import javax.jms.InvalidDestinationException;
28  import javax.jms.JMSException;
29  import javax.jms.Topic;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.activemq.DuplicateDurableSubscriptionException;
34  import org.activemq.broker.BrokerClient;
35  import org.activemq.filter.AndFilter;
36  import org.activemq.filter.DestinationMap;
37  import org.activemq.filter.Filter;
38  import org.activemq.filter.FilterFactory;
39  import org.activemq.filter.FilterFactoryImpl;
40  import org.activemq.filter.NoLocalFilter;
41  import org.activemq.message.ActiveMQDestination;
42  import org.activemq.message.ActiveMQMessage;
43  import org.activemq.message.ActiveMQTopic;
44  import org.activemq.message.ConsumerInfo;
45  import org.activemq.message.MessageAck;
46  import org.activemq.service.DeadLetterPolicy;
47  import org.activemq.service.Dispatcher;
48  import org.activemq.service.MessageContainer;
49  import org.activemq.service.RedeliveryPolicy;
50  import org.activemq.service.Subscription;
51  import org.activemq.service.SubscriptionContainer;
52  import org.activemq.service.TopicMessageContainer;
53  import org.activemq.service.TransactionManager;
54  import org.activemq.service.TransactionTask;
55  import org.activemq.store.PersistenceAdapter;
56  
57  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
58  
59  /**
60   * A default Broker used for Topic messages for durable consumers
61   * 
62   * @version $Revision: 1.1.1.1 $
63   */
64  public class DurableTopicMessageContainerManager extends MessageContainerManagerSupport {
65      private static final Log log = LogFactory.getLog(DurableTopicMessageContainerManager.class);
66      private PersistenceAdapter persistenceAdapter;
67      protected SubscriptionContainer subscriptionContainer;
68      protected FilterFactory filterFactory;
69      protected Map activeSubscriptions = new ConcurrentHashMap();
70      private DestinationMap destinationMap = new DestinationMap();
71      private ConcurrentHashMap durableSubscriptions = new ConcurrentHashMap();
72      
73      public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, RedeliveryPolicy redeliveryPolicy,DeadLetterPolicy deadLetterPolicy) {
74          this(persistenceAdapter, new DurableTopicSubscriptionContainerImpl(redeliveryPolicy,deadLetterPolicy), new FilterFactoryImpl(),
75                  new DispatcherImpl());
76      }
77  
78      public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter,
79              SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
80          super(dispatcher);
81          this.persistenceAdapter = persistenceAdapter;
82          this.subscriptionContainer = subscriptionContainer;
83          this.filterFactory = filterFactory;
84          try {
85              loadAllMessageContainers();
86          }
87          catch (JMSException e) {
88              log.error("Failed to load initial Topic Containers",e);
89          }
90      }
91  
92      public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
93          if (info.isDurableTopic()) {
94              if (log.isDebugEnabled()) {
95                  log.debug("Adding consumer: " + info);
96              }
97  
98              doAddMessageConsumer(client, info);
99          }
100     }
101 
102     public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
103         // we should not remove a durable topic subscription from the subscriptionContainer
104         // unless via the deleteSubscription() method
105         
106 //        subscriptionContainer.removeSubscription(info.getConsumerId());
107 //        subscription.clear();
108         
109         Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId());
110         if (sub != null) {
111             sub.setActive(false);
112             dispatcher.removeActiveSubscription(client, sub);
113         }
114     }
115 
116     /**
117      * Delete a durable subscriber
118      * 
119      * @param clientId
120      * @param subscriberName
121      * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
122      */
123     public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
124         
125         String consumerKey = ConsumerInfo.generateConsumerKey(clientId, subscriberName);        
126         Subscription sub = (Subscription) durableSubscriptions.remove(consumerKey);
127         if( sub!=null ) {
128             //only delete if not active
129             if (sub.isActive()) {
130                 throw new JMSException("The Consummer " + subscriberName + " is still active");
131             }
132             else {
133                 subscriptionContainer.removeSubscription(sub.getConsumerId());
134                 sub.clear();
135                 
136                 Set containers = destinationMap.get(sub.getDestination());
137                 for (Iterator iter = containers.iterator();iter.hasNext();) {
138                     TopicMessageContainer container = (TopicMessageContainer) iter.next();
139                     if (container instanceof DurableTopicMessageContainer) {
140                         ((DurableTopicMessageContainer) container).deleteSubscription(sub.getPersistentKey());
141                     }
142                 }
143                 
144             }
145         } else {
146             throw new InvalidDestinationException("The Consumer " + subscriberName + " does not exist for client: "
147                     + clientId);
148         }
149     }
150 
151     /**
152      * @param client
153      * @param message
154      * @throws javax.jms.JMSException
155      */
156     public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException {
157         ActiveMQDestination dest = (ActiveMQDestination) message.getJMSDestination();
158         if (dest != null && dest.isTopic() && message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT) {
159             // note that we still need to persist the message even if there are no matching
160             // subscribers as they may come along later
161             // plus we don't pre-load subscription information                
162             final MessageContainer container = getContainer(dest.getPhysicalName());
163             container.addMessage(message);
164             TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask() {
165                 public void execute() throws Throwable {
166                     doSendMessage(client, message, container);
167                 }
168             });
169         }
170     }
171 
172     /**
173      * @param client
174      * @param message
175      * @throws JMSException
176      */
177     private void doSendMessage(BrokerClient client, ActiveMQMessage message, MessageContainer container) throws JMSException {
178         Set matchingSubscriptions = subscriptionContainer.getSubscriptions(message.getJMSActiveMQDestination());
179         if (!matchingSubscriptions.isEmpty()) {
180             for (Iterator i = matchingSubscriptions.iterator();i.hasNext();) {
181                 Subscription sub = (Subscription) i.next();
182                 if (sub.isTarget(message)) {
183                     sub.addMessage(container, message);
184                 }
185             }
186             updateSendStats(client, message);
187         }        
188     }
189 
190     /**
191      * Returns an unmodifiable map, indexed by String name, of all the {@link javax.jms.Destination}
192      * objects used by non-broker consumers directly connected to this container
193      *
194      * @return
195      */
196     public Map getLocalDestinations() {
197         Map localDestinations = new HashMap();
198         for (Iterator iter = subscriptionContainer.subscriptionIterator(); iter.hasNext();) {
199             Subscription sub = (Subscription) iter.next();
200             if (sub.isLocalSubscription()) {
201                 final ActiveMQDestination dest = sub.getDestination();
202                 localDestinations.put(dest.getPhysicalName(), dest);
203             }
204         }
205         return Collections.unmodifiableMap(localDestinations);
206     }
207 
208     /**
209      * Acknowledge a message as being read and consumed byh the Consumer
210      * 
211      * @param client
212      * @param ack
213      * @throws javax.jms.JMSException
214      */
215     public void acknowledgeMessage(BrokerClient client, final MessageAck ack) throws JMSException {
216         if ( !ack.getDestination().isTopic() || !ack.isPersistent())
217             return;
218             
219         Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId());
220         if (sub == null) {
221             return;
222         }
223         
224         sub.messageConsumed(ack);
225     }
226 
227     /**
228      * poll or messages
229      * 
230      * @throws javax.jms.JMSException
231      */
232     public void poll() throws JMSException {
233         //do nothing
234     }
235 
236     // Implementation methods
237     //-------------------------------------------------------------------------
238     protected MessageContainer createContainer(String destinationName) throws JMSException {
239         TopicMessageContainer topicMessageContainer = 
240            new DurableTopicMessageContainer(this, persistenceAdapter.createTopicMessageStore(destinationName), destinationName);
241         destinationMap.put(new ActiveMQTopic(destinationName), topicMessageContainer);
242         return topicMessageContainer;
243     }
244 
245     protected Destination createDestination(String destinationName) {
246         return new ActiveMQTopic(destinationName);
247     }
248 
249     public boolean isConsumerActiveOnDestination(ActiveMQDestination dest) {
250         for (Iterator iter = activeSubscriptions.values().iterator();iter.hasNext();) {
251             Subscription subscription = (Subscription) iter.next();
252             if( subscription.getDestination().equals(dest) ) {
253                 return true;
254             }
255         }
256         return false;
257     }
258     
259     protected void doAddMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
260         boolean shouldRecover = false;
261         if (info.getConsumerName() != null && info.getClientId() != null) {
262             for (Iterator iter = activeSubscriptions.values().iterator();iter.hasNext();) {
263                 Subscription subscription = (Subscription) iter.next();
264                 if (subscription.isSameDurableSubscription(info)) {
265                     throw new DuplicateDurableSubscriptionException(info);
266                 }
267             }
268         }        
269         Subscription subscription = (Subscription) durableSubscriptions.get(info.getConsumerKey());  
270         //subscriptionContainer.getSubscription(info.getConsumerId());
271         if (subscription != null) {
272             //check the subscription hasn't changed
273             if (!equal(subscription.getDestination(), info.getDestination())
274                     || !equal(subscription.getSelector(), info.getSelector())) {
275                 subscriptionContainer.removeSubscription(info.getConsumerId());
276                 subscription.clear();
277                 subscription = subscriptionContainer.makeSubscription(dispatcher, client, info, createFilter(info));
278                 durableSubscriptions.put(info.getConsumerKey(), subscription);
279             }
280         }
281         else {
282             subscription = subscriptionContainer.makeSubscription(dispatcher, client,info, createFilter(info));
283             shouldRecover = true;
284             durableSubscriptions.put(info.getConsumerKey(), subscription);
285         }
286         subscription.setActiveConsumer(client,info);
287         activeSubscriptions.put(info.getConsumerId(), subscription);
288         dispatcher.addActiveSubscription(client, subscription);
289         
290         // load the container
291         getContainer(subscription.getDestination().getPhysicalName());
292         
293         Set containers = destinationMap.get(subscription.getDestination());
294         for (Iterator iter = containers.iterator();iter.hasNext();) {
295             TopicMessageContainer container = (TopicMessageContainer) iter.next();
296             if (container instanceof DurableTopicMessageContainer) {
297                 ((DurableTopicMessageContainer) container).storeSubscription(info, subscription);
298             }
299         }
300         if (shouldRecover) {
301             recoverSubscriptions(subscription);
302         }
303         // lets not make the subscription active until later
304         // as we can't start dispatching until we've sent back the receipt
305         // TODO we might wish to register a post-receipt action here
306         // to perform the wakeup
307         subscription.setActive(true);
308         //dispatcher.wakeup(subscription);
309     }
310 
311     /**
312      * Returns true if the two objects are null or are equal
313      */
314     protected final boolean equal(Object o1, Object o2) {
315         return o1 == o2 || (o1 != null && o1.equals(o2));
316     }
317 
318     /**
319      * This method is called when a new durable subscription is started and so we need to go through each matching
320      * message container and dispatch any matching messages that may be outstanding
321      * 
322      * @param subscription
323      */
324     protected void recoverSubscriptions(Subscription subscription) throws JMSException {
325         // we should load all of the message containers from disk if we're a wildcard
326         
327         getContainer(subscription.getDestination().getPhysicalName());
328         Set containers = destinationMap.get(subscription.getDestination());
329         for (Iterator iter = containers.iterator();iter.hasNext();) {
330             TopicMessageContainer container = (TopicMessageContainer) iter.next();
331             container.recoverSubscription(subscription);
332         }
333     }
334 
335     /**
336      * Called when recovering a wildcard subscription where we need to load all the durable message containers (for
337      * which we have any outstanding messages to deliver) into RAM
338      */
339     protected void loadAllMessageContainers() throws JMSException {
340         Map destinations = persistenceAdapter.getInitialDestinations();
341         if (destinations != null) {
342             for (Iterator iter = destinations.entrySet().iterator();iter.hasNext();) {
343                 Map.Entry entry = (Map.Entry) iter.next();
344                 String name = (String) entry.getKey();
345                 Destination destination = (Destination) entry.getValue();
346                 if ( destination instanceof Topic ) {
347                     loadContainer(name, destination);
348                 }
349             }
350         }
351     }
352 
353     /**
354      * Create filter for a Consumer
355      * 
356      * @param info
357      * @return the Fitler
358      * @throws javax.jms.JMSException
359      */
360     protected Filter createFilter(ConsumerInfo info) throws JMSException {
361         Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
362         if (info.isNoLocal()) {
363             filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
364         }
365         return filter;
366     }
367     
368     public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
369         // This container only does topics.
370         if(!dest.isTopic()) 
371             return;
372         super.createMessageContainer(dest);
373     }
374     
375     public synchronized void destroyMessageContainer(ActiveMQDestination dest) throws JMSException {
376         // This container only does topics.
377         if(!dest.isTopic()) 
378             return;
379         super.destroyMessageContainer(dest);
380         destinationMap.removeAll(dest);
381     }
382 }