Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/mom4j/jms/SessionImpl.java


1   
2   package org.mom4j.jms;
3   
4   import javax.jms.BytesMessage;
5   import javax.jms.Destination;
6   import javax.jms.JMSException;
7   import javax.jms.Message;
8   import javax.jms.MapMessage;
9   import javax.jms.MessageConsumer;
10  import javax.jms.MessageListener;
11  import javax.jms.MessageProducer;
12  import javax.jms.ObjectMessage;
13  import javax.jms.Queue;
14  import javax.jms.QueueBrowser;
15  import javax.jms.QueueReceiver;
16  import javax.jms.QueueSender;
17  import javax.jms.QueueSession;
18  import javax.jms.Session;
19  import javax.jms.StreamMessage;
20  import javax.jms.TemporaryQueue;
21  import javax.jms.TemporaryTopic;
22  import javax.jms.TextMessage;
23  import javax.jms.Topic;
24  import javax.jms.TopicPublisher;
25  import javax.jms.TopicSession;
26  import javax.jms.TopicSubscriber;
27  
28  import java.util.ArrayList;
29  import java.util.Map;
30  import java.util.WeakHashMap;
31  
32  
33  public class SessionImpl implements Session, QueueSession, TopicSession {
34      
35      protected boolean         isTransacted;
36      protected int             ackMode;
37      protected ConnectionImpl  connection;
38      protected String          sessionId;
39  
40      private ArrayList   consumers;
41      private ArrayList   producers;
42      private WeakHashMap redelivered;
43      private long        tmpCount;
44  
45  
46      /** Creates new SessionImpl */
47      public SessionImpl(String sessionId, boolean tx, int ackMode, ConnectionImpl c) {
48          if(ackMode != Session.AUTO_ACKNOWLEDGE &&
49             ackMode != Session.CLIENT_ACKNOWLEDGE &&
50             ackMode != Session.DUPS_OK_ACKNOWLEDGE)
51          {
52              throw new IllegalArgumentException("acknowledge-mode not known");
53          }
54          this.isTransacted = tx;
55          this.ackMode      = ackMode;
56          this.connection   = c;
57          this.sessionId    = sessionId;
58          this.consumers    = new ArrayList();
59          this.producers    = new ArrayList();
60          this.redelivered  = new WeakHashMap();
61          this.tmpCount     = 0;
62      }
63      
64      
65      public String getSessionId() {
66          return this.sessionId;
67      }
68      
69  
70      public void recover()
71          throws javax.jms.JMSException
72      {
73          if(this.isTransacted) {
74              throw new javax.jms.IllegalStateException("session is transacted");
75          }
76          this.connection.rollback(this);
77      }
78  
79  
80      public QueueBrowser createBrowser(Queue queue)
81          throws JMSException
82      {
83          throw new FeatureNotSupportedException();
84      }
85      
86      
87      public QueueBrowser createBrowser(Queue queue, String messageSelector)
88          throws JMSException
89      {
90          throw new FeatureNotSupportedException();
91      }
92      
93      
94      public BytesMessage createBytesMessage()
95          throws javax.jms.JMSException
96      {
97          return null;
98      }
99  
100 
101     public MessageConsumer createConsumer(Destination destination)
102         throws JMSException
103     {
104         return this.createConsumer(destination, null);
105     }
106     
107     
108     public MessageConsumer createConsumer(Destination destination,
109                                           String messageSelector)
110         throws JMSException
111     {
112         return this.createConsumer(destination, messageSelector, false);
113     }
114     
115     
116     public MessageConsumer createConsumer(Destination destination,
117                                           String messageSelector,
118                                           boolean noLocal)
119         throws JMSException
120     {
121         return this.createConsumerInternal(destination, messageSelector, noLocal);
122     }
123 
124 
125     private MessageConsumerImpl createConsumerInternal(Destination destination,
126                                                        String messageSelector,
127                                                        boolean noLocal)
128         throws JMSException
129     {
130         if(destination == null)
131             throw new java.lang.IllegalArgumentException("destination may not be null!");
132 
133         MessageConsumerImpl mc =
134             new MessageConsumerImpl((DestinationImpl)destination, messageSelector, this);
135         mc.setNoLocal(noLocal);
136         this.register((DestinationImpl)destination, mc.getConsumerId(), messageSelector);
137         this.consumers.add(mc);
138         return mc;
139     }
140 
141 
142     public QueueReceiver createReceiver(Queue queue, String messageSelector)
143         throws javax.jms.JMSException
144     {
145         return this.createConsumerInternal((Destination)queue, messageSelector, false);
146     }
147 
148 
149     public QueueReceiver createReceiver(Queue queue)
150         throws javax.jms.JMSException
151     {
152         return this.createReceiver(queue, null);
153     }
154 
155 
156     public TopicSubscriber createSubscriber(Topic topic)
157         throws javax.jms.JMSException
158     {
159         return this.createSubscriber(topic, null, false);
160     }
161 
162 
163     public TopicSubscriber createSubscriber(Topic topic,
164                                             String selector,
165                                             boolean noLocal)
166         throws javax.jms.JMSException
167     {
168         return this.createConsumerInternal(topic, selector, noLocal);
169     }
170     
171     
172     public TopicSubscriber createDurableSubscriber(Topic topic, String name)
173         throws javax.jms.JMSException
174     {
175         return this.createDurableSubscriber(topic, name, null, false);
176     }
177 
178 
179     public TopicSubscriber createDurableSubscriber(Topic topic,
180                                                    String name,
181                                                    String selector,
182                                                    boolean noLocal)
183         throws javax.jms.JMSException
184     {
185         if(topic == null)
186             throw new java.lang.IllegalArgumentException("topic is null!");
187         if(name == null)
188             throw new java.lang.IllegalArgumentException("name is null!");
189 
190         MessageConsumerImpl mc =
191             new MessageConsumerImpl((DestinationImpl)topic,
192                                      selector,
193                                      this);
194         mc.setNoLocal(noLocal);
195         mc.setDurable(true);
196         String cid = this.registerDur((DestinationImpl)topic,
197                                       name,
198                                       mc.getConsumerId(),
199                                       selector);
200         mc.setConsumerId(cid);
201         return mc;
202     }
203 
204 
205     public MapMessage createMapMessage()
206         throws javax.jms.JMSException
207     {
208         return null;
209     }
210 
211 
212     public MessageProducer createProducer(Destination destination)
213         throws JMSException
214     {
215         return this.createProducerInternal(destination);
216     }
217 
218 
219     private MessageProducerImpl createProducerInternal(Destination destination)
220         throws JMSException
221     {
222         if(destination == null) {
223             throw new java.lang.IllegalArgumentException("destination may not be null!");
224         }
225         MessageProducerImpl mp = new MessageProducerImpl((DestinationImpl)destination, this);
226         this.producers.add(mp);
227         return mp;
228     }
229 
230 
231     public QueueSender createSender(Queue queue)
232         throws javax.jms.JMSException
233     {
234         return this.createProducerInternal((Destination)queue);
235     }
236 
237 
238     public TopicPublisher createPublisher(Topic topic)
239         throws javax.jms.JMSException
240     {
241         return this.createProducerInternal((Destination)topic);
242     }
243 
244 
245     public Queue createQueue(java.lang.String queueName)
246         throws JMSException
247     {
248         if(queueName == null) {
249             throw new javax.jms.JMSException("name is null!");
250         }
251         this.createDestination(this, true, queueName);
252         return new QueueImpl(queueName);
253     }
254 
255 
256     public TemporaryTopic createTemporaryTopic()
257         throws javax.jms.JMSException
258     {
259          String tn = "tmpt" + (++this.tmpCount) + this.connection.getClientID();
260          this.createDestination(this, false, tn);
261          return new TempTopicImpl(tn);
262     }
263 
264     
265     public TemporaryQueue createTemporaryQueue()
266         throws javax.jms.JMSException
267     {
268         String qn = "tmpq" + (++this.tmpCount) + this.connection.getClientID();
269         this.createDestination(this, true, qn);
270         return new TempQueueImpl(qn);
271     }
272 
273 
274     public Topic createTopic(String name)
275         throws javax.jms.JMSException
276     {
277         if(name == null) {
278             throw new javax.jms.JMSException("name is null!");
279         }
280         this.createDestination(this, false, name);
281         return new TopicImpl(name);
282     }
283     
284     
285     public MessageListener getMessageListener()
286         throws javax.jms.JMSException 
287     {
288         return null;
289     }
290     
291     
292     public void commit()
293         throws javax.jms.JMSException
294     {
295         if(!this.isTransacted) {
296             throw new IllegalStateException("session is not transacted");
297         }
298         this.doCommit();
299     }
300 
301 
302     private void doCommit()
303         throws javax.jms.JMSException
304     {
305         this.redelivered.clear();
306         this.connection.commit(this);
307     }
308 
309 
310     void acknowledge()
311         throws javax.jms.JMSException
312     {
313         if(!this.isTransacted && this.ackMode == Session.CLIENT_ACKNOWLEDGE) {
314             this.doCommit();
315         }
316     }
317     
318     
319     public void rollback()
320         throws javax.jms.JMSException
321     {
322         if(!this.isTransacted) {
323             throw new IllegalStateException("session is not transacted");
324         }
325         this.connection.rollback(this);
326     }
327     
328     
329     public ObjectMessage createObjectMessage(java.io.Serializable s)
330         throws javax.jms.JMSException
331     {
332         ObjectMessage om = new ObjectMessageImpl();
333         om.setObject(s);
334         return om;
335     }
336     
337     
338     public TextMessage createTextMessage()
339         throws javax.jms.JMSException
340     {
341         return new TextMessageImpl();
342     }
343     
344     
345     public ObjectMessage createObjectMessage()
346         throws javax.jms.JMSException
347     {
348         ObjectMessage om = new ObjectMessageImpl();
349         return om;
350     }
351     
352     
353     public void close()
354         throws javax.jms.JMSException
355     {
356         for(int i = 0; i < this.producers.size(); i++) {
357             ((MessageProducer)this.producers.get(i)).close();
358         }
359         this.producers.clear();
360         for(int i = 0; i < this.consumers.size(); i++) {
361             ((MessageConsumer)this.consumers.get(i)).close();
362         }
363         this.consumers.clear();
364         this.connection.closeSession(this);
365     }
366 
367 
368     void closeConsumer(MessageConsumerImpl consumer)
369         throws javax.jms.JMSException
370     {
371         this.consumers.remove(consumer);
372         if(!consumer.getDurable()) {
373             this.unregister((DestinationImpl)consumer.getDestination(),
374                             consumer.getConsumerId());
375         }
376     }
377 
378 
379     void closeProducer(MessageProducer producer) {
380         this.producers.remove(producer);
381     }
382 
383     
384     public TextMessage createTextMessage(String text)
385         throws javax.jms.JMSException
386     {
387         return new TextMessageImpl(text);
388     }
389     
390     
391     public void setMessageListener(MessageListener listener)
392         throws javax.jms.JMSException
393     {
394         throw new javax.jms.IllegalStateException("not supported. use MessageConcumser.setMessageListener");
395     }
396     
397     
398     public StreamMessage createStreamMessage()
399         throws javax.jms.JMSException
400     {
401         return null;
402     }
403     
404     
405     public Message createMessage()
406         throws javax.jms.JMSException
407     {
408         return null;
409     }
410     
411     
412     public boolean getTransacted()
413         throws javax.jms.JMSException
414     {
415         return this.isTransacted;
416     }
417 
418 
419     public int getAcknowledgeMode()
420         throws javax.jms.JMSException
421     {
422         return this.ackMode;
423     }
424     
425     
426     public void run() {
427     }
428 
429 
430     public void unsubscribe(String name)
431         throws javax.jms.JMSException
432     {
433         this.unregisterDur(name);
434     }
435 
436 
437     long getPollSync() {
438         return this.connection.getPollSync();
439     }
440 
441 
442     long getPollAsync() {
443         return this.connection.getPollAsync();
444     }
445 
446 
447     void register(DestinationImpl dest, String consumerId, String selector)
448         throws JMSException
449     {
450         this.connection.register(dest, consumerId, selector);
451     }
452 
453 
454     String registerDur(DestinationImpl dest,
455                        String name,
456                        String consumerId,
457                        String selector)
458         throws JMSException
459     {
460         return this.connection.registerDur(dest, name, consumerId, selector);
461     }
462 
463 
464     void unregister(DestinationImpl dest, String consumerId)
465         throws JMSException
466     {
467         this.connection.unregister(dest, consumerId);
468     }
469 
470 
471     void unregisterDur(String name)
472         throws JMSException
473     {
474         this.connection.unregisterDur(this.sessionId, name);
475     }
476 
477 
478     void send(MessageImpl msg, boolean disableMessageId)
479         throws javax.jms.JMSException
480     {
481         this.connection.send(this, msg, disableMessageId);
482     }
483 
484 
485     void createDestination(SessionImpl session, boolean queue, String name)
486         throws JMSException
487     {
488         this.connection.createDestination(session, queue, name);
489     }
490 
491 
492     javax.jms.Message receive(String destinationName, String consumerId)
493         throws javax.jms.JMSException
494     {
495         javax.jms.Message msg = this.connection.receive(this, destinationName, consumerId);
496         if(msg != null) {
497             Map map = (Map)this.redelivered.get(consumerId);
498             if(map == null) {
499                 map = new WeakHashMap();
500                 this.redelivered.put(consumerId, map);
501             }
502             String msgid = msg.getJMSMessageID();
503             if(map.get(msgid) != null) {
504                 msg.setJMSRedelivered(true);
505             } else {
506                 map.put(msgid, msgid);
507             }
508             ((MessageImpl)msg).setReceivingSession(this);
509         }
510         return msg;
511     }
512 
513 }