Source code: org/activemq/service/boundedvm/TransientTopicBoundedMessageContainer.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq.service.boundedvm;
20 import java.util.ArrayList;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.Set;
24
25 import javax.jms.JMSException;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.activemq.broker.BrokerClient;
30 import org.activemq.filter.DestinationMap;
31 import org.activemq.filter.Filter;
32 import org.activemq.io.util.MemoryBoundedQueue;
33 import org.activemq.message.ActiveMQDestination;
34 import org.activemq.message.ActiveMQMessage;
35 import org.activemq.message.ConsumerInfo;
36 import org.activemq.message.MessageAck;
37 import org.activemq.service.MessageContainer;
38 import org.activemq.service.MessageContainerAdmin;
39 import org.activemq.service.MessageIdentity;
40 import org.activemq.service.Service;
41
42 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
43 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
44 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
45
46 /**
47 * A MessageContainer for transient topics One of these exists for every active Connection consuming transient Topic
48 * messages
49 *
50 * @version $Revision: 1.1.1.1 $
51 */
52 public class TransientTopicBoundedMessageContainer
53 implements
54 MessageContainer,
55 Service,
56 Runnable,
57 MessageContainerAdmin {
58 private SynchronizedBoolean started;
59 private TransientTopicBoundedMessageManager manager;
60 private BrokerClient client;
61 private MemoryBoundedQueue queue;
62 private Thread worker;
63 private CopyOnWriteArrayList subscriptions;
64 private DestinationMap accel;
65 private ConcurrentHashMap subMap;
66 private Log log;
67
68 /**
69 * Construct this beast
70 *
71 * @param manager
72 * @param client
73 * @param queue
74 */
75 public TransientTopicBoundedMessageContainer(TransientTopicBoundedMessageManager manager, BrokerClient client,
76 MemoryBoundedQueue queue) {
77 this.manager = manager;
78 this.client = client;
79 this.queue = queue;
80 this.started = new SynchronizedBoolean(false);
81 this.subscriptions = new CopyOnWriteArrayList();
82 this.accel = new DestinationMap();
83 this.subMap = new ConcurrentHashMap(100,0.25f);
84 this.log = LogFactory.getLog("TransientTopicBoundedMessageContainer:- " + client);
85 }
86
87 /**
88 * @return true if this Container has no active subscriptions
89 */
90 public boolean isInactive() {
91 return subscriptions.isEmpty();
92 }
93
94 /**
95 * @return the BrokerClient this Container is dispatching to
96 */
97 public BrokerClient getBrokerClient() {
98 return client;
99 }
100
101 /**
102 * Add a consumer to dispatch messages to
103 *
104 * @param filter
105 * @param info
106 */
107 public TransientTopicSubscription addConsumer(Filter filter, ConsumerInfo info) {
108 TransientTopicSubscription ts = findMatch(info);
109 if (ts == null) {
110 ts = new TransientTopicSubscription(filter, info, client);
111 subscriptions.add(ts);
112 accel.put(info.getDestination(),ts);
113 subMap.put(info,ts);
114 }
115 return ts;
116 }
117
118 /**
119 * Remove a consumer
120 *
121 * @param info
122 */
123 public void removeConsumer(ConsumerInfo info) {
124 TransientTopicSubscription ts = findMatch(info);
125 if (ts != null) {
126 subscriptions.remove(ts);
127 accel.remove(info.getDestination(),ts);
128 subMap.remove(info);
129 }
130 }
131
132 /**
133 * start working
134 */
135 public void start() {
136 if (started.commit(false, true)) {
137 if (manager.isDecoupledDispatch()) {
138 worker = new Thread(this, "TransientTopicDispatcher");
139 worker.setPriority(Thread.NORM_PRIORITY + 2);
140 worker.start();
141 }
142 }
143 }
144
145 /**
146 * See if this container should get this message and dispatch it
147 *
148 * @param sender the BrokerClient the message came from
149 * @param message
150 * @return true if it is a valid container
151 * @throws JMSException
152 */
153 public boolean targetAndDispatch(BrokerClient sender, ActiveMQMessage message) throws JMSException {
154 boolean result = false;
155 if (!this.client.isClusteredConnection() || !sender.isClusteredConnection()) {
156 List tmpList = null;
157
158 Set set = accel.get(message.getJMSActiveMQDestination());
159 if (!set.isEmpty()) {
160 for (Iterator i = set.iterator(); i.hasNext();) {
161 TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
162 if (ts.isTarget(message)) {
163 if (tmpList == null) {
164 tmpList = new ArrayList();
165 }
166 tmpList.add(ts);
167 }
168 }
169 }
170 dispatchToQueue(message, tmpList);
171 result = tmpList != null;
172 }
173 return result;
174 }
175
176 /**
177 * stop working
178 */
179 public void stop() {
180 started.set(false);
181 queue.clear();
182 }
183
184 /**
185 * close down this container
186 */
187 public void close() {
188 if (started.get()) {
189 stop();
190 }
191 queue.close();
192 }
193
194
195 /**
196 * do some dispatching
197 */
198 public void run() {
199 int count = 0;
200 ActiveMQMessage message = null;
201 while (started.get()) {
202 try {
203 message = (ActiveMQMessage) queue.dequeue(2000);
204 if (message != null) {
205 if (!message.isExpired()) {
206 client.dispatch(message);
207 if (++count == 250) {
208 count = 0;
209 Thread.yield();
210 }
211 }else {
212 if (log.isDebugEnabled()){
213 log.debug("Message: " + message + " has expired");
214 }
215 }
216 }
217 }
218 catch (Exception e) {
219 stop();
220 log.warn("stop dispatching", e);
221 }
222 }
223 }
224
225 private void dispatchToQueue(ActiveMQMessage message, List list) throws JMSException {
226 if (list != null && !list.isEmpty()) {
227 int[] ids = new int[list.size()];
228 for (int i = 0;i < list.size();i++) {
229 TransientTopicSubscription ts = (TransientTopicSubscription) list.get(i);
230 ids[i] = ts.getConsumerInfo().getConsumerNo();
231 }
232 message = message.shallowCopy();
233 message.setConsumerNos(ids);
234 if (manager.isDecoupledDispatch()) {
235 queue.enqueue(message);
236 }
237 else {
238 client.dispatch(message);
239 }
240 }
241 }
242
243 private TransientTopicSubscription findMatch(ConsumerInfo info) {
244 return (TransientTopicSubscription) subMap.get(info);
245 }
246
247 /**
248 * @param destination
249 * @return true if a
250 */
251 public boolean hasConsumerFor(ActiveMQDestination destination) {
252 for (Iterator i = subscriptions.iterator();i.hasNext();) {
253 TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
254 ConsumerInfo info = ts.getConsumerInfo();
255 if (info.getDestination().matches(destination)) {
256 return true;
257 }
258 }
259 return false;
260 }
261
262 /**
263 * @return the destination name
264 */
265 public String getDestinationName() {
266 return "";
267 }
268
269 /**
270 * @param msg
271 * @return @throws JMSException
272 */
273 public void addMessage(ActiveMQMessage msg) throws JMSException {
274 }
275
276 /**
277 * @param messageIdentity
278 * @param ack
279 * @throws JMSException
280 */
281 public void delete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
282 }
283
284 /**
285 * @param messageIdentity
286 * @return @throws JMSException
287 */
288 public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException {
289 return null;
290 }
291
292 /**
293 * @param messageIdentity
294 * @throws JMSException
295 */
296 public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException {
297 }
298
299 /**
300 * @param messageIdentity
301 * @param ack
302 * @throws JMSException
303 */
304 public void unregisterMessageInterest(MessageIdentity messageIdentity) throws JMSException {
305 }
306
307 /**
308 * @param messageIdentity
309 * @return @throws JMSException
310 */
311 public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
312 return false;
313 }
314
315 /**
316 * @see org.activemq.service.MessageContainer#getMessageContainerAdmin()
317 */
318 public MessageContainerAdmin getMessageContainerAdmin() {
319 return this;
320 }
321
322 /**
323 * @see org.activemq.service.MessageContainerAdmin#empty()
324 */
325 public void empty() throws JMSException {
326 // TODO implement me
327 }
328
329 /**
330 * @see org.activemq.service.MessageContainer#isDeadLetterQueue()
331 */
332 public boolean isDeadLetterQueue() {
333 return false;
334 }
335 }