Source code: org/activemq/ActiveMQQueueBrowser.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq;
20
21 import java.util.Enumeration;
22
23 import javax.jms.IllegalStateException;
24 import javax.jms.JMSException;
25 import javax.jms.Queue;
26 import javax.jms.QueueBrowser;
27
28 import org.activemq.message.ActiveMQQueue;
29
/**
 * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a
 * queue without removing them.
 * <P>
 * The <CODE>getEnumeration</CODE> method returns a
 * <CODE>java.util.Enumeration</CODE> that is used to scan the queue's
 * messages. It may be an enumeration of the entire content of a queue, or it
 * may contain only the messages matching a message selector.
 * <P>
 * Messages may be arriving and expiring while the scan is done. The JMS API
 * does not require the content of an enumeration to be a static snapshot of
 * queue content. Whether these changes are visible or not depends on the JMS
 * provider.
 * <P>
 * A <CODE>QueueBrowser</CODE> can be created from either a
 * <CODE>Session</CODE> or a <CODE>QueueSession</CODE>.
 *
 * @see javax.jms.Session#createBrowser
 * @see javax.jms.QueueSession#createBrowser
 * @see javax.jms.QueueBrowser
 * @see javax.jms.QueueReceiver
 */
55
56 public class ActiveMQQueueBrowser implements
57 QueueBrowser, Enumeration {
58
59 private final ActiveMQSession session;
60 private final ActiveMQQueue destination;
61 private final String selector;
62 private final int cnum;
63
64 private ActiveMQMessageConsumer consumer;
65 private boolean closed;
66
67 /**
68 * Constructor for an ActiveMQQueueBrowser - used internally
69 *
70 * @param theSession
71 * @param dest
72 * @param selector
73 * @param cnum
74 * @throws JMSException
75 */
76 protected ActiveMQQueueBrowser(ActiveMQSession session, ActiveMQQueue destination, String selector, int cnum) throws JMSException {
77 this.session = session;
78 this.destination = destination;
79 this.selector = selector;
80 this.cnum = cnum;
81 consumer = createConsumer();
82 }
83
84 /**
85 * @param session
86 * @param destination
87 * @param selector
88 * @param cnum
89 * @return
90 * @throws JMSException
91 */
92 private ActiveMQMessageConsumer createConsumer() throws JMSException {
93 return new ActiveMQMessageConsumer(session, destination, "", selector, cnum, session.connection.getPrefetchPolicy().getQueueBrowserPrefetch(), false, true);
94 }
95
96 private void destroyConsumer() {
97 if( consumer == null )
98 return;
99
100 try {
101 consumer.close();
102 consumer=null;
103 } catch (JMSException e) {
104 e.printStackTrace();
105 }
106 }
107
108 /**
109 * Gets an enumeration for browsing the current queue messages in the order
110 * they would be received.
111 *
112 * @return an enumeration for browsing the messages
113 * @throws JMSException if the JMS provider fails to get the enumeration for this
114 * browser due to some internal error.
115 */
116
117 public Enumeration getEnumeration() throws JMSException {
118 checkClosed();
119
120 if( consumer==null )
121 consumer = createConsumer();
122
123 //ok - started browsing - wait for inbound messages
124 if (consumer.messageQueue.size() == 0) {
125 try {
126 Thread.sleep(1000);
127 }
128 catch (InterruptedException e) {
129 }
130 }
131 return this;
132 }
133
134 private void checkClosed() throws IllegalStateException {
135 if (closed) {
136 throw new IllegalStateException("The Consumer is closed");
137 }
138 }
139
140 /**
141 * @return true if more messages to process
142 */
143 public boolean hasMoreElements() {
144 if( consumer==null )
145 return false;
146
147 boolean rc = consumer.messageQueue.size() > 0;
148 if( !rc ) {
149 destroyConsumer();
150 }
151 return rc;
152 }
153
154
155 /**
156 * @return the next message
157 */
158 public Object nextElement() {
159 if( consumer == null )
160 return null;
161
162 Object answer = null;
163 try {
164 answer = consumer.receiveNoWait();
165 if( answer==null ) {
166 destroyConsumer();
167 }
168 }
169 catch (JMSException e) {
170 e.printStackTrace();
171 }
172 return answer;
173 }
174
175 public void close() throws JMSException {
176 destroyConsumer();
177 closed=true;
178 }
179
180 /**
181 * Gets the queue associated with this queue browser.
182 *
183 * @return the queue
184 * @throws JMSException if the JMS provider fails to get the queue associated
185 * with this browser due to some internal error.
186 */
187
188 public Queue getQueue() throws JMSException {
189 return destination;
190 }
191
192
193 public String getMessageSelector() throws JMSException {
194 return selector;
195 }
196
197
198 }