Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/service/impl/InitialImageMessageContainerManager.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq.service.impl;
19  
20  import java.util.Iterator;
21  import java.util.Map;
22  
23  import javax.jms.Destination;
24  import javax.jms.JMSException;
25  
26  import org.activemq.broker.BrokerClient;
27  import org.activemq.filter.DestinationFilter;
28  import org.activemq.message.ActiveMQDestination;
29  import org.activemq.message.ActiveMQMessage;
30  import org.activemq.message.ConsumerInfo;
31  import org.activemq.service.MessageContainerManager;
32  
33  /**
34   * Implements an initial image service where on subscription
35   * the client will receive the last image that was previously cached.
36   * This is very useful in financial market data and in rapidly changing
37   * transient event models where you don't want to persist messages
38   * when you are away, but wish to cache the last image, per destination
39   * around so that when a new reliable consumer subscribes you receive the
40   * latest value you may have missed.
41   * <p/>
42   * This is especially true in finance with slow moving markets where you may
43   * have to wait a while for an update (or times when you subscribe after
44   * market close etc).
45   *
46   * @version $Revision: 1.1.1.1 $
47   */
48  public class InitialImageMessageContainerManager extends ProxyMessageContainerManager {
49      private Map cache;
50      private boolean topic;
51      private DestinationFilter destinationFilter;
52  
53      /**
54       * Creates a topic based initial image message container manager using the given destination filter
55       *
56       * @param delegate
57       * @param cache
58       * @param destinationFilter
59       */
60      public InitialImageMessageContainerManager(MessageContainerManager delegate, Map cache, DestinationFilter destinationFilter) {
61          this(delegate, cache, true, destinationFilter);
62      }
63  
64      public InitialImageMessageContainerManager(MessageContainerManager delegate, Map cache, boolean topic, DestinationFilter destinationFilter) {
65          super(delegate);
66          this.cache = cache;
67          this.topic = topic;
68          this.destinationFilter = destinationFilter;
69      }
70  
71      public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
72          super.addMessageConsumer(client, info);
73  
74          // lookup message for destination
75          ActiveMQDestination destination = info.getDestination();
76          if (isValid(destination)) {
77              if (destination.isWildcard()) {
78                  DestinationFilter filter = DestinationFilter.parseFilter(destination);
79                  sendMatchingInitialImages(client, info, filter);
80              }
81              else {
82                  ActiveMQMessage message = null;
83                  synchronized (cache) {
84                      message = (ActiveMQMessage) cache.get(destination);
85                  }
86                  if (message != null) {
87                      sendMessage(client, message);
88                  }
89              }
90          }
91      }
92  
93      public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
94          ActiveMQDestination destination = message.getJMSActiveMQDestination();
95          if (isValid(destination)) {
96              cache.put(destination, message);
97          }
98          super.sendMessage(client, message);
99      }
100    
101 
102     // Implementation methods
103     //-------------------------------------------------------------------------
104     protected void sendMatchingInitialImages(BrokerClient client, ConsumerInfo info, DestinationFilter filter) throws JMSException {
105         synchronized (cache) {
106             for (Iterator iter = cache.entrySet().iterator(); iter.hasNext();) {
107                 Map.Entry entry = (Map.Entry) iter.next();
108                 Destination destination = (Destination) entry.getKey();
109                 if (filter.matches(destination)) {
110                     ActiveMQMessage message = (ActiveMQMessage) entry.getValue();
111                     sendMessage(client, message);
112                 }
113             }
114         }
115     }
116 
117 
118     /**
119      * Does this message match the destinations on which initial image caching should be used
120      *
121      * @param destination
122      * @return true if the given destination should use initial image caching
123      *         which is typically true if the message is a topic which may match
124      *         an optional DestinationFilter
125      */
126     protected boolean isValid(ActiveMQDestination destination) {
127         return destination.isTopic() == topic && (destinationFilter == null || destinationFilter.matches(destination));
128     }
129 }