1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */
22 package org.jboss.mq;
23
24 import java.io.Serializable;
25
26 import javax.jms.ConnectionConsumer;
27 import javax.jms.Destination;
28 import javax.jms.IllegalStateException;
29 import javax.jms.InvalidDestinationException;
30 import javax.jms.JMSException;
31 import javax.jms.Queue;
32 import javax.jms.QueueConnection;
33 import javax.jms.QueueSession;
34 import javax.jms.ServerSessionPool;
35 import javax.jms.Session;
36 import javax.jms.TemporaryQueue;
37 import javax.jms.TemporaryTopic;
38 import javax.jms.Topic;
39 import javax.jms.TopicConnection;
40 import javax.jms.TopicSession;
41
42 import org.jboss.util.UnreachableStatementException;
43
44 /**
45 * This class implements javax.jms.QueueConnection and
46 * javax.jms.TopicConnection
47 *
48 * @author Norbert Lataille (Norbert.Lataille@m4x.org)
49 * @author Hiram Chirino (Cojonudo14@hotmail.com)
50 * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
51 * @version $Revision: 39159 $
52 */
53 public class SpyConnection extends Connection implements Serializable, TopicConnection, QueueConnection
54 {
55 private static final long serialVersionUID = -6227193901482445607L;
56
57 /** Unified */
58 public static final int UNIFIED = 0;
59
60 /** Queue */
61 public static final int QUEUE = 1;
62
63 /** Topic */
64 public static final int TOPIC = 2;
65
66 /** The type of connection */
67 private int type = UNIFIED;
68
69 /**
70 * Create a new SpyConnection
71 *
72 * @param userId the user
73 * @param password the password
74 * @param gcf the constructing class
75 * @throws JMSException for any error
76 */
77 public SpyConnection(String userId, String password, GenericConnectionFactory gcf) throws JMSException
78 {
79 super(userId, password, gcf);
80 }
81
82 /**
83 * Create a new SpyConnection
84 *
85 * @param gcf the constructing class
86 * @throws JMSException for any error
87 */
88 public SpyConnection(GenericConnectionFactory gcf) throws JMSException
89 {
90 super(gcf);
91 }
92
93 /**
94 * Create a new SpyConnection
95 *
96 * @param type the type of connection
97 * @param userId the user
98 * @param password the password
99 * @param gcf the constructing class
100 * @throws JMSException for any error
101 */
102 public SpyConnection(int type, String userId, String password, GenericConnectionFactory gcf) throws JMSException
103 {
104 super(userId, password, gcf);
105 this.type = type;
106 }
107
108 /**
109 * Create a new SpyConnection
110 * @param type the type of connection
111 * @param gcf the constructing class
112 * @throws JMSException for any error
113 */
114 public SpyConnection(int type, GenericConnectionFactory gcf) throws JMSException
115 {
116 super(gcf);
117 this.type = type;
118 }
119
120 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
121 ServerSessionPool sessionPool, int maxMessages) throws JMSException
122 {
123 checkClosed();
124 if (destination == null)
125 throw new InvalidDestinationException("Null destination");
126 checkTemporary(destination);
127
128 return new SpyConnectionConsumer(this, destination, messageSelector, sessionPool, maxMessages);
129 }
130
131 public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException
132 {
133 checkClosed();
134 checkClientID();
135
136 if (transacted)
137 acknowledgeMode = 0;
138 Session session = new SpySession(this, transacted, acknowledgeMode, false);
139
140 //add the new session to the createdSessions list
141 synchronized (createdSessions)
142 {
143 createdSessions.add(session);
144 }
145
146 return session;
147 }
148
149 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
150 {
151 checkClosed();
152 checkClientID();
153
154 if (transacted)
155 acknowledgeMode = 0;
156 TopicSession session = new SpyTopicSession(this, transacted, acknowledgeMode);
157
158 //add the new session to the createdSessions list
159 synchronized (createdSessions)
160 {
161 createdSessions.add(session);
162 }
163
164 return session;
165 }
166
167 public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
168 ServerSessionPool sessionPool, int maxMessages) throws JMSException
169 {
170 checkClosed();
171 if (type == QUEUE)
172 throw new IllegalStateException("Cannot create a topic consumer on a QueueConnection");
173 if (topic == null)
174 throw new InvalidDestinationException("Null topic");
175 checkClientID();
176 checkTemporary(topic);
177
178 return new SpyConnectionConsumer(this, topic, messageSelector, sessionPool, maxMessages);
179 }
180
181 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
182 String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException
183 {
184 checkClosed();
185 if (type == QUEUE)
186 throw new IllegalStateException("Cannot create a topic consumer on a QueueConnection");
187 if (topic == null)
188 throw new InvalidDestinationException("Null topic");
189 if (topic instanceof TemporaryTopic)
190 throw new InvalidDestinationException("Attempt to create a durable subscription for a temporary topic");
191
192 if (subscriptionName == null || subscriptionName.trim().length() == 0)
193 throw new JMSException("Null or empty subscription");
194
195 SpyTopic t = new SpyTopic((SpyTopic) topic, getClientID(), subscriptionName, messageSelector);
196 return new SpyConnectionConsumer(this, t, messageSelector, sessionPool, maxMessages);
197 }
198
199 public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
200 ServerSessionPool sessionPool, int maxMessages) throws JMSException
201 {
202 checkClosed();
203 if (type == TOPIC)
204 throw new IllegalStateException("Cannot create a queue consumer on a TopicConnection");
205 if (queue == null)
206 throw new InvalidDestinationException("Null queue");
207 checkTemporary(queue);
208
209 return new SpyConnectionConsumer(this, queue, messageSelector, sessionPool, maxMessages);
210 }
211
212 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
213 {
214 checkClosed();
215 checkClientID();
216 if (transacted)
217 acknowledgeMode = 0;
218 QueueSession session = new SpyQueueSession(this, transacted, acknowledgeMode);
219
220 //add the new session to the createdSessions list
221 synchronized (createdSessions)
222 {
223 createdSessions.add(session);
224 }
225
226 return session;
227 }
228
229 TemporaryTopic getTemporaryTopic() throws JMSException
230 {
231 checkClosed();
232 checkClientID();
233 try
234 {
235 SpyTemporaryTopic temp = (SpyTemporaryTopic) serverIL.getTemporaryTopic(connectionToken);
236 temp.setConnection(this);
237 synchronized (temps)
238 {
239 temps.add(temp);
240 }
241 return temp;
242 }
243 catch (Throwable t)
244 {
245 SpyJMSException.rethrowAsJMSException("Cannot create a Temporary Topic", t);
246 throw new UnreachableStatementException();
247 }
248 }
249
250 Topic createTopic(String name) throws JMSException
251 {
252 checkClosed();
253 checkClientID();
254 try
255 {
256 return serverIL.createTopic(connectionToken, name);
257 }
258 catch (Throwable t)
259 {
260 SpyJMSException.rethrowAsJMSException("Cannot get the Topic from the provider", t);
261 throw new UnreachableStatementException();
262 }
263 }
264
265 TemporaryQueue getTemporaryQueue() throws JMSException
266 {
267 checkClosed();
268 checkClientID();
269 try
270 {
271 SpyTemporaryQueue temp = (SpyTemporaryQueue) serverIL.getTemporaryQueue(connectionToken);
272 temp.setConnection(this);
273 synchronized (temps)
274 {
275 temps.add(temp);
276 }
277 return temp;
278 }
279 catch (Throwable t)
280 {
281 SpyJMSException.rethrowAsJMSException("Cannot create a Temporary Queue", t);
282 throw new UnreachableStatementException();
283 }
284 }
285
286 Queue createQueue(String name) throws JMSException
287 {
288 checkClosed();
289 checkClientID();
290 try
291 {
292
293 return serverIL.createQueue(connectionToken, name);
294 }
295 catch (Throwable t)
296 {
297 SpyJMSException.rethrowAsJMSException("Cannot get the Queue from the provider", t);
298 throw new UnreachableStatementException();
299 }
300 }
301 }