Source code: org/activemq/advisories/TempDestinationAdvisor.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq.advisories;
20 import java.util.Iterator;
21 import java.util.List;
22 import java.util.Set;
23 import javax.jms.Connection;
24 import javax.jms.Destination;
25 import javax.jms.JMSException;
26 import javax.jms.Message;
27 import javax.jms.MessageConsumer;
28 import javax.jms.MessageListener;
29 import javax.jms.ObjectMessage;
30 import javax.jms.Session;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.activemq.ActiveMQConnection;
34 import org.activemq.ActiveMQSession;
35 import org.activemq.message.ActiveMQDestination;
36 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
37 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
38 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
39
40 /**
41 * A helper class for listening for TempDestination advisories
42 *
43 * @version $Revision: 1.1.1.1 $
44 */
45 public class TempDestinationAdvisor implements MessageListener {
46 private static final Log log = LogFactory.getLog(TempDestinationAdvisor.class);
47 private Connection connection;
48 private ActiveMQDestination destination;
49 private Session session;
50 private List listeners = new CopyOnWriteArrayList();
51 private Set activeDestinations = new CopyOnWriteArraySet();
52 private SynchronizedBoolean started = new SynchronizedBoolean(false);
53 private long startedAt;
54
55 /**
56 * Construct a TempDestinationAdvisor
57 *
58 * @param connection
59 * @param destination the destination to listen for TempDestination events
60 * @throws JMSException
61 */
62 public TempDestinationAdvisor(Connection connection, Destination destination) throws JMSException {
63 this.connection = connection;
64 this.destination = ActiveMQDestination.transformDestination(destination);
65 }
66
67 /**
68 * start listening for advisories
69 *
70 * @throws JMSException
71 */
72 public void start() throws JMSException {
73 if (started.commit(false, true)) {
74 if (connection instanceof ActiveMQConnection) {
75 session = ((ActiveMQConnection) connection).createSession(false, Session.AUTO_ACKNOWLEDGE, true);
76 ((ActiveMQSession) session).setInternalSession(true);
77 }
78 else {
79 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
80 }
81 MessageConsumer consumer = session.createConsumer(destination.getTopicForTempAdvisory());
82 consumer.setMessageListener(this);
83 startedAt = System.currentTimeMillis();
84 }
85 }
86
87 /**
88 * stop listening for advisories
89 *
90 * @throws JMSException
91 */
92 public void stop() throws JMSException {
93 if (started.commit(true, false)) {
94 if (session != null) {
95 session.close();
96 }
97 }
98 }
99
100 /**
101 * returns true if the temporary destination is active
102 *
103 * @param destination
104 * @return true if a subscriber for the destination
105 */
106 public boolean isActive(Destination destination) {
107 boolean rtnval = false;
108 synchronized(this)
109 {
110 rtnval = activeDestinations.contains(destination);
111 if (rtnval == false && startedAt > 0)
112 {
113 // wait a while to see if the advisory event arrives (no longer than 5 seconds)
114 long waittime = 5000 - (System.currentTimeMillis() - startedAt);
115 startedAt = 0;
116 try {
117 wait(waittime);
118 } catch (Exception e) {}
119 rtnval = activeDestinations.contains(destination);
120 }
121 }
122 return rtnval;
123 }
124
125 /**
126 * Add a listener
127 *
128 * @param l
129 */
130 public void addListener(TempDestinationAdvisoryEventListener l) {
131 listeners.add(l);
132 }
133
134 /**
135 * Remove a listener
136 *
137 * @param l
138 */
139 public void removeListener(TempDestinationAdvisoryEventListener l) {
140 listeners.remove(l);
141 }
142
143 /**
144 * OnMessage() implementation
145 *
146 * @param msg
147 */
148 public void onMessage(Message msg) {
149 if (msg instanceof ObjectMessage) {
150 try {
151 TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) ((ObjectMessage) msg).getObject();
152 if (event.isStarted()) {
153 activeDestinations.add(event.getDestination());
154 synchronized (this) {
155 notifyAll();
156 }
157 }
158 else {
159 activeDestinations.remove(event.getDestination());
160 }
161 fireEvent(event);
162 }
163 catch (JMSException e) {
164 log.error("Failed to process message: " + msg);
165 }
166 }
167 }
168
169 private void fireEvent(TempDestinationAdvisoryEvent event) {
170 for (Iterator i = listeners.iterator();i.hasNext();) {
171 TempDestinationAdvisoryEventListener l = (TempDestinationAdvisoryEventListener) i.next();
172 l.onEvent(event);
173 }
174 }
175 }