Source code: org/activemq/service/impl/TransientTopicMessageContainerManager.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.service.impl;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.activemq.broker.BrokerClient;
23 import org.activemq.filter.FilterFactory;
24 import org.activemq.filter.FilterFactoryImpl;
25 import org.activemq.message.ActiveMQDestination;
26 import org.activemq.message.ActiveMQMessage;
27 import org.activemq.message.ConsumerInfo;
28 import org.activemq.service.Dispatcher;
29 import org.activemq.service.MessageContainer;
30 import org.activemq.service.Subscription;
31 import org.activemq.service.SubscriptionContainer;
32 import org.activemq.service.RedeliveryPolicy;
33 import org.activemq.service.DeadLetterPolicy;
34 import org.activemq.service.TransactionManager;
35 import org.activemq.service.TransactionTask;
36 import org.activemq.store.PersistenceAdapter;
37
38 import javax.jms.DeliveryMode;
39 import javax.jms.JMSException;
40 import java.util.Iterator;
41 import java.util.Set;
42
43 /**
44 * A default implementation of a Broker of Topic messages for transient consumers
45 *
46 * @version $Revision: 1.1.1.1 $
47 */
48 public class TransientTopicMessageContainerManager extends DurableTopicMessageContainerManager {
49 private static final Log log = LogFactory.getLog(TransientTopicMessageContainerManager.class);
50
51 public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter) {
52 this(persistenceAdapter, new SubscriptionContainerImpl(new RedeliveryPolicy(), new DeadLetterPolicy()), new FilterFactoryImpl(), new DispatcherImpl());
53 }
54
55 public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
56 super(persistenceAdapter, subscriptionContainer, filterFactory, dispatcher);
57 }
58
59 /**
60 * @param client
61 * @param info
62 * @throws javax.jms.JMSException
63 */
64 public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
65 if (info.getDestination().isTopic()) {
66 doAddMessageConsumer(client, info);
67 }
68 }
69
70
71 /**
72 * @param client
73 * @param info
74 * @throws javax.jms.JMSException
75 */
76 public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
77 Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId());
78 if (sub != null) {
79 sub.setActive(false);
80 dispatcher.removeActiveSubscription(client, sub);
81 subscriptionContainer.removeSubscription(info.getConsumerId());
82 sub.clear();
83 }
84 }
85
86
87 /**
88 * @param client
89 * @param message
90 * @throws javax.jms.JMSException
91 */
92 public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException {
93 final ActiveMQDestination destination = message.getJMSActiveMQDestination();
94 if (destination == null || !destination.isTopic()) {
95 return;
96 }
97
98 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){
99 public void execute() throws Throwable {
100 doSendMessage(client, message, destination);
101 }
102 });
103
104 }
105
106 /**
107 * @param client
108 * @param message
109 * @param destination
110 * @throws JMSException
111 */
112 private void doSendMessage(BrokerClient client, ActiveMQMessage message, ActiveMQDestination destination) throws JMSException {
113 MessageContainer container = null;
114 if (log.isDebugEnabled()) {
115 log.debug("Dispaching to " + subscriptionContainer + " subscriptions with message: " + message);
116 }
117 Set subscriptions = subscriptionContainer.getSubscriptions(destination);
118 for (Iterator i = subscriptions.iterator(); i.hasNext();) {
119 Subscription sub = (Subscription) i.next();
120 if (sub.isTarget(message) && (!sub.isDurableTopic() || message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT)) {
121 if (container == null) {
122 container = getContainer(message.getJMSDestination().toString());
123 container.addMessage(message);
124 }
125 sub.addMessage(container, message);
126 }
127 }
128 updateSendStats(client, message);
129 }
130
131 /**
132 * Delete a durable subscriber
133 *
134 * @param clientId
135 * @param subscriberName
136 * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
137 */
138 public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
139 }
140 }