Source code: org/mom4j/jms/ConnectionImpl.java
1 package org.mom4j.jms;
2
3 import javax.jms.Connection;
4 import javax.jms.ConnectionConsumer;
5 import javax.jms.ConnectionMetaData;
6 import javax.jms.Destination;
7 import javax.jms.ExceptionListener;
8 import javax.jms.JMSException;
9 import javax.jms.Queue;
10 import javax.jms.QueueConnection;
11 import javax.jms.QueueSession;
12 import javax.jms.ServerSessionPool;
13 import javax.jms.Session;
14 import javax.jms.Topic;
15 import javax.jms.TopicConnection;
16 import javax.jms.TopicSession;
17
18 import java.rmi.server.UID;
19 import java.util.Collections;
20 import java.util.Map;
21 import java.util.HashMap;
22 import java.util.Iterator;
23
24 import org.mom4j.xcp.XcpResponse;
25 import org.mom4j.xcp.XcpSender;
26 import org.mom4j.messaging.XcpMessage;
27 import org.mom4j.messaging.XcpMessageHandler;
28
29
30 public class ConnectionImpl implements Connection, TopicConnection, QueueConnection {
31
32 private static ConnectionMetaData metaData = new ConnectionMetaDataImpl();
33
34 private static String ID_SUFFIX;
35
36 private static long connectionCount = 0;
37
38 private static java.util.Random random = new java.util.Random();
39
40 private ExceptionListener exceptionListener;
41 private String hostname;
42 private String clientId;
43 private boolean started;
44 private boolean closed;
45 private int port;
46 private long sessionCount;
47 private long msgCount;
48 private long pollSync;
49 private long pollAsync;
50
51 protected Map sessions;
52 protected XcpSender sender;
53
54 static {
55 try {
56 java.net.InetAddress addr = java.net.InetAddress.getLocalHost();
57 java.util.StringTokenizer st =
58 new java.util.StringTokenizer(addr.getHostAddress(), ".");
59 StringBuffer sb = new StringBuffer();
60 while(st.hasMoreTokens()) {
61 sb.insert(0, st.nextToken());
62 }
63 ID_SUFFIX = sb.toString();
64 } catch(Exception ex) {
65 ex.printStackTrace();
66 throw new IllegalStateException(ex.getMessage());
67 }
68 }
69
70
71 public ConnectionImpl(String hostname,
72 int port,
73 String username,
74 String password,
75 long pollSync,
76 long pollAsync)
77 throws JMSException
78 {
79 if(hostname == null)
80 throw new IllegalArgumentException("hostname is null!");
81
82 long t = Math.abs(System.currentTimeMillis() ^ random.nextLong());
83
84 this.hostname = hostname;
85 this.port = port;
86 this.exceptionListener = null;
87 //this.clientId = new UID().toString() + "@" + ID_SUFFIX;
88 this.clientId = Long.toString(t) + (connectionCount++) + "@" + ID_SUFFIX;
89 this.started = false;
90 this.closed = false;
91 this.sessions = Collections.synchronizedMap(new HashMap());
92 this.sessionCount = 0;
93 this.msgCount = 1;
94 this.pollSync = pollSync;
95 this.pollAsync = pollAsync;
96 try {
97 this.sender =
98 new XcpSender(java.net.InetAddress.getByName(hostname), port);
99 } catch(java.net.UnknownHostException ex) {
100 throw new JMSException("unknown host:" + hostname);
101 }
102 }
103
104
105 //
106 // javax.jms.Connection - Interface
107 //
108
109 public String getClientID()
110 throws JMSException
111 {
112 return this.clientId;
113 }
114
115
116 public void start()
117 throws JMSException
118 {
119 if(this.closed) {
120 throw new javax.jms.IllegalStateException("connection is closed");
121 }
122 if(this.started) {
123 return;
124 }
125
126 this.started = true;
127 }
128
129
130 public void stop()
131 throws JMSException
132 {
133 if(this.closed) {
134 throw new javax.jms.IllegalStateException("connection is closed");
135 }
136 if(!this.started) {
137 return;
138 }
139
140 this.started = false;
141 }
142
143
144 public void setExceptionListener(ExceptionListener listener)
145 throws JMSException
146 {
147 if(listener == null)
148 throw new IllegalArgumentException("listener is null!");
149
150 this.exceptionListener = listener;
151 }
152
153
154 public void setClientID(String id)
155 throws JMSException
156 {
157 throw new javax.jms.IllegalStateException("id is set already.");
158 }
159
160
161 public void close()
162 throws JMSException
163 {
164 if(this.closed) {
165 return;
166 }
167 if(this.started) {
168 this.stop();
169 }
170 Iterator it = this.sessions.keySet().iterator();
171 while(it.hasNext()) {
172 String s = (String)it.next();
173 SessionImpl si = (SessionImpl)this.sessions.get(s);
174 si.close();
175 }
176 this.closed = true;
177 }
178
179
180 public ConnectionMetaData getMetaData()
181 throws JMSException
182 {
183 return this.metaData;
184 }
185
186
187 public ExceptionListener getExceptionListener()
188 throws javax.jms.JMSException
189 {
190 return this.exceptionListener;
191 }
192
193
194 public ConnectionConsumer createConnectionConsumer(
195 Destination dest,
196 String messageSelector,
197 ServerSessionPool pool,
198 int maxMessages)
199 throws JMSException
200 {
201 throw new FeatureNotSupportedException();
202 }
203
204
205 public Session createSession(boolean transacted, int acknowledgeMode)
206 throws JMSException
207 {
208 return this.createSessionInternal(transacted, acknowledgeMode);
209 }
210
211
212 private SessionImpl createSessionInternal(boolean transacted, int acknowledgeMode)
213 throws JMSException
214 {
215 SessionImpl s = null;
216 String sid = (++this.sessionCount) + this.getClientID();
217
218 try {
219 s = new SessionImpl(sid, transacted, acknowledgeMode, this);
220 } catch(IllegalArgumentException ex) {
221 throw new JMSException(ex.getMessage());
222 }
223
224 this.sessions.put(sid, s);
225
226 return s;
227 }
228
229
230 //
231 // javax.jms.TopicConnection - Interface
232 //
233
234 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode)
235 throws JMSException
236 {
237 return this.createSessionInternal(transacted, acknowledgeMode);
238 }
239
240
241 public ConnectionConsumer createConnectionConsumer(
242 Topic topic,
243 String messageSelector,
244 ServerSessionPool pool,
245 int maxMessages)
246 throws JMSException
247 {
248 throw new FeatureNotSupportedException();
249 }
250
251
252 public ConnectionConsumer createDurableConnectionConsumer(
253 Topic topic,
254 String messageSelector,
255 String subscriptionName,
256 ServerSessionPool pool,
257 int maxMessages)
258 throws JMSException
259 {
260 throw new FeatureNotSupportedException();
261 }
262
263
264 //
265 // javax.jms.QueueConnection - Interface
266 //
267
268 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode)
269 throws JMSException
270 {
271 return this.createSessionInternal(transacted, acknowledgeMode);
272 }
273
274
275 public ConnectionConsumer createConnectionConsumer(
276 Queue queue,
277 String messageSelector,
278 ServerSessionPool pool,
279 int maxMessages)
280 throws JMSException
281 {
282 throw new FeatureNotSupportedException();
283 }
284
285
286 //
287 // Internal Methods
288 //
289
290 private synchronized String nextMsgId() {
291 return (msgCount++) + this.clientId;
292 }
293
294
295 long getPollSync() {
296 return this.pollSync;
297 }
298
299
300 long getPollAsync() {
301 return this.pollAsync;
302 }
303
304
305 void register(DestinationImpl dest, String consumerId, String messageSelector)
306 throws JMSException
307 {
308 int pos = consumerId.lastIndexOf("@");
309 XcpMessage message = new XcpMessage(consumerId.substring(0, pos),
310 XcpMessage.ACTION_REGISTER,
311 dest.getName(),
312 consumerId);
313 message.setMessageSelector(messageSelector);
314 try {
315 this.sender.send(message);
316 } catch(java.io.IOException ex) {
317 ex.printStackTrace();
318 throw new JMSException(ex.getMessage());
319 }
320 }
321
322
323 String registerDur(DestinationImpl dest,
324 String name,
325 String consumerId,
326 String messageSelector)
327 throws JMSException
328 {
329 int pos = consumerId.lastIndexOf("@");
330 XcpMessage message = new XcpMessage(consumerId.substring(0, pos),
331 XcpMessage.ACTION_REGISTER_DUR,
332 dest.getName(),
333 consumerId);
334 message.setMessageSelector(messageSelector);
335 message.setSubscriberName(name + "@" + ID_SUFFIX);
336 XcpResponse resp = null;
337 try {
338 resp = this.sender.send(message, new XcpMessageHandler());
339 } catch(java.io.IOException ex) {
340 ex.printStackTrace();
341 throw new JMSException(ex.getMessage());
342 }
343 XcpMessage msg = (XcpMessage)resp.getRootElement();
344
345 return msg.getConsumerId();
346 }
347
348
349 void unregister(DestinationImpl dest, String consumerId)
350 throws JMSException
351 {
352 int pos = consumerId.lastIndexOf("@");
353 XcpMessage message = new XcpMessage(consumerId.substring(0, pos),
354 XcpMessage.ACTION_UNREGISTER,
355 dest.getName(),
356 consumerId);
357 try {
358 this.sender.send(message);
359 } catch(java.io.IOException ex) {
360 ex.printStackTrace();
361 throw new JMSException(ex.getMessage());
362 }
363 }
364
365
366 void unregisterDur(String sessionId, String name)
367 throws JMSException
368 {
369 XcpMessage message = new XcpMessage(sessionId,
370 XcpMessage.ACTION_UNREGISTER_DUR);
371 message.setSubscriberName(name + "@" + ID_SUFFIX);
372 try {
373 this.sender.send(message);
374 } catch(java.io.IOException ex) {
375 ex.printStackTrace();
376 throw new JMSException(ex.getMessage());
377 }
378 }
379
380
381 void createDestination(SessionImpl session, boolean queue, String name)
382 throws JMSException
383 {
384 String sid = session.getSessionId();
385 if(this.sessions.get(sid) == null)
386 throw new javax.jms.IllegalStateException("session is closed");
387
388 String action = queue ? XcpMessage.ACTION_CREATE_QUEUE
389 : XcpMessage.ACTION_CREATE_TOPIC;
390 XcpMessage message = new XcpMessage(sid, action, name);
391 try {
392 this.sender.send(message);
393 } catch(java.io.IOException ex) {
394 ex.printStackTrace();
395 throw new JMSException(ex.getMessage());
396 }
397 }
398
399
400 void send(SessionImpl session, MessageImpl msg, boolean disableMessageId)
401 throws JMSException
402 {
403 if(!disableMessageId) {
404 msg.setJMSMessageID(this.nextMsgId());
405 }
406 String sid = session.getSessionId();
407 if(this.sessions.get(sid) == null)
408 throw new javax.jms.IllegalStateException("session is closed");
409
410 DestinationImpl d = (DestinationImpl)msg.getJMSDestination();
411 String action = session.isTransacted
412 ? XcpMessage.ACTION_SEND_TX
413 : XcpMessage.ACTION_SEND;
414 XcpMessage message = new XcpMessage(sid, action, d.getName());
415 message.setMessage(msg);
416 try {
417 this.sender.send(message);
418 } catch(java.io.IOException ex) {
419 ex.printStackTrace();
420 throw new JMSException(ex.getMessage());
421 }
422 }
423
424
425 javax.jms.Message receive(SessionImpl session,
426 String destinationName,
427 String consumerId)
428 throws JMSException
429 {
430 String sid = session.getSessionId();
431 if(this.sessions.get(sid) == null)
432 throw new javax.jms.IllegalStateException("session is closed");
433
434 String action = null;
435 if(!session.isTransacted && session.ackMode != Session.CLIENT_ACKNOWLEDGE) {
436 action = XcpMessage.ACTION_RECEIVE;
437 } else {
438 action = XcpMessage.ACTION_RECEIVE_TX;
439 }
440 XcpMessage message = new XcpMessage(sid,
441 action,
442 destinationName,
443 consumerId);
444 XcpResponse resp = null;
445 try {
446 resp = this.sender.send(message, new XcpMessageHandler());
447 } catch(java.io.IOException ex) {
448 ex.printStackTrace();
449 throw new JMSException(ex.getMessage());
450 }
451 message = (XcpMessage)resp.getRootElement();
452
453 return message.getMessage();
454 }
455
456
457 void commit(SessionImpl session)
458 throws JMSException
459 {
460 String sid = session.getSessionId();
461 if(this.sessions.get(sid) == null)
462 throw new javax.jms.IllegalStateException("session is closed");
463
464 XcpMessage message = new XcpMessage(sid,
465 XcpMessage.ACTION_COMMIT,
466 null);
467 try {
468 this.sender.send(message);
469 } catch(java.io.IOException ex) {
470 ex.printStackTrace();
471 throw new JMSException(ex.getMessage());
472 }
473 }
474
475
476 void rollback(SessionImpl session)
477 throws JMSException
478 {
479 String sid = session.getSessionId();
480 if(this.sessions.get(sid) == null)
481 throw new javax.jms.IllegalStateException("session is closed");
482
483 XcpMessage message = new XcpMessage(sid,
484 XcpMessage.ACTION_ROLLBACK,
485 null);
486 try {
487 this.sender.send(message);
488 } catch(java.io.IOException ex) {
489 ex.printStackTrace();
490 throw new JMSException(ex.getMessage());
491 }
492 }
493
494
495 void closeSession(SessionImpl session)
496 throws JMSException
497 {
498 this.sessions.remove(session.getSessionId());
499 }
500
501 }