Source code: org/activemq/ra/ActiveMQManagedConnection.java
1 /**
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.ra;
19
20 import java.io.PrintWriter;
21 import java.util.ArrayList;
22 import java.util.Iterator;
23
24 import javax.jms.Connection;
25 import javax.jms.ExceptionListener;
26 import javax.jms.JMSException;
27 import javax.resource.ResourceException;
28 import javax.resource.spi.ConnectionEvent;
29 import javax.resource.spi.ConnectionEventListener;
30 import javax.resource.spi.ConnectionRequestInfo;
31 import javax.resource.spi.LocalTransaction;
32 import javax.resource.spi.ManagedConnection;
33 import javax.resource.spi.ManagedConnectionMetaData;
34 import javax.security.auth.Subject;
35 import javax.transaction.xa.XAResource;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.activemq.ActiveMQConnection;
40 import org.activemq.LocalTransactionEventListener;
41 import org.activemq.TransactionContext;
42
43 /**
44 * ActiveMQManagedConnection maps to real physical connection to the
45 * server. Since a ManagedConnection has to provide a transaction
46 * managment interface to the physical connection, and sessions
47 * are the objects implement transaction managment interfaces in
48 * the JMS API, this object also maps to a singe physical JMS session.
49 * <p/>
50 * The side-effect is that JMS connection the application gets
51 * will allways create the same session object. This is good if
52 * running in an app server since the sessions are elisted in the
53 * context transaction. This is bad if used outside of an app
54 * server since the user may be trying to create 2 different
55 * sessions to coordinate 2 different uow.
56 *
57 * @version $Revision: 1.1.1.1 $
58 */
59 public class ActiveMQManagedConnection implements ManagedConnection, ExceptionListener { // TODO: , DissociatableManagedConnection {
60
61 private static final Log log = LogFactory.getLog(ActiveMQManagedConnection.class);
62
63 private PrintWriter logWriter;
64
65 private final ActiveMQConnection physicalConnection;
66 private final TransactionContext transactionContext;
67 private final ArrayList proxyConnections = new ArrayList();
68 private final ArrayList listeners = new ArrayList();
69 private final LocalAndXATransaction localAndXATransaction;
70
71 private Subject subject;
72 private ActiveMQConnectionRequestInfo info;
73 private boolean destoryed;
74
75 public ActiveMQManagedConnection(Subject subject, ActiveMQConnection physicalConnection, ActiveMQConnectionRequestInfo info) throws ResourceException {
76 try {
77 this.subject = subject;
78 this.info = info;
79 this.physicalConnection = physicalConnection;
80 this.transactionContext = new TransactionContext(physicalConnection);
81
82 this.localAndXATransaction = new LocalAndXATransaction(transactionContext) {
83 public void setInManagedTx(boolean inManagedTx) throws JMSException {
84 super.setInManagedTx(inManagedTx);
85 Iterator iterator = proxyConnections.iterator();
86 while (iterator.hasNext()) {
87 JMSConnectionProxy proxy = (JMSConnectionProxy) iterator.next();
88 proxy.setUseSharedTxContext(inManagedTx);
89 }
90 }
91 };
92
93 this.transactionContext.setLocalTransactionEventListener( new LocalTransactionEventListener() {
94 public void beginEvent() {
95 fireBeginEvent();
96 }
97 public void commitEvent() {
98 fireCommitEvent();
99 }
100 public void rollbackEvent() {
101 fireRollbackEvent();
102 }
103 });
104
105 physicalConnection.setExceptionListener(this);
106 } catch (JMSException e) {
107 throw new ResourceException("Could not create a new connection: "+e.getMessage(), e);
108 }
109 }
110
111 public boolean isInManagedTx() {
112 return localAndXATransaction.isInManagedTx();
113 }
114
115 static public boolean matches(Object x, Object y) {
116 if (x == null ^ y == null) {
117 return false;
118 }
119 if (x != null && !x.equals(y)) {
120 return false;
121 }
122 return true;
123 }
124
125 public void associate(Subject subject, ActiveMQConnectionRequestInfo info) throws JMSException {
126
127 // Do we need to change the associated userid/password
128 if( !matches(info.getUserName(), this.info.getUserName()) || !matches(info.getPassword(), this.info.getPassword()) ) {
129 ((ActiveMQConnection)physicalConnection).changeUserInfo(info.getUserName(), info.getPassword());
130 }
131
132 // Do we need to set the clientId?
133 if( info.getClientid()!=null && info.getClientid().length()>0 )
134 physicalConnection.setClientID(info.getClientid());
135
136 this.subject = subject;
137 this.info = info;
138 }
139
140 public Connection getPhysicalConnection() {
141 return physicalConnection;
142 }
143
144 private void fireBeginEvent() {
145 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
146 ConnectionEvent.LOCAL_TRANSACTION_STARTED);
147 Iterator iterator = listeners.iterator();
148 while (iterator.hasNext()) {
149 ConnectionEventListener l = (ConnectionEventListener) iterator.next();
150 l.localTransactionStarted(event);
151 }
152 }
153
154 private void fireCommitEvent() {
155 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
156 ConnectionEvent.LOCAL_TRANSACTION_COMMITTED);
157 Iterator iterator = listeners.iterator();
158 while (iterator.hasNext()) {
159 ConnectionEventListener l = (ConnectionEventListener) iterator.next();
160 l.localTransactionCommitted(event);
161 }
162 }
163
164 private void fireRollbackEvent() {
165 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
166 ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK);
167 Iterator iterator = listeners.iterator();
168 while (iterator.hasNext()) {
169 ConnectionEventListener l = (ConnectionEventListener) iterator.next();
170 l.localTransactionRolledback(event);
171 }
172 }
173
174 private void fireCloseEvent(JMSConnectionProxy proxy) {
175 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
176 ConnectionEvent.CONNECTION_CLOSED);
177 event.setConnectionHandle(proxy);
178
179 Iterator iterator = listeners.iterator();
180 while (iterator.hasNext()) {
181 ConnectionEventListener l = (ConnectionEventListener) iterator.next();
182 l.connectionClosed(event);
183 }
184 }
185
186 private void fireErrorOccurredEvent(Exception error) {
187 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
188 ConnectionEvent.CONNECTION_ERROR_OCCURRED, error);
189 Iterator iterator = listeners.iterator();
190 while (iterator.hasNext()) {
191 ConnectionEventListener l = (ConnectionEventListener) iterator.next();
192 l.connectionErrorOccurred(event);
193 }
194 }
195
196 /**
197 * @see javax.resource.spi.ManagedConnection#getConnection(javax.security.auth.Subject,
198 * javax.resource.spi.ConnectionRequestInfo)
199 */
200 public Object getConnection(Subject subject, ConnectionRequestInfo info)
201 throws ResourceException {
202 JMSConnectionProxy proxy = new JMSConnectionProxy(this);
203 proxyConnections.add(proxy);
204 return proxy;
205 }
206
207 private boolean isDestroyed() {
208 return destoryed;
209 }
210
211 /**
212 * Close down the physical connection to the server.
213 *
214 * @see javax.resource.spi.ManagedConnection#destroy()
215 */
216 public void destroy() throws ResourceException {
217 // Have we allready been destroyed??
218 if (isDestroyed()) {
219 return;
220 }
221
222 cleanup();
223
224 try {
225 physicalConnection.close();
226 destoryed = true;
227 } catch (JMSException e) {
228 log.info("Error occured during close of a JMS connection.", e);
229 }
230 }
231
232 /**
233 * Cleans up all proxy handles attached to this physical connection so that
234 * they cannot be used anymore.
235 *
236 * @see javax.resource.spi.ManagedConnection#cleanup()
237 */
238 public void cleanup() throws ResourceException {
239
240 // Have we allready been destroyed??
241 if (isDestroyed()) {
242 return;
243 }
244
245 Iterator iterator = proxyConnections.iterator();
246 while (iterator.hasNext()) {
247 JMSConnectionProxy proxy = (JMSConnectionProxy) iterator.next();
248 proxy.cleanup();
249 }
250 proxyConnections.clear();
251
252 try {
253 ((ActiveMQConnection)physicalConnection).cleanup();
254 } catch (JMSException e) {
255 throw new ResourceException("Could cleanup the ActiveMQ connection: "+e, e);
256 }
257
258 }
259
260 /**
261 * @see javax.resource.spi.ManagedConnection#associateConnection(java.lang.Object)
262 */
263 public void associateConnection(Object connection) throws ResourceException {
264 throw new ResourceException("Not supported.");
265 }
266
267 /**
268 * @see javax.resource.spi.ManagedConnection#addConnectionEventListener(javax.resource.spi.ConnectionEventListener)
269 */
270 public void addConnectionEventListener(ConnectionEventListener listener) {
271 listeners.add(listener);
272 }
273
274 /**
275 * @see javax.resource.spi.ManagedConnection#removeConnectionEventListener(javax.resource.spi.ConnectionEventListener)
276 */
277 public void removeConnectionEventListener(ConnectionEventListener listener) {
278 listeners.remove(listener);
279 }
280
281 /**
282 * @see javax.resource.spi.ManagedConnection#getXAResource()
283 */
284 public XAResource getXAResource() throws ResourceException {
285 return localAndXATransaction;
286 }
287
288 /**
289 * @see javax.resource.spi.ManagedConnection#getLocalTransaction()
290 */
291 public LocalTransaction getLocalTransaction() throws ResourceException {
292 return localAndXATransaction;
293 }
294
295 /**
296 * @see javax.resource.spi.ManagedConnection#getMetaData()
297 */
298 public ManagedConnectionMetaData getMetaData() throws ResourceException {
299 return new ManagedConnectionMetaData() {
300
301 public String getEISProductName() throws ResourceException {
302 if (physicalConnection == null) {
303 throw new ResourceException("Not connected.");
304 }
305 try {
306 return physicalConnection.getMetaData().getJMSProviderName();
307 }
308 catch (JMSException e) {
309 throw new ResourceException("Error accessing provider.", e);
310 }
311 }
312
313 public String getEISProductVersion() throws ResourceException {
314 if (physicalConnection == null) {
315 throw new ResourceException("Not connected.");
316 }
317 try {
318 return physicalConnection.getMetaData().getProviderVersion();
319 }
320 catch (JMSException e) {
321 throw new ResourceException("Error accessing provider.", e);
322 }
323 }
324
325 public int getMaxConnections() throws ResourceException {
326 if (physicalConnection == null) {
327 throw new ResourceException("Not connected.");
328 }
329 return Integer.MAX_VALUE;
330 }
331
332 public String getUserName() throws ResourceException {
333 if (physicalConnection == null) {
334 throw new ResourceException("Not connected.");
335 }
336 try {
337 return physicalConnection.getClientID();
338 }
339 catch (JMSException e) {
340 throw new ResourceException("Error accessing provider.", e);
341 }
342 }
343 };
344 }
345
346 /**
347 * @see javax.resource.spi.ManagedConnection#setLogWriter(java.io.PrintWriter)
348 */
349 public void setLogWriter(PrintWriter logWriter) throws ResourceException {
350 this.logWriter = logWriter;
351 }
352
353 /**
354 * @see javax.resource.spi.ManagedConnection#getLogWriter()
355 */
356 public PrintWriter getLogWriter() throws ResourceException {
357 return logWriter;
358 }
359
360 /**
361 * @param subject
362 * @param info
363 * @return
364 */
365 public boolean matches(Subject subject, ConnectionRequestInfo info) {
366
367 // Check to see if it is our info class
368 if (info == null) {
369 return false;
370 }
371 if (info.getClass() != ActiveMQConnectionRequestInfo.class) {
372 return false;
373 }
374
375 // Do the subjects match?
376 if (subject == null ^ this.subject == null) {
377 return false;
378 }
379 if (subject != null && !subject.equals(this.subject)) {
380 return false;
381 }
382
383 // Does the info match?
384 return info.equals(this.info);
385 }
386
387 /**
388 * When a proxy is closed this cleans up the proxy and notifys the
389 * ConnectionEventListeners that a connection closed.
390 *
391 * @param proxy
392 */
393 public void proxyClosedEvent(JMSConnectionProxy proxy) {
394 proxyConnections.remove(proxy);
395 proxy.cleanup();
396 fireCloseEvent(proxy);
397 }
398
399 public void onException(JMSException e) {
400 log.warn("Connection failed: "+e);
401 log.debug("Cause: ", e);
402
403 // Let any active proxy connections know that exception occured.
404 for (Iterator iter = proxyConnections.iterator(); iter.hasNext();) {
405 JMSConnectionProxy proxy = (JMSConnectionProxy) iter.next();
406 proxy.onException(e);
407 }
408 // Let the container know that the error occured.
409 fireErrorOccurredEvent(e);
410 }
411
412 /**
413 * @return Returns the transactionContext.
414 */
415 public TransactionContext getTransactionContext() {
416 return transactionContext;
417 }
418
419 }