Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/mom4j/jms/MessageConsumerImpl.java


1   package org.mom4j.jms;
2   
3   import javax.jms.Destination;
4   import javax.jms.JMSException;
5   import javax.jms.Message;
6   import javax.jms.MessageConsumer;
7   import javax.jms.MessageListener;
8   import javax.jms.Queue;
9   import javax.jms.QueueReceiver;
10  import javax.jms.Topic;
11  import javax.jms.TopicSubscriber;
12  
13  /**
14   *
15   */
16  public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, TopicSubscriber {
17      
18      private static long consumerCount = 0;
19  
20      private DestinationImpl destination;
21      private String          messageSelector;
22      private SessionImpl     session;
23      private ReceiverWorker  worker;
24      private MessageListener listener;
25      private String          consumerId;
26      private long            pollSync;
27      private long            pollAsync;
28      private boolean         durable;
29      private boolean         noLocal;
30  
31  
32      MessageConsumerImpl(DestinationImpl dest,
33                          String messageSelector,
34                          SessionImpl s)
35      {
36          this.destination     = dest;
37          this.messageSelector = messageSelector;
38          this.session         = s;
39          this.worker          = null;
40          this.listener        = null;
41          this.consumerId      = s.getSessionId() + "@" + (consumerCount++);
42          this.pollSync        = s.getPollSync();
43          this.pollAsync       = s.getPollAsync();
44          this.durable         = false;
45          this.noLocal         = false;
46      }
47  
48  
49      void setConsumerId(String cid) {
50          this.consumerId = cid;
51      }
52      
53  
54      public void setMessageListener(MessageListener listener)
55          throws JMSException
56      {
57          if(listener != null && this.listener != null) {
58              throw new javax.jms.IllegalStateException("listener already set");
59          } else if(listener == null && this.listener != null) {
60              this.worker.stopListener();
61              this.worker   = null;
62              this.listener = null;
63          } else if(listener != null && this.listener == null) {
64              this.listener = listener;
65              this.worker   = new ReceiverWorker(this.listener, this, this.pollAsync);
66              this.worker.start();
67          } else {
68              throw new javax.jms.IllegalStateException("no listener set");
69          }
70      }
71      
72      
73      public Message receive()
74          throws JMSException
75      {
76          if(this.listener != null) {
77              throw new javax.jms.IllegalStateException("MessageListener is set");
78          }
79          Message msg = null;
80          while(msg == null) {
81              msg = this.session.receive(this.destination.getName(), this.consumerId);
82              if(msg == null) {
83                  try {
84                      Thread.sleep(this.pollSync);
85                  } catch(InterruptedException ex) {
86                      break;
87                  }
88              }
89          }
90          return msg;
91      }
92      
93      
94      public Message receive(long timeout)
95          throws JMSException
96      {
97          if(this.listener != null) {
98              throw new javax.jms.IllegalStateException("MessageListener is set");
99          }
100         long tt = System.currentTimeMillis() + timeout;
101         Message msg = null;
102         while(msg == null) {
103             msg = this.session.receive(this.destination.getName(), this.consumerId);
104             if(msg == null) {
105                 if(System.currentTimeMillis() > tt) {
106                     break;
107                 }
108                 try {
109                     Thread.sleep(this.pollSync);
110                 } catch(InterruptedException ex) {
111                     break;
112                 }
113             }
114         }
115         return msg;
116     }
117     
118     
119     public Message receiveNoWait()
120         throws JMSException
121     {
122         if(this.listener != null) {
123             throw new javax.jms.IllegalStateException("MessageListener is set");
124         }
125         return this.session.receive(this.destination.getName(), this.consumerId);
126     }
127 
128     
129     public void close()
130         throws JMSException
131     {
132         if(this.worker != null) {
133             this.worker.stopListener();
134             this.worker   = null;
135             this.listener = null;
136         }
137         this.session.closeConsumer(this);
138     }
139     
140     
141     public void setDurable(boolean dur) {
142         this.durable = dur;
143     }
144 
145 
146     public boolean getDurable() {
147         return this.durable;
148     }
149 
150 
151     public void setNoLocal(boolean noLocal) {
152         this.noLocal = noLocal;
153     }
154 
155 
156     public boolean getNoLocal() {
157         return this.noLocal;
158     }
159 
160 
161     public MessageListener getMessageListener()
162         throws JMSException
163     {
164         return this.listener;
165     }
166     
167     
168     public String getMessageSelector()
169         throws JMSException
170     {
171         return this.messageSelector;
172     }
173 
174 
175     public Destination getDestination()
176         throws JMSException
177     {
178         return this.destination;
179     }
180 
181 
182     final String getConsumerId() {
183         return this.consumerId;
184     }
185     
186     
187     final Message receiveInternal() {
188         Message msg = null;
189         try {
190             msg = this.session.receive(this.destination.getName(), this.consumerId);
191         } catch(JMSException ex) {
192             ex.printStackTrace();
193         }
194         return msg;
195     }
196 
197 
198     //
199     // QueueReceiver specific methods
200     //
201 
202     public Queue getQueue()
203         throws javax.jms.JMSException
204     {
205         return (Queue)this.destination;
206     }
207 
208     
209     //
210     // TopicSubscriber specific methods
211     //
212 
213     public Topic getTopic()
214         throws JMSException
215     {
216         return (Topic)this.destination;
217     }    
218     
219 }