Source code: org/mom4j/jms/SessionImpl.java
1
2 package org.mom4j.jms;
3
4 import javax.jms.BytesMessage;
5 import javax.jms.Destination;
6 import javax.jms.JMSException;
7 import javax.jms.Message;
8 import javax.jms.MapMessage;
9 import javax.jms.MessageConsumer;
10 import javax.jms.MessageListener;
11 import javax.jms.MessageProducer;
12 import javax.jms.ObjectMessage;
13 import javax.jms.Queue;
14 import javax.jms.QueueBrowser;
15 import javax.jms.QueueReceiver;
16 import javax.jms.QueueSender;
17 import javax.jms.QueueSession;
18 import javax.jms.Session;
19 import javax.jms.StreamMessage;
20 import javax.jms.TemporaryQueue;
21 import javax.jms.TemporaryTopic;
22 import javax.jms.TextMessage;
23 import javax.jms.Topic;
24 import javax.jms.TopicPublisher;
25 import javax.jms.TopicSession;
26 import javax.jms.TopicSubscriber;
27
28 import java.util.ArrayList;
29 import java.util.Map;
30 import java.util.WeakHashMap;
31
32
33 public class SessionImpl implements Session, QueueSession, TopicSession {
34
35 protected boolean isTransacted;
36 protected int ackMode;
37 protected ConnectionImpl connection;
38 protected String sessionId;
39
40 private ArrayList consumers;
41 private ArrayList producers;
42 private WeakHashMap redelivered;
43 private long tmpCount;
44
45
46 /** Creates new SessionImpl */
47 public SessionImpl(String sessionId, boolean tx, int ackMode, ConnectionImpl c) {
48 if(ackMode != Session.AUTO_ACKNOWLEDGE &&
49 ackMode != Session.CLIENT_ACKNOWLEDGE &&
50 ackMode != Session.DUPS_OK_ACKNOWLEDGE)
51 {
52 throw new IllegalArgumentException("acknowledge-mode not known");
53 }
54 this.isTransacted = tx;
55 this.ackMode = ackMode;
56 this.connection = c;
57 this.sessionId = sessionId;
58 this.consumers = new ArrayList();
59 this.producers = new ArrayList();
60 this.redelivered = new WeakHashMap();
61 this.tmpCount = 0;
62 }
63
64
65 public String getSessionId() {
66 return this.sessionId;
67 }
68
69
70 public void recover()
71 throws javax.jms.JMSException
72 {
73 if(this.isTransacted) {
74 throw new javax.jms.IllegalStateException("session is transacted");
75 }
76 this.connection.rollback(this);
77 }
78
79
80 public QueueBrowser createBrowser(Queue queue)
81 throws JMSException
82 {
83 throw new FeatureNotSupportedException();
84 }
85
86
87 public QueueBrowser createBrowser(Queue queue, String messageSelector)
88 throws JMSException
89 {
90 throw new FeatureNotSupportedException();
91 }
92
93
94 public BytesMessage createBytesMessage()
95 throws javax.jms.JMSException
96 {
97 return null;
98 }
99
100
101 public MessageConsumer createConsumer(Destination destination)
102 throws JMSException
103 {
104 return this.createConsumer(destination, null);
105 }
106
107
108 public MessageConsumer createConsumer(Destination destination,
109 String messageSelector)
110 throws JMSException
111 {
112 return this.createConsumer(destination, messageSelector, false);
113 }
114
115
116 public MessageConsumer createConsumer(Destination destination,
117 String messageSelector,
118 boolean noLocal)
119 throws JMSException
120 {
121 return this.createConsumerInternal(destination, messageSelector, noLocal);
122 }
123
124
125 private MessageConsumerImpl createConsumerInternal(Destination destination,
126 String messageSelector,
127 boolean noLocal)
128 throws JMSException
129 {
130 if(destination == null)
131 throw new java.lang.IllegalArgumentException("destination may not be null!");
132
133 MessageConsumerImpl mc =
134 new MessageConsumerImpl((DestinationImpl)destination, messageSelector, this);
135 mc.setNoLocal(noLocal);
136 this.register((DestinationImpl)destination, mc.getConsumerId(), messageSelector);
137 this.consumers.add(mc);
138 return mc;
139 }
140
141
142 public QueueReceiver createReceiver(Queue queue, String messageSelector)
143 throws javax.jms.JMSException
144 {
145 return this.createConsumerInternal((Destination)queue, messageSelector, false);
146 }
147
148
149 public QueueReceiver createReceiver(Queue queue)
150 throws javax.jms.JMSException
151 {
152 return this.createReceiver(queue, null);
153 }
154
155
156 public TopicSubscriber createSubscriber(Topic topic)
157 throws javax.jms.JMSException
158 {
159 return this.createSubscriber(topic, null, false);
160 }
161
162
163 public TopicSubscriber createSubscriber(Topic topic,
164 String selector,
165 boolean noLocal)
166 throws javax.jms.JMSException
167 {
168 return this.createConsumerInternal(topic, selector, noLocal);
169 }
170
171
172 public TopicSubscriber createDurableSubscriber(Topic topic, String name)
173 throws javax.jms.JMSException
174 {
175 return this.createDurableSubscriber(topic, name, null, false);
176 }
177
178
179 public TopicSubscriber createDurableSubscriber(Topic topic,
180 String name,
181 String selector,
182 boolean noLocal)
183 throws javax.jms.JMSException
184 {
185 if(topic == null)
186 throw new java.lang.IllegalArgumentException("topic is null!");
187 if(name == null)
188 throw new java.lang.IllegalArgumentException("name is null!");
189
190 MessageConsumerImpl mc =
191 new MessageConsumerImpl((DestinationImpl)topic,
192 selector,
193 this);
194 mc.setNoLocal(noLocal);
195 mc.setDurable(true);
196 String cid = this.registerDur((DestinationImpl)topic,
197 name,
198 mc.getConsumerId(),
199 selector);
200 mc.setConsumerId(cid);
201 return mc;
202 }
203
204
205 public MapMessage createMapMessage()
206 throws javax.jms.JMSException
207 {
208 return null;
209 }
210
211
212 public MessageProducer createProducer(Destination destination)
213 throws JMSException
214 {
215 return this.createProducerInternal(destination);
216 }
217
218
219 private MessageProducerImpl createProducerInternal(Destination destination)
220 throws JMSException
221 {
222 if(destination == null) {
223 throw new java.lang.IllegalArgumentException("destination may not be null!");
224 }
225 MessageProducerImpl mp = new MessageProducerImpl((DestinationImpl)destination, this);
226 this.producers.add(mp);
227 return mp;
228 }
229
230
231 public QueueSender createSender(Queue queue)
232 throws javax.jms.JMSException
233 {
234 return this.createProducerInternal((Destination)queue);
235 }
236
237
238 public TopicPublisher createPublisher(Topic topic)
239 throws javax.jms.JMSException
240 {
241 return this.createProducerInternal((Destination)topic);
242 }
243
244
245 public Queue createQueue(java.lang.String queueName)
246 throws JMSException
247 {
248 if(queueName == null) {
249 throw new javax.jms.JMSException("name is null!");
250 }
251 this.createDestination(this, true, queueName);
252 return new QueueImpl(queueName);
253 }
254
255
256 public TemporaryTopic createTemporaryTopic()
257 throws javax.jms.JMSException
258 {
259 String tn = "tmpt" + (++this.tmpCount) + this.connection.getClientID();
260 this.createDestination(this, false, tn);
261 return new TempTopicImpl(tn);
262 }
263
264
265 public TemporaryQueue createTemporaryQueue()
266 throws javax.jms.JMSException
267 {
268 String qn = "tmpq" + (++this.tmpCount) + this.connection.getClientID();
269 this.createDestination(this, true, qn);
270 return new TempQueueImpl(qn);
271 }
272
273
274 public Topic createTopic(String name)
275 throws javax.jms.JMSException
276 {
277 if(name == null) {
278 throw new javax.jms.JMSException("name is null!");
279 }
280 this.createDestination(this, false, name);
281 return new TopicImpl(name);
282 }
283
284
285 public MessageListener getMessageListener()
286 throws javax.jms.JMSException
287 {
288 return null;
289 }
290
291
292 public void commit()
293 throws javax.jms.JMSException
294 {
295 if(!this.isTransacted) {
296 throw new IllegalStateException("session is not transacted");
297 }
298 this.doCommit();
299 }
300
301
302 private void doCommit()
303 throws javax.jms.JMSException
304 {
305 this.redelivered.clear();
306 this.connection.commit(this);
307 }
308
309
310 void acknowledge()
311 throws javax.jms.JMSException
312 {
313 if(!this.isTransacted && this.ackMode == Session.CLIENT_ACKNOWLEDGE) {
314 this.doCommit();
315 }
316 }
317
318
319 public void rollback()
320 throws javax.jms.JMSException
321 {
322 if(!this.isTransacted) {
323 throw new IllegalStateException("session is not transacted");
324 }
325 this.connection.rollback(this);
326 }
327
328
329 public ObjectMessage createObjectMessage(java.io.Serializable s)
330 throws javax.jms.JMSException
331 {
332 ObjectMessage om = new ObjectMessageImpl();
333 om.setObject(s);
334 return om;
335 }
336
337
338 public TextMessage createTextMessage()
339 throws javax.jms.JMSException
340 {
341 return new TextMessageImpl();
342 }
343
344
345 public ObjectMessage createObjectMessage()
346 throws javax.jms.JMSException
347 {
348 ObjectMessage om = new ObjectMessageImpl();
349 return om;
350 }
351
352
353 public void close()
354 throws javax.jms.JMSException
355 {
356 for(int i = 0; i < this.producers.size(); i++) {
357 ((MessageProducer)this.producers.get(i)).close();
358 }
359 this.producers.clear();
360 for(int i = 0; i < this.consumers.size(); i++) {
361 ((MessageConsumer)this.consumers.get(i)).close();
362 }
363 this.consumers.clear();
364 this.connection.closeSession(this);
365 }
366
367
368 void closeConsumer(MessageConsumerImpl consumer)
369 throws javax.jms.JMSException
370 {
371 this.consumers.remove(consumer);
372 if(!consumer.getDurable()) {
373 this.unregister((DestinationImpl)consumer.getDestination(),
374 consumer.getConsumerId());
375 }
376 }
377
378
379 void closeProducer(MessageProducer producer) {
380 this.producers.remove(producer);
381 }
382
383
384 public TextMessage createTextMessage(String text)
385 throws javax.jms.JMSException
386 {
387 return new TextMessageImpl(text);
388 }
389
390
391 public void setMessageListener(MessageListener listener)
392 throws javax.jms.JMSException
393 {
394 throw new javax.jms.IllegalStateException("not supported. use MessageConcumser.setMessageListener");
395 }
396
397
398 public StreamMessage createStreamMessage()
399 throws javax.jms.JMSException
400 {
401 return null;
402 }
403
404
405 public Message createMessage()
406 throws javax.jms.JMSException
407 {
408 return null;
409 }
410
411
412 public boolean getTransacted()
413 throws javax.jms.JMSException
414 {
415 return this.isTransacted;
416 }
417
418
419 public int getAcknowledgeMode()
420 throws javax.jms.JMSException
421 {
422 return this.ackMode;
423 }
424
425
426 public void run() {
427 }
428
429
430 public void unsubscribe(String name)
431 throws javax.jms.JMSException
432 {
433 this.unregisterDur(name);
434 }
435
436
437 long getPollSync() {
438 return this.connection.getPollSync();
439 }
440
441
442 long getPollAsync() {
443 return this.connection.getPollAsync();
444 }
445
446
447 void register(DestinationImpl dest, String consumerId, String selector)
448 throws JMSException
449 {
450 this.connection.register(dest, consumerId, selector);
451 }
452
453
454 String registerDur(DestinationImpl dest,
455 String name,
456 String consumerId,
457 String selector)
458 throws JMSException
459 {
460 return this.connection.registerDur(dest, name, consumerId, selector);
461 }
462
463
464 void unregister(DestinationImpl dest, String consumerId)
465 throws JMSException
466 {
467 this.connection.unregister(dest, consumerId);
468 }
469
470
471 void unregisterDur(String name)
472 throws JMSException
473 {
474 this.connection.unregisterDur(this.sessionId, name);
475 }
476
477
478 void send(MessageImpl msg, boolean disableMessageId)
479 throws javax.jms.JMSException
480 {
481 this.connection.send(this, msg, disableMessageId);
482 }
483
484
485 void createDestination(SessionImpl session, boolean queue, String name)
486 throws JMSException
487 {
488 this.connection.createDestination(session, queue, name);
489 }
490
491
492 javax.jms.Message receive(String destinationName, String consumerId)
493 throws javax.jms.JMSException
494 {
495 javax.jms.Message msg = this.connection.receive(this, destinationName, consumerId);
496 if(msg != null) {
497 Map map = (Map)this.redelivered.get(consumerId);
498 if(map == null) {
499 map = new WeakHashMap();
500 this.redelivered.put(consumerId, map);
501 }
502 String msgid = msg.getJMSMessageID();
503 if(map.get(msgid) != null) {
504 msg.setJMSRedelivered(true);
505 } else {
506 map.put(msgid, msgid);
507 }
508 ((MessageImpl)msg).setReceivingSession(this);
509 }
510 return msg;
511 }
512
513 }