Source code: org/activemq/service/impl/InitialImageMessageContainerManager.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.service.impl;
19
20 import java.util.Iterator;
21 import java.util.Map;
22
23 import javax.jms.Destination;
24 import javax.jms.JMSException;
25
26 import org.activemq.broker.BrokerClient;
27 import org.activemq.filter.DestinationFilter;
28 import org.activemq.message.ActiveMQDestination;
29 import org.activemq.message.ActiveMQMessage;
30 import org.activemq.message.ConsumerInfo;
31 import org.activemq.service.MessageContainerManager;
32
33 /**
34 * Implements an initial image service where on subscription
35 * the client will receive the last image that was previously cached.
36 * This is very useful in financial market data and in rapidly changing
37 * transient event models where you don't want to persist messages
38 * when you are away, but wish to cache the last image, per destination
39 * around so that when a new reliable consumer subscribes you receive the
40 * latest value you may have missed.
41 * <p/>
42 * This is especially true in finance with slow moving markets where you may
43 * have to wait a while for an update (or times when you subscribe after
44 * market close etc).
45 *
46 * @version $Revision: 1.1.1.1 $
47 */
48 public class InitialImageMessageContainerManager extends ProxyMessageContainerManager {
49 private Map cache;
50 private boolean topic;
51 private DestinationFilter destinationFilter;
52
53 /**
54 * Creates a topic based initial image message container manager using the given destination filter
55 *
56 * @param delegate
57 * @param cache
58 * @param destinationFilter
59 */
60 public InitialImageMessageContainerManager(MessageContainerManager delegate, Map cache, DestinationFilter destinationFilter) {
61 this(delegate, cache, true, destinationFilter);
62 }
63
64 public InitialImageMessageContainerManager(MessageContainerManager delegate, Map cache, boolean topic, DestinationFilter destinationFilter) {
65 super(delegate);
66 this.cache = cache;
67 this.topic = topic;
68 this.destinationFilter = destinationFilter;
69 }
70
71 public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
72 super.addMessageConsumer(client, info);
73
74 // lookup message for destination
75 ActiveMQDestination destination = info.getDestination();
76 if (isValid(destination)) {
77 if (destination.isWildcard()) {
78 DestinationFilter filter = DestinationFilter.parseFilter(destination);
79 sendMatchingInitialImages(client, info, filter);
80 }
81 else {
82 ActiveMQMessage message = null;
83 synchronized (cache) {
84 message = (ActiveMQMessage) cache.get(destination);
85 }
86 if (message != null) {
87 sendMessage(client, message);
88 }
89 }
90 }
91 }
92
93 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
94 ActiveMQDestination destination = message.getJMSActiveMQDestination();
95 if (isValid(destination)) {
96 cache.put(destination, message);
97 }
98 super.sendMessage(client, message);
99 }
100
101
102 // Implementation methods
103 //-------------------------------------------------------------------------
104 protected void sendMatchingInitialImages(BrokerClient client, ConsumerInfo info, DestinationFilter filter) throws JMSException {
105 synchronized (cache) {
106 for (Iterator iter = cache.entrySet().iterator(); iter.hasNext();) {
107 Map.Entry entry = (Map.Entry) iter.next();
108 Destination destination = (Destination) entry.getKey();
109 if (filter.matches(destination)) {
110 ActiveMQMessage message = (ActiveMQMessage) entry.getValue();
111 sendMessage(client, message);
112 }
113 }
114 }
115 }
116
117
118 /**
119 * Does this message match the destinations on which initial image caching should be used
120 *
121 * @param destination
122 * @return true if the given destination should use initial image caching
123 * which is typically true if the message is a topic which may match
124 * an optional DestinationFilter
125 */
126 protected boolean isValid(ActiveMQDestination destination) {
127 return destination.isTopic() == topic && (destinationFilter == null || destinationFilter.matches(destination));
128 }
129 }