Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/service/impl/TransientTopicMessageContainerManager.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq.service.impl;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.activemq.broker.BrokerClient;
23  import org.activemq.filter.FilterFactory;
24  import org.activemq.filter.FilterFactoryImpl;
25  import org.activemq.message.ActiveMQDestination;
26  import org.activemq.message.ActiveMQMessage;
27  import org.activemq.message.ConsumerInfo;
28  import org.activemq.service.Dispatcher;
29  import org.activemq.service.MessageContainer;
30  import org.activemq.service.Subscription;
31  import org.activemq.service.SubscriptionContainer;
32  import org.activemq.service.RedeliveryPolicy;
33  import org.activemq.service.DeadLetterPolicy;
34  import org.activemq.service.TransactionManager;
35  import org.activemq.service.TransactionTask;
36  import org.activemq.store.PersistenceAdapter;
37  
38  import javax.jms.DeliveryMode;
39  import javax.jms.JMSException;
40  import java.util.Iterator;
41  import java.util.Set;
42  
43  /**
44   * A default implementation of a Broker of Topic messages for transient consumers
45   *
46   * @version $Revision: 1.1.1.1 $
47   */
48  public class TransientTopicMessageContainerManager extends DurableTopicMessageContainerManager {
49      private static final Log log = LogFactory.getLog(TransientTopicMessageContainerManager.class);
50  
51      public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter) {
52          this(persistenceAdapter, new SubscriptionContainerImpl(new RedeliveryPolicy(), new DeadLetterPolicy()), new FilterFactoryImpl(), new DispatcherImpl());
53      }
54  
55      public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
56          super(persistenceAdapter, subscriptionContainer, filterFactory, dispatcher);
57      }
58  
59      /**
60       * @param client
61       * @param info
62       * @throws javax.jms.JMSException
63       */
64      public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
65          if (info.getDestination().isTopic()) {
66              doAddMessageConsumer(client, info);
67          }
68      }
69  
70  
71      /**
72       * @param client
73       * @param info
74       * @throws javax.jms.JMSException
75       */
76      public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
77          Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId());
78          if (sub != null) {
79              sub.setActive(false);
80              dispatcher.removeActiveSubscription(client, sub);
81              subscriptionContainer.removeSubscription(info.getConsumerId());
82              sub.clear();
83          }
84      }
85  
86  
87      /**
88       * @param client
89       * @param message
90       * @throws javax.jms.JMSException
91       */
92      public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException {
93          final ActiveMQDestination destination = message.getJMSActiveMQDestination();
94          if (destination == null || !destination.isTopic()) {
95              return;
96          }
97          
98          TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){
99              public void execute() throws Throwable {
100                 doSendMessage(client, message, destination);
101             }
102         });      
103         
104     }
105 
106     /**
107      * @param client
108      * @param message
109      * @param destination
110      * @throws JMSException
111      */
112     private void doSendMessage(BrokerClient client, ActiveMQMessage message, ActiveMQDestination destination) throws JMSException {
113         MessageContainer container = null;
114         if (log.isDebugEnabled()) {
115             log.debug("Dispaching to " + subscriptionContainer + " subscriptions with message: " + message);
116         }
117         Set subscriptions = subscriptionContainer.getSubscriptions(destination);
118         for (Iterator i = subscriptions.iterator(); i.hasNext();) {
119             Subscription sub = (Subscription) i.next();
120             if (sub.isTarget(message) && (!sub.isDurableTopic() || message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT)) {
121                 if (container == null) {
122                     container = getContainer(message.getJMSDestination().toString());
123                     container.addMessage(message);
124                 }
125                 sub.addMessage(container, message);
126             }
127         }
128         updateSendStats(client, message);
129     }
130 
131     /**
132      * Delete a durable subscriber
133      *
134      * @param clientId
135      * @param subscriberName
136      * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
137      */
138     public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
139     }
140 }