Source code: org/mom4j/jms/MessageConsumerImpl.java
1 package org.mom4j.jms;
2
3 import javax.jms.Destination;
4 import javax.jms.JMSException;
5 import javax.jms.Message;
6 import javax.jms.MessageConsumer;
7 import javax.jms.MessageListener;
8 import javax.jms.Queue;
9 import javax.jms.QueueReceiver;
10 import javax.jms.Topic;
11 import javax.jms.TopicSubscriber;
12
13 /**
14 *
15 */
16 public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, TopicSubscriber {
17
18 private static long consumerCount = 0;
19
20 private DestinationImpl destination;
21 private String messageSelector;
22 private SessionImpl session;
23 private ReceiverWorker worker;
24 private MessageListener listener;
25 private String consumerId;
26 private long pollSync;
27 private long pollAsync;
28 private boolean durable;
29 private boolean noLocal;
30
31
32 MessageConsumerImpl(DestinationImpl dest,
33 String messageSelector,
34 SessionImpl s)
35 {
36 this.destination = dest;
37 this.messageSelector = messageSelector;
38 this.session = s;
39 this.worker = null;
40 this.listener = null;
41 this.consumerId = s.getSessionId() + "@" + (consumerCount++);
42 this.pollSync = s.getPollSync();
43 this.pollAsync = s.getPollAsync();
44 this.durable = false;
45 this.noLocal = false;
46 }
47
48
49 void setConsumerId(String cid) {
50 this.consumerId = cid;
51 }
52
53
54 public void setMessageListener(MessageListener listener)
55 throws JMSException
56 {
57 if(listener != null && this.listener != null) {
58 throw new javax.jms.IllegalStateException("listener already set");
59 } else if(listener == null && this.listener != null) {
60 this.worker.stopListener();
61 this.worker = null;
62 this.listener = null;
63 } else if(listener != null && this.listener == null) {
64 this.listener = listener;
65 this.worker = new ReceiverWorker(this.listener, this, this.pollAsync);
66 this.worker.start();
67 } else {
68 throw new javax.jms.IllegalStateException("no listener set");
69 }
70 }
71
72
73 public Message receive()
74 throws JMSException
75 {
76 if(this.listener != null) {
77 throw new javax.jms.IllegalStateException("MessageListener is set");
78 }
79 Message msg = null;
80 while(msg == null) {
81 msg = this.session.receive(this.destination.getName(), this.consumerId);
82 if(msg == null) {
83 try {
84 Thread.sleep(this.pollSync);
85 } catch(InterruptedException ex) {
86 break;
87 }
88 }
89 }
90 return msg;
91 }
92
93
94 public Message receive(long timeout)
95 throws JMSException
96 {
97 if(this.listener != null) {
98 throw new javax.jms.IllegalStateException("MessageListener is set");
99 }
100 long tt = System.currentTimeMillis() + timeout;
101 Message msg = null;
102 while(msg == null) {
103 msg = this.session.receive(this.destination.getName(), this.consumerId);
104 if(msg == null) {
105 if(System.currentTimeMillis() > tt) {
106 break;
107 }
108 try {
109 Thread.sleep(this.pollSync);
110 } catch(InterruptedException ex) {
111 break;
112 }
113 }
114 }
115 return msg;
116 }
117
118
119 public Message receiveNoWait()
120 throws JMSException
121 {
122 if(this.listener != null) {
123 throw new javax.jms.IllegalStateException("MessageListener is set");
124 }
125 return this.session.receive(this.destination.getName(), this.consumerId);
126 }
127
128
129 public void close()
130 throws JMSException
131 {
132 if(this.worker != null) {
133 this.worker.stopListener();
134 this.worker = null;
135 this.listener = null;
136 }
137 this.session.closeConsumer(this);
138 }
139
140
141 public void setDurable(boolean dur) {
142 this.durable = dur;
143 }
144
145
146 public boolean getDurable() {
147 return this.durable;
148 }
149
150
151 public void setNoLocal(boolean noLocal) {
152 this.noLocal = noLocal;
153 }
154
155
156 public boolean getNoLocal() {
157 return this.noLocal;
158 }
159
160
161 public MessageListener getMessageListener()
162 throws JMSException
163 {
164 return this.listener;
165 }
166
167
168 public String getMessageSelector()
169 throws JMSException
170 {
171 return this.messageSelector;
172 }
173
174
175 public Destination getDestination()
176 throws JMSException
177 {
178 return this.destination;
179 }
180
181
182 final String getConsumerId() {
183 return this.consumerId;
184 }
185
186
187 final Message receiveInternal() {
188 Message msg = null;
189 try {
190 msg = this.session.receive(this.destination.getName(), this.consumerId);
191 } catch(JMSException ex) {
192 ex.printStackTrace();
193 }
194 return msg;
195 }
196
197
198 //
199 // QueueReceiver specific methods
200 //
201
202 public Queue getQueue()
203 throws javax.jms.JMSException
204 {
205 return (Queue)this.destination;
206 }
207
208
209 //
210 // TopicSubscriber specific methods
211 //
212
213 public Topic getTopic()
214 throws JMSException
215 {
216 return (Topic)this.destination;
217 }
218
219 }