Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/service/boundedvm/TransientTopicBoundedMessageContainer.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq.service.boundedvm;
20  import java.util.ArrayList;
21  import java.util.Iterator;
22  import java.util.List;
23  import java.util.Set;
24  
25  import javax.jms.JMSException;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.activemq.broker.BrokerClient;
30  import org.activemq.filter.DestinationMap;
31  import org.activemq.filter.Filter;
32  import org.activemq.io.util.MemoryBoundedQueue;
33  import org.activemq.message.ActiveMQDestination;
34  import org.activemq.message.ActiveMQMessage;
35  import org.activemq.message.ConsumerInfo;
36  import org.activemq.message.MessageAck;
37  import org.activemq.service.MessageContainer;
38  import org.activemq.service.MessageContainerAdmin;
39  import org.activemq.service.MessageIdentity;
40  import org.activemq.service.Service;
41  
42  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
43  import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
44  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
45  
46  /**
47   * A MessageContainer for transient topics One of these exists for every active Connection consuming transient Topic
48   * messages
49   *
50   * @version $Revision: 1.1.1.1 $
51   */
52  public class TransientTopicBoundedMessageContainer
53          implements
54              MessageContainer,
55              Service,
56              Runnable,
57              MessageContainerAdmin {
58      private SynchronizedBoolean started;
59      private TransientTopicBoundedMessageManager manager;
60      private BrokerClient client;
61      private MemoryBoundedQueue queue;
62      private Thread worker;
63      private CopyOnWriteArrayList subscriptions;
64      private DestinationMap accel;
65      private ConcurrentHashMap subMap;
66      private Log log;
67  
68      /**
69       * Construct this beast
70       *
71       * @param manager
72       * @param client
73       * @param queue
74       */
75      public TransientTopicBoundedMessageContainer(TransientTopicBoundedMessageManager manager, BrokerClient client,
76              MemoryBoundedQueue queue) {
77          this.manager = manager;
78          this.client = client;
79          this.queue = queue;
80          this.started = new SynchronizedBoolean(false);
81          this.subscriptions = new CopyOnWriteArrayList();
82          this.accel = new DestinationMap();
83          this.subMap = new ConcurrentHashMap(100,0.25f);
84          this.log = LogFactory.getLog("TransientTopicBoundedMessageContainer:- " + client);
85      }
86  
87      /**
88       * @return true if this Container has no active subscriptions
89       */
90      public boolean isInactive() {
91          return subscriptions.isEmpty();
92      }
93  
94      /**
95       * @return the BrokerClient this Container is dispatching to
96       */
97      public BrokerClient getBrokerClient() {
98          return client;
99      }
100 
101     /**
102      * Add a consumer to dispatch messages to
103      *
104      * @param filter
105      * @param info
106      */
107     public TransientTopicSubscription addConsumer(Filter filter, ConsumerInfo info) {
108         TransientTopicSubscription ts = findMatch(info);
109         if (ts == null) {
110             ts = new TransientTopicSubscription(filter, info, client);
111             subscriptions.add(ts);
112             accel.put(info.getDestination(),ts);
113             subMap.put(info,ts);
114         }
115         return ts;
116     }
117 
118     /**
119      * Remove a consumer
120      *
121      * @param info
122      */
123     public void removeConsumer(ConsumerInfo info) {
124         TransientTopicSubscription ts = findMatch(info);
125         if (ts != null) {
126             subscriptions.remove(ts);
127             accel.remove(info.getDestination(),ts);
128             subMap.remove(info);
129         }
130     }
131 
132     /**
133      * start working
134      */
135     public void start() {
136         if (started.commit(false, true)) {
137             if (manager.isDecoupledDispatch()) {
138                 worker = new Thread(this, "TransientTopicDispatcher");
139                 worker.setPriority(Thread.NORM_PRIORITY + 2);
140                 worker.start();
141             }
142         }
143     }
144 
145     /**
146      * See if this container should get this message and dispatch it
147      *
148      * @param sender the BrokerClient the message came from
149      * @param message
150      * @return true if it is a valid container
151      * @throws JMSException
152      */
153     public boolean targetAndDispatch(BrokerClient sender, ActiveMQMessage message) throws JMSException {
154         boolean result = false;
155         if (!this.client.isClusteredConnection() || !sender.isClusteredConnection()) {
156             List tmpList = null;
157 
158             Set set = accel.get(message.getJMSActiveMQDestination());
159             if (!set.isEmpty()) {
160                 for (Iterator i = set.iterator(); i.hasNext();) {
161                     TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
162                     if (ts.isTarget(message)) {
163                         if (tmpList == null) {
164                             tmpList = new ArrayList();
165                         }
166                         tmpList.add(ts);
167                     }
168                 }
169             }
170             dispatchToQueue(message, tmpList);
171             result = tmpList != null;
172         }
173         return result;
174     }
175 
176     /**
177      * stop working
178      */
179     public void stop() {
180         started.set(false);
181         queue.clear();
182     }
183 
184     /**
185      * close down this container
186      */
187     public void close() {
188         if (started.get()) {
189             stop();
190         }
191         queue.close();
192     }
193 
194 
195     /**
196      * do some dispatching
197      */
198     public void run() {
199         int count = 0;
200         ActiveMQMessage message = null;
201         while (started.get()) {
202             try {
203                 message = (ActiveMQMessage) queue.dequeue(2000);
204                 if (message != null) {
205                     if (!message.isExpired()) {
206                         client.dispatch(message);
207                         if (++count == 250) {
208                             count = 0;
209                             Thread.yield();
210                         }
211                     }else {
212                         if (log.isDebugEnabled()){
213                             log.debug("Message: " + message + " has expired");
214                         }
215                     }
216                 }
217             }
218             catch (Exception e) {
219                 stop();
220                 log.warn("stop dispatching", e);
221             }
222         }
223     }
224 
225     private void dispatchToQueue(ActiveMQMessage message, List list) throws JMSException {
226         if (list != null && !list.isEmpty()) {
227             int[] ids = new int[list.size()];
228             for (int i = 0;i < list.size();i++) {
229                 TransientTopicSubscription ts = (TransientTopicSubscription) list.get(i);
230                 ids[i] = ts.getConsumerInfo().getConsumerNo();
231             }
232             message = message.shallowCopy();
233             message.setConsumerNos(ids);
234             if (manager.isDecoupledDispatch()) {
235                 queue.enqueue(message);
236             }
237             else {
238                 client.dispatch(message);
239             }
240         }
241     }
242 
243     private TransientTopicSubscription findMatch(ConsumerInfo info) {
244         return (TransientTopicSubscription) subMap.get(info);
245     }
246 
247     /**
248      * @param destination
249      * @return true if a
250      */
251     public boolean hasConsumerFor(ActiveMQDestination destination) {
252         for (Iterator i = subscriptions.iterator();i.hasNext();) {
253             TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
254             ConsumerInfo info = ts.getConsumerInfo();
255             if (info.getDestination().matches(destination)) {
256                 return true;
257             }
258         }
259         return false;
260     }
261 
262     /**
263      * @return the destination name
264      */
265     public String getDestinationName() {
266         return "";
267     }
268 
269     /**
270      * @param msg
271      * @return @throws JMSException
272      */
273     public void addMessage(ActiveMQMessage msg) throws JMSException {
274     }
275 
276     /**
277      * @param messageIdentity
278      * @param ack
279      * @throws JMSException
280      */
281     public void delete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
282     }
283 
284     /**
285      * @param messageIdentity
286      * @return @throws JMSException
287      */
288     public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException {
289         return null;
290     }
291 
292     /**
293      * @param messageIdentity
294      * @throws JMSException
295      */
296     public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException {
297     }
298 
299     /**
300      * @param messageIdentity
301      * @param ack
302      * @throws JMSException
303      */
304     public void unregisterMessageInterest(MessageIdentity messageIdentity) throws JMSException {
305     }
306 
307     /**
308      * @param messageIdentity
309      * @return @throws JMSException
310      */
311     public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
312         return false;
313     }
314 
315     /**
316      * @see org.activemq.service.MessageContainer#getMessageContainerAdmin()
317      */
318     public MessageContainerAdmin getMessageContainerAdmin() {
319         return this;
320     }
321 
322     /**
323      * @see org.activemq.service.MessageContainerAdmin#empty()
324      */
325     public void empty() throws JMSException {
326         // TODO implement me
327     }
328 
329     /**
330      * @see org.activemq.service.MessageContainer#isDeadLetterQueue()
331      */
332     public boolean isDeadLetterQueue() {
333         return false;
334     }
335 }