Source code: org/activemq/service/impl/DurableTopicMessageContainerManager.java
1 /**
2 * Copyright 2004 Protique Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 **/
17
18 package org.activemq.service.impl;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.Iterator;
22 import java.util.Map;
23 import java.util.Set;
24
25 import javax.jms.DeliveryMode;
26 import javax.jms.Destination;
27 import javax.jms.InvalidDestinationException;
28 import javax.jms.JMSException;
29 import javax.jms.Topic;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.activemq.DuplicateDurableSubscriptionException;
34 import org.activemq.broker.BrokerClient;
35 import org.activemq.filter.AndFilter;
36 import org.activemq.filter.DestinationMap;
37 import org.activemq.filter.Filter;
38 import org.activemq.filter.FilterFactory;
39 import org.activemq.filter.FilterFactoryImpl;
40 import org.activemq.filter.NoLocalFilter;
41 import org.activemq.message.ActiveMQDestination;
42 import org.activemq.message.ActiveMQMessage;
43 import org.activemq.message.ActiveMQTopic;
44 import org.activemq.message.ConsumerInfo;
45 import org.activemq.message.MessageAck;
46 import org.activemq.service.DeadLetterPolicy;
47 import org.activemq.service.Dispatcher;
48 import org.activemq.service.MessageContainer;
49 import org.activemq.service.RedeliveryPolicy;
50 import org.activemq.service.Subscription;
51 import org.activemq.service.SubscriptionContainer;
52 import org.activemq.service.TopicMessageContainer;
53 import org.activemq.service.TransactionManager;
54 import org.activemq.service.TransactionTask;
55 import org.activemq.store.PersistenceAdapter;
56
57 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
58
59 /**
60 * A default Broker used for Topic messages for durable consumers
61 *
62 * @version $Revision: 1.1.1.1 $
63 */
64 public class DurableTopicMessageContainerManager extends MessageContainerManagerSupport {
65 private static final Log log = LogFactory.getLog(DurableTopicMessageContainerManager.class);
66 private PersistenceAdapter persistenceAdapter;
67 protected SubscriptionContainer subscriptionContainer;
68 protected FilterFactory filterFactory;
69 protected Map activeSubscriptions = new ConcurrentHashMap();
70 private DestinationMap destinationMap = new DestinationMap();
71 private ConcurrentHashMap durableSubscriptions = new ConcurrentHashMap();
72
/**
 * Convenience constructor that wires in a {@link DurableTopicSubscriptionContainerImpl} built from the
 * given policies, a {@link FilterFactoryImpl} and a {@link DispatcherImpl}.
 *
 * @param persistenceAdapter adapter passed through to the main constructor
 * @param redeliveryPolicy   policy handed to the subscription container
 * @param deadLetterPolicy   policy handed to the subscription container
 */
public DurableTopicMessageContainerManager(
        PersistenceAdapter persistenceAdapter,
        RedeliveryPolicy redeliveryPolicy,
        DeadLetterPolicy deadLetterPolicy) {
    this(
        persistenceAdapter,
        new DurableTopicSubscriptionContainerImpl(redeliveryPolicy, deadLetterPolicy),
        new FilterFactoryImpl(),
        new DispatcherImpl());
}
77
/**
 * Creates the manager from explicit collaborators and immediately loads the topic containers known to
 * the persistence adapter. A failure during that initial load is logged and otherwise ignored, so
 * construction still completes.
 *
 * @param persistenceAdapter    source of message stores and initial destinations
 * @param subscriptionContainer container used to create and track subscriptions
 * @param filterFactory         factory for subscription filters
 * @param dispatcher            dispatcher passed to the superclass
 */
public DurableTopicMessageContainerManager(
        PersistenceAdapter persistenceAdapter,
        SubscriptionContainer subscriptionContainer,
        FilterFactory filterFactory,
        Dispatcher dispatcher) {
    super(dispatcher);

    this.filterFactory = filterFactory;
    this.subscriptionContainer = subscriptionContainer;
    // must be assigned before the initial load below, which reads it
    this.persistenceAdapter = persistenceAdapter;

    try {
        loadAllMessageContainers();
    }
    catch (JMSException loadFailure) {
        log.error("Failed to load initial Topic Containers", loadFailure);
    }
}
91
92 public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
93 if (info.isDurableTopic()) {
94 if (log.isDebugEnabled()) {
95 log.debug("Adding consumer: " + info);
96 }
97
98 doAddMessageConsumer(client, info);
99 }
100 }
101
102 public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
103 // we should not remove a durable topic subscription from the subscriptionContainer
104 // unless via the deleteSubscription() method
105
106 // subscriptionContainer.removeSubscription(info.getConsumerId());
107 // subscription.clear();
108
109 Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId());
110 if (sub != null) {
111 sub.setActive(false);
112 dispatcher.removeActiveSubscription(client, sub);
113 }
114 }
115
116 /**
117 * Delete a durable subscriber
118 *
119 * @param clientId
120 * @param subscriberName
121 * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
122 */
123 public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
124
125 String consumerKey = ConsumerInfo.generateConsumerKey(clientId, subscriberName);
126 Subscription sub = (Subscription) durableSubscriptions.remove(consumerKey);
127 if( sub!=null ) {
128 //only delete if not active
129 if (sub.isActive()) {
130 throw new JMSException("The Consummer " + subscriberName + " is still active");
131 }
132 else {
133 subscriptionContainer.removeSubscription(sub.getConsumerId());
134 sub.clear();
135
136 Set containers = destinationMap.get(sub.getDestination());
137 for (Iterator iter = containers.iterator();iter.hasNext();) {
138 TopicMessageContainer container = (TopicMessageContainer) iter.next();
139 if (container instanceof DurableTopicMessageContainer) {
140 ((DurableTopicMessageContainer) container).deleteSubscription(sub.getPersistentKey());
141 }
142 }
143
144 }
145 } else {
146 throw new InvalidDestinationException("The Consumer " + subscriberName + " does not exist for client: "
147 + clientId);
148 }
149 }
150
151 /**
152 * @param client
153 * @param message
154 * @throws javax.jms.JMSException
155 */
156 public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException {
157 ActiveMQDestination dest = (ActiveMQDestination) message.getJMSDestination();
158 if (dest != null && dest.isTopic() && message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT) {
159 // note that we still need to persist the message even if there are no matching
160 // subscribers as they may come along later
161 // plus we don't pre-load subscription information
162 final MessageContainer container = getContainer(dest.getPhysicalName());
163 container.addMessage(message);
164 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask() {
165 public void execute() throws Throwable {
166 doSendMessage(client, message, container);
167 }
168 });
169 }
170 }
171
172 /**
173 * @param client
174 * @param message
175 * @throws JMSException
176 */
177 private void doSendMessage(BrokerClient client, ActiveMQMessage message, MessageContainer container) throws JMSException {
178 Set matchingSubscriptions = subscriptionContainer.getSubscriptions(message.getJMSActiveMQDestination());
179 if (!matchingSubscriptions.isEmpty()) {
180 for (Iterator i = matchingSubscriptions.iterator();i.hasNext();) {
181 Subscription sub = (Subscription) i.next();
182 if (sub.isTarget(message)) {
183 sub.addMessage(container, message);
184 }
185 }
186 updateSendStats(client, message);
187 }
188 }
189
190 /**
191 * Returns an unmodifiable map, indexed by String name, of all the {@link javax.jms.Destination}
192 * objects used by non-broker consumers directly connected to this container
193 *
194 * @return
195 */
196 public Map getLocalDestinations() {
197 Map localDestinations = new HashMap();
198 for (Iterator iter = subscriptionContainer.subscriptionIterator(); iter.hasNext();) {
199 Subscription sub = (Subscription) iter.next();
200 if (sub.isLocalSubscription()) {
201 final ActiveMQDestination dest = sub.getDestination();
202 localDestinations.put(dest.getPhysicalName(), dest);
203 }
204 }
205 return Collections.unmodifiableMap(localDestinations);
206 }
207
208 /**
209 * Acknowledge a message as being read and consumed byh the Consumer
210 *
211 * @param client
212 * @param ack
213 * @throws javax.jms.JMSException
214 */
215 public void acknowledgeMessage(BrokerClient client, final MessageAck ack) throws JMSException {
216 if ( !ack.getDestination().isTopic() || !ack.isPersistent())
217 return;
218
219 Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId());
220 if (sub == null) {
221 return;
222 }
223
224 sub.messageConsumed(ack);
225 }
226
227 /**
228 * poll or messages
229 *
230 * @throws javax.jms.JMSException
231 */
232 public void poll() throws JMSException {
233 //do nothing
234 }
235
236 // Implementation methods
237 //-------------------------------------------------------------------------
238 protected MessageContainer createContainer(String destinationName) throws JMSException {
239 TopicMessageContainer topicMessageContainer =
240 new DurableTopicMessageContainer(this, persistenceAdapter.createTopicMessageStore(destinationName), destinationName);
241 destinationMap.put(new ActiveMQTopic(destinationName), topicMessageContainer);
242 return topicMessageContainer;
243 }
244
245 protected Destination createDestination(String destinationName) {
246 return new ActiveMQTopic(destinationName);
247 }
248
249 public boolean isConsumerActiveOnDestination(ActiveMQDestination dest) {
250 for (Iterator iter = activeSubscriptions.values().iterator();iter.hasNext();) {
251 Subscription subscription = (Subscription) iter.next();
252 if( subscription.getDestination().equals(dest) ) {
253 return true;
254 }
255 }
256 return false;
257 }
258
259 protected void doAddMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
260 boolean shouldRecover = false;
261 if (info.getConsumerName() != null && info.getClientId() != null) {
262 for (Iterator iter = activeSubscriptions.values().iterator();iter.hasNext();) {
263 Subscription subscription = (Subscription) iter.next();
264 if (subscription.isSameDurableSubscription(info)) {
265 throw new DuplicateDurableSubscriptionException(info);
266 }
267 }
268 }
269 Subscription subscription = (Subscription) durableSubscriptions.get(info.getConsumerKey());
270 //subscriptionContainer.getSubscription(info.getConsumerId());
271 if (subscription != null) {
272 //check the subscription hasn't changed
273 if (!equal(subscription.getDestination(), info.getDestination())
274 || !equal(subscription.getSelector(), info.getSelector())) {
275 subscriptionContainer.removeSubscription(info.getConsumerId());
276 subscription.clear();
277 subscription = subscriptionContainer.makeSubscription(dispatcher, client, info, createFilter(info));
278 durableSubscriptions.put(info.getConsumerKey(), subscription);
279 }
280 }
281 else {
282 subscription = subscriptionContainer.makeSubscription(dispatcher, client,info, createFilter(info));
283 shouldRecover = true;
284 durableSubscriptions.put(info.getConsumerKey(), subscription);
285 }
286 subscription.setActiveConsumer(client,info);
287 activeSubscriptions.put(info.getConsumerId(), subscription);
288 dispatcher.addActiveSubscription(client, subscription);
289
290 // load the container
291 getContainer(subscription.getDestination().getPhysicalName());
292
293 Set containers = destinationMap.get(subscription.getDestination());
294 for (Iterator iter = containers.iterator();iter.hasNext();) {
295 TopicMessageContainer container = (TopicMessageContainer) iter.next();
296 if (container instanceof DurableTopicMessageContainer) {
297 ((DurableTopicMessageContainer) container).storeSubscription(info, subscription);
298 }
299 }
300 if (shouldRecover) {
301 recoverSubscriptions(subscription);
302 }
303 // lets not make the subscription active until later
304 // as we can't start dispatching until we've sent back the receipt
305 // TODO we might wish to register a post-receipt action here
306 // to perform the wakeup
307 subscription.setActive(true);
308 //dispatcher.wakeup(subscription);
309 }
310
311 /**
312 * Returns true if the two objects are null or are equal
313 */
314 protected final boolean equal(Object o1, Object o2) {
315 return o1 == o2 || (o1 != null && o1.equals(o2));
316 }
317
318 /**
319 * This method is called when a new durable subscription is started and so we need to go through each matching
320 * message container and dispatch any matching messages that may be outstanding
321 *
322 * @param subscription
323 */
324 protected void recoverSubscriptions(Subscription subscription) throws JMSException {
325 // we should load all of the message containers from disk if we're a wildcard
326
327 getContainer(subscription.getDestination().getPhysicalName());
328 Set containers = destinationMap.get(subscription.getDestination());
329 for (Iterator iter = containers.iterator();iter.hasNext();) {
330 TopicMessageContainer container = (TopicMessageContainer) iter.next();
331 container.recoverSubscription(subscription);
332 }
333 }
334
335 /**
336 * Called when recovering a wildcard subscription where we need to load all the durable message containers (for
337 * which we have any outstanding messages to deliver) into RAM
338 */
339 protected void loadAllMessageContainers() throws JMSException {
340 Map destinations = persistenceAdapter.getInitialDestinations();
341 if (destinations != null) {
342 for (Iterator iter = destinations.entrySet().iterator();iter.hasNext();) {
343 Map.Entry entry = (Map.Entry) iter.next();
344 String name = (String) entry.getKey();
345 Destination destination = (Destination) entry.getValue();
346 if ( destination instanceof Topic ) {
347 loadContainer(name, destination);
348 }
349 }
350 }
351 }
352
353 /**
354 * Create filter for a Consumer
355 *
356 * @param info
357 * @return the Fitler
358 * @throws javax.jms.JMSException
359 */
360 protected Filter createFilter(ConsumerInfo info) throws JMSException {
361 Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
362 if (info.isNoLocal()) {
363 filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
364 }
365 return filter;
366 }
367
368 public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
369 // This container only does topics.
370 if(!dest.isTopic())
371 return;
372 super.createMessageContainer(dest);
373 }
374
375 public synchronized void destroyMessageContainer(ActiveMQDestination dest) throws JMSException {
376 // This container only does topics.
377 if(!dest.isTopic())
378 return;
379 super.destroyMessageContainer(dest);
380 destinationMap.removeAll(dest);
381 }
382 }