Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/mom4j/jms/ConnectionImpl.java


1   package org.mom4j.jms;
2   
3   import javax.jms.Connection;
4   import javax.jms.ConnectionConsumer;
5   import javax.jms.ConnectionMetaData;
6   import javax.jms.Destination;
7   import javax.jms.ExceptionListener;
8   import javax.jms.JMSException;
9   import javax.jms.Queue;
10  import javax.jms.QueueConnection;
11  import javax.jms.QueueSession;
12  import javax.jms.ServerSessionPool;
13  import javax.jms.Session;
14  import javax.jms.Topic;
15  import javax.jms.TopicConnection;
16  import javax.jms.TopicSession;
17  
18  import java.rmi.server.UID;
19  import java.util.Collections;
20  import java.util.Map;
21  import java.util.HashMap;
22  import java.util.Iterator;
23  
24  import org.mom4j.xcp.XcpResponse;
25  import org.mom4j.xcp.XcpSender;
26  import org.mom4j.messaging.XcpMessage;
27  import org.mom4j.messaging.XcpMessageHandler;
28  
29  
30  public class ConnectionImpl implements Connection, TopicConnection, QueueConnection {
31      
32      private static ConnectionMetaData metaData = new ConnectionMetaDataImpl();
33  
34      private static String ID_SUFFIX;
35  
36      private static long connectionCount = 0;
37  
38      private static java.util.Random random = new java.util.Random();
39      
40      private ExceptionListener exceptionListener;
41      private String            hostname;
42      private String            clientId;
43      private boolean           started;
44      private boolean           closed;
45      private int               port;
46      private long              sessionCount;
47      private long              msgCount;
48      private long              pollSync;
49      private long              pollAsync;
50      
51      protected Map       sessions;
52      protected XcpSender sender;
53      
54      static {
55          try {
56              java.net.InetAddress addr = java.net.InetAddress.getLocalHost();
57              java.util.StringTokenizer st =
58                  new java.util.StringTokenizer(addr.getHostAddress(), ".");
59              StringBuffer sb = new StringBuffer();
60              while(st.hasMoreTokens()) {
61                  sb.insert(0, st.nextToken());
62              }
63              ID_SUFFIX = sb.toString();
64          } catch(Exception ex) {
65              ex.printStackTrace();
66              throw new IllegalStateException(ex.getMessage());
67          }
68      }
69  
70     
71      public ConnectionImpl(String hostname,
72                            int    port,
73                            String username,
74                            String password,
75                            long pollSync,
76                            long pollAsync)
77          throws JMSException
78      {
79          if(hostname == null)
80              throw new IllegalArgumentException("hostname is null!");
81          
82          long t = Math.abs(System.currentTimeMillis() ^ random.nextLong());
83  
84          this.hostname          = hostname;
85          this.port              = port;
86          this.exceptionListener = null;
87          //this.clientId          = new UID().toString() + "@" + ID_SUFFIX;
88          this.clientId          = Long.toString(t) + (connectionCount++) + "@" + ID_SUFFIX;
89          this.started           = false;
90          this.closed            = false;
91          this.sessions          = Collections.synchronizedMap(new HashMap());
92          this.sessionCount      = 0;
93          this.msgCount          = 1;
94          this.pollSync          = pollSync;
95          this.pollAsync         = pollAsync;
96          try {
97              this.sender =
98                  new XcpSender(java.net.InetAddress.getByName(hostname), port);
99          } catch(java.net.UnknownHostException ex) {
100             throw new JMSException("unknown host:" + hostname);
101         }
102     }
103     
104 
105     //
106     // javax.jms.Connection - Interface
107     //
108 
109     public String getClientID()
110         throws JMSException
111     {
112         return this.clientId;
113     }
114     
115     
116     public void start()
117         throws JMSException
118     {
119         if(this.closed) {
120             throw new javax.jms.IllegalStateException("connection is closed");
121         }
122         if(this.started) {
123             return;
124         }
125         
126         this.started = true;
127     }
128     
129     
130     public void stop()
131         throws JMSException
132     {
133         if(this.closed) {
134             throw new javax.jms.IllegalStateException("connection is closed");
135         }
136         if(!this.started) {
137             return;
138         }
139         
140         this.started = false;
141     }
142     
143     
144     public void setExceptionListener(ExceptionListener listener)
145         throws JMSException
146     {
147         if(listener == null)
148             throw new IllegalArgumentException("listener is null!");
149         
150         this.exceptionListener = listener;
151     }
152     
153     
154     public void setClientID(String id)
155         throws JMSException
156     {
157         throw new javax.jms.IllegalStateException("id is set already.");
158     }
159     
160     
161     public void close()
162         throws JMSException
163     {
164         if(this.closed) {
165             return;
166         }
167         if(this.started) {
168             this.stop();
169         }
170         Iterator it = this.sessions.keySet().iterator();
171         while(it.hasNext()) {
172             String s = (String)it.next();
173             SessionImpl si = (SessionImpl)this.sessions.get(s);
174             si.close();
175         }
176         this.closed = true;
177     }
178     
179     
180     public ConnectionMetaData getMetaData()
181         throws JMSException
182     {
183         return this.metaData;
184     }
185     
186     
187     public ExceptionListener getExceptionListener()
188         throws javax.jms.JMSException
189     {
190         return this.exceptionListener;
191     }
192 
193 
194     public ConnectionConsumer createConnectionConsumer(
195             Destination dest,
196             String messageSelector,
197             ServerSessionPool pool,
198             int maxMessages)
199         throws JMSException
200     {
201         throw new FeatureNotSupportedException();
202     }
203 
204 
205     public Session createSession(boolean transacted, int acknowledgeMode)
206         throws JMSException
207     {
208         return this.createSessionInternal(transacted, acknowledgeMode);
209     }
210 
211 
212     private SessionImpl createSessionInternal(boolean transacted, int acknowledgeMode)
213         throws JMSException
214     {
215         SessionImpl s = null;
216         String    sid = (++this.sessionCount) + this.getClientID();
217 
218         try {
219             s = new SessionImpl(sid, transacted, acknowledgeMode, this);
220         } catch(IllegalArgumentException ex) {
221             throw new JMSException(ex.getMessage());
222         }
223 
224         this.sessions.put(sid, s);
225 
226         return s;
227     }
228 
229 
230     //
231     // javax.jms.TopicConnection - Interface
232     //
233 
234     public TopicSession createTopicSession(boolean transacted, int acknowledgeMode)
235         throws JMSException
236     {
237         return this.createSessionInternal(transacted, acknowledgeMode);
238     }
239 
240 
241     public ConnectionConsumer createConnectionConsumer(
242             Topic topic,
243             String messageSelector,
244             ServerSessionPool pool,
245             int maxMessages)
246         throws JMSException
247     {
248         throw new FeatureNotSupportedException();
249     }
250 
251 
252     public ConnectionConsumer createDurableConnectionConsumer(
253             Topic topic,
254             String messageSelector,
255             String subscriptionName,
256             ServerSessionPool pool,
257             int maxMessages)
258         throws JMSException
259     {
260         throw new FeatureNotSupportedException();
261     }
262     
263     
264     //
265     // javax.jms.QueueConnection - Interface
266     //
267 
268     public QueueSession createQueueSession(boolean transacted, int acknowledgeMode)
269         throws JMSException
270     {
271         return this.createSessionInternal(transacted, acknowledgeMode);
272     }
273 
274 
275     public ConnectionConsumer createConnectionConsumer(
276             Queue queue,
277             String messageSelector,
278             ServerSessionPool pool,
279             int maxMessages)
280         throws JMSException
281     {
282         throw new FeatureNotSupportedException();
283     }
284     
285 
286     //
287     // Internal Methods
288     //
289 
290     private synchronized String nextMsgId() {
291         return (msgCount++) + this.clientId;
292     }
293 
294 
295     long getPollSync() {
296         return this.pollSync;
297     }
298 
299 
300     long getPollAsync() {
301         return this.pollAsync;
302     }
303 
304 
305     void register(DestinationImpl dest, String consumerId, String messageSelector)
306         throws JMSException
307     {
308         int pos = consumerId.lastIndexOf("@");
309         XcpMessage message = new XcpMessage(consumerId.substring(0, pos),
310                                             XcpMessage.ACTION_REGISTER,
311                                             dest.getName(),
312                                             consumerId);
313         message.setMessageSelector(messageSelector);
314         try {
315             this.sender.send(message);
316         } catch(java.io.IOException ex) {
317             ex.printStackTrace();
318             throw new JMSException(ex.getMessage());
319         }
320     }
321     
322     
323     String registerDur(DestinationImpl dest,
324                        String name,
325                        String consumerId,
326                        String messageSelector)
327         throws JMSException
328     {
329         int pos = consumerId.lastIndexOf("@");
330         XcpMessage message = new XcpMessage(consumerId.substring(0, pos),
331                                             XcpMessage.ACTION_REGISTER_DUR,
332                                             dest.getName(),
333                                             consumerId);
334         message.setMessageSelector(messageSelector);
335         message.setSubscriberName(name + "@" + ID_SUFFIX);
336         XcpResponse resp = null;
337         try {
338             resp = this.sender.send(message, new XcpMessageHandler());
339         } catch(java.io.IOException ex) {
340             ex.printStackTrace();
341             throw new JMSException(ex.getMessage());
342         }
343         XcpMessage msg = (XcpMessage)resp.getRootElement();
344 
345         return msg.getConsumerId();
346     }
347 
348 
349     void unregister(DestinationImpl dest, String consumerId)
350         throws JMSException
351     {
352         int pos = consumerId.lastIndexOf("@");
353         XcpMessage message = new XcpMessage(consumerId.substring(0, pos),
354                                             XcpMessage.ACTION_UNREGISTER,
355                                             dest.getName(),
356                                             consumerId);
357         try {
358             this.sender.send(message);
359         } catch(java.io.IOException ex) {
360             ex.printStackTrace();
361             throw new JMSException(ex.getMessage());
362         }
363     }
364 
365 
366     void unregisterDur(String sessionId, String name)
367         throws JMSException
368     {
369         XcpMessage message = new XcpMessage(sessionId,
370                                             XcpMessage.ACTION_UNREGISTER_DUR);
371         message.setSubscriberName(name + "@" + ID_SUFFIX);
372         try {
373             this.sender.send(message);
374         } catch(java.io.IOException ex) {
375             ex.printStackTrace();
376             throw new JMSException(ex.getMessage());
377         }
378     }
379 
380 
381     void createDestination(SessionImpl session, boolean queue, String name)
382         throws JMSException
383     {
384         String sid = session.getSessionId();
385         if(this.sessions.get(sid) == null)
386             throw new javax.jms.IllegalStateException("session is closed");
387 
388         String action = queue ? XcpMessage.ACTION_CREATE_QUEUE
389                               : XcpMessage.ACTION_CREATE_TOPIC;
390         XcpMessage message = new XcpMessage(sid, action, name);
391         try {
392             this.sender.send(message);
393         } catch(java.io.IOException ex) {
394             ex.printStackTrace();
395             throw new JMSException(ex.getMessage());
396         }
397     }
398     
399     
400     void send(SessionImpl session, MessageImpl msg, boolean disableMessageId)
401         throws JMSException
402     {
403         if(!disableMessageId) {
404             msg.setJMSMessageID(this.nextMsgId());
405         }
406         String sid = session.getSessionId();
407         if(this.sessions.get(sid) == null)
408             throw new javax.jms.IllegalStateException("session is closed");
409         
410         DestinationImpl d = (DestinationImpl)msg.getJMSDestination();
411         String action = session.isTransacted
412                       ? XcpMessage.ACTION_SEND_TX
413                       : XcpMessage.ACTION_SEND;
414         XcpMessage message = new XcpMessage(sid, action, d.getName());
415         message.setMessage(msg);
416         try {
417             this.sender.send(message);
418         } catch(java.io.IOException ex) {
419             ex.printStackTrace();
420             throw new JMSException(ex.getMessage());
421         }
422     }
423     
424     
425     javax.jms.Message receive(SessionImpl session,
426                               String destinationName,
427                               String consumerId)
428         throws JMSException
429     {
430         String sid = session.getSessionId();
431         if(this.sessions.get(sid) == null)
432             throw new javax.jms.IllegalStateException("session is closed");
433         
434         String action = null;
435         if(!session.isTransacted && session.ackMode != Session.CLIENT_ACKNOWLEDGE) {
436             action = XcpMessage.ACTION_RECEIVE;
437         } else {
438             action = XcpMessage.ACTION_RECEIVE_TX;
439         }
440         XcpMessage message = new XcpMessage(sid,
441                                             action,
442                                             destinationName,
443                                             consumerId);
444         XcpResponse resp = null;
445         try {
446             resp = this.sender.send(message, new XcpMessageHandler());
447         } catch(java.io.IOException ex) {
448             ex.printStackTrace();
449             throw new JMSException(ex.getMessage());
450         }
451         message = (XcpMessage)resp.getRootElement();
452         
453         return message.getMessage();
454     }
455     
456     
457     void commit(SessionImpl session)
458         throws JMSException
459     {
460         String sid = session.getSessionId();
461         if(this.sessions.get(sid) == null)
462             throw new javax.jms.IllegalStateException("session is closed");
463         
464         XcpMessage message = new XcpMessage(sid,
465                                             XcpMessage.ACTION_COMMIT,
466                                             null);
467         try {
468             this.sender.send(message);
469         } catch(java.io.IOException ex) {
470             ex.printStackTrace();
471             throw new JMSException(ex.getMessage());
472         }
473     }
474     
475     
476     void rollback(SessionImpl session)
477         throws JMSException
478     {
479         String sid = session.getSessionId();
480         if(this.sessions.get(sid) == null)
481             throw new javax.jms.IllegalStateException("session is closed");
482         
483         XcpMessage message = new XcpMessage(sid,
484                                             XcpMessage.ACTION_ROLLBACK,
485                                             null);
486         try {
487             this.sender.send(message);
488         } catch(java.io.IOException ex) {
489             ex.printStackTrace();
490             throw new JMSException(ex.getMessage());
491         }
492     }
493     
494     
495     void closeSession(SessionImpl session)
496         throws JMSException
497     {
498         this.sessions.remove(session.getSessionId());
499     }
500 
501 }