1 /*
2 * JBoss, the OpenSource J2EE webOS
3 *
4 * Distributable under LGPL license.
5 * See terms of license at gnu.org.
6 */
7 package org.jboss.jms.asf;
8
9 import javax.jms.JMSException;
10 import javax.jms.Message;
11 import javax.jms.MessageListener;
12 import javax.jms.ServerSession;
13 import javax.jms.Session;
14 import javax.jms.XASession;
15 import javax.naming.InitialContext;
16 import javax.transaction.Status;
17 import javax.transaction.Transaction;
18 import javax.transaction.TransactionManager;
19 import javax.transaction.xa.XAResource;
20 import javax.transaction.xa.Xid;
21
22 import org.jboss.logging.Logger;
23 import org.jboss.tm.TransactionManagerService;
24 import org.jboss.tm.XidFactoryMBean;
25
26 /**
27 * An implementation of ServerSession. <p>
28 *
29 * Created: Thu Dec 7 18:25:40 2000
30 *
31 * @author <a href="mailto:peter.antman@tim.se">Peter Antman</a> .
32 * @author <a href="mailto:jason@planet57.com">Jason Dillon</a>
33 * @author <a href="mailto:hiram.chirino@jboss.org">Hiram Chirino</a> .
34 * @version $Revision: 1.14.2.2 $
35 */
36 public class StdServerSession
37 implements Runnable, ServerSession, MessageListener
38 {
39 /**
40 * Instance logger.
41 */
42 static Logger log = Logger.getLogger(StdServerSession.class);
43
44 /**
45 * The server session pool which we belong to.
46 */
47 private StdServerSessionPool serverSessionPool;
48
49 /**
50 * Our session resource.
51 */
52 private Session session;
53
54 /**
55 * Our XA session resource.
56 */
57 private XASession xaSession;
58
59 /**
60 * The transaction manager that we will use for transactions.
61 */
62 private TransactionManager tm;
63
64 /**
65 * Use the session's XAResource directly if we have an JBossMQ XASession.
66 * this allows us to get around the TX timeout problem when you have
67 * extensive message processing.
68 */
69 private boolean useLocalTX;
70
71 /**
72 * The listener to delegate calls, to. In our case the container invoker.
73 */
74 private MessageListener delegateListener;
75
76 private XidFactoryMBean xidFactory;
77
78 /**
79 * Create a <tt>StdServerSession</tt> .
80 *
81 * @param pool The server session pool which we belong to.
82 * @param session Our session resource.
83 * @param xaSession Our XA session resource.
84 * @param delegateListener Listener to call when messages arrives.
85 * @param useLocalTX Will this session be used in a global TX (we can optimize with 1 phase commit)
86 * @throws JMSException Transation manager was not found.
87 * @exception JMSException Description of Exception
88 */
89 StdServerSession(final StdServerSessionPool pool,
90 final Session session,
91 final XASession xaSession,
92 final MessageListener delegateListener,
93 boolean useLocalTX,
94 final XidFactoryMBean xidFactory)
95 throws JMSException
96 {
97 // assert pool != null
98 // assert session != null
99
100 this.serverSessionPool = pool;
101 this.session = session;
102 this.xaSession = xaSession;
103 this.delegateListener = delegateListener;
104 if( xaSession == null )
105 useLocalTX = false;
106 this.useLocalTX = useLocalTX;
107 this.xidFactory = xidFactory;
108
109 if (log.isDebugEnabled())
110 log.debug("initializing (pool, session, xaSession, useLocalTX): " +
111 pool + ", " + session + ", " + xaSession + ", " + useLocalTX);
112
113 // Set out self as message listener
114 if (xaSession != null)
115 xaSession.setMessageListener(this);
116 else
117 session.setMessageListener(this);
118
119 InitialContext ctx = null;
120 try
121 {
122 ctx = new InitialContext();
123 tm = (TransactionManager) ctx.lookup(TransactionManagerService.JNDI_NAME);
124 }
125 catch (Exception e)
126 {
127 throw new JMSException("Transation manager was not found");
128 }
129 finally
130 {
131 if (ctx != null)
132 {
133 try
134 {
135 ctx.close();
136 }
137 catch (Exception ignore)
138 {
139 }
140 }
141 }
142 }
143
144 // --- Impl of JMS standard API
145
146 /**
147 * Returns the session. <p>
148 *
149 * This simply returns what it has fetched from the connection. It is up to
150 * the jms provider to typecast it and have a private API to stuff messages
151 * into it.
152 *
153 * @return The session.
154 * @exception JMSException Description of Exception
155 */
156 public Session getSession() throws JMSException
157 {
158 if (xaSession != null)
159 return xaSession;
160 else
161 return session;
162 }
163
164 //--- Protected parts, used by other in the package
165
166 /**
167 * Runs in an own thread, basically calls the session.run(), it is up to the
168 * session to have been filled with messages and it will run against the
169 * listener set in StdServerSessionPool. When it has send all its messages it
170 * returns.
171 */
172 public void run()
173 {
174 boolean trace = log.isTraceEnabled();
175 if (trace)
176 log.trace("running...");
177 try
178 {
179 if (xaSession != null)
180 xaSession.run();
181 else
182 session.run();
183 }
184 finally
185 {
186 if (trace)
187 log.trace("recycling...");
188
189 recycle();
190
191 if (trace)
192 log.trace("finished run");
193 }
194 }
195
196 /**
197 * Will get called from session for each message stuffed into it.
198 *
199 * Starts a transaction with the TransactionManager
200 * and enlists the XAResource of the JMS XASession if a XASession was
201 * available. A good JMS implementation should provide the XASession for use
202 * in the ASF. So we optimize for the case where we have an XASession. So,
203 * for the case where we do not have an XASession and the bean is not
204 * transacted, we have the unneeded overhead of creating a Transaction. I'm
205 * leaving it this way since it keeps the code simpler and that case should
206 * not be too common (JBossMQ provides XASessions).
207 */
208 public void onMessage(Message msg)
209 {
210 boolean trace = log.isTraceEnabled();
211 if( trace )
212 log.trace("onMessage running (pool, session, xaSession, useLocalTX): " +
213 ", " + session + ", " + xaSession + ", " + useLocalTX);
214
215 // Used if run with useLocalTX if true
216 Xid localXid = null;
217 boolean localRollbackFlag=false;
218 // Used if run with useLocalTX if false
219 Transaction trans = null;
220 try
221 {
222
223 if (useLocalTX)
224 {
225 // Use JBossMQ One Phase Commit to commit the TX
226 localXid = xidFactory.newXid();//new XidImpl();
227 XAResource res = xaSession.getXAResource();
228 res.start(localXid, XAResource.TMNOFLAGS);
229
230 if( trace )
231 log.trace("Using optimized 1p commit to control TX.");
232 }
233 else
234 {
235
236 // Use the TM to control the TX
237 tm.begin();
238 trans = tm.getTransaction();
239
240 if (xaSession != null)
241 {
242 XAResource res = xaSession.getXAResource();
243 if (!trans.enlistResource(res))
244 {
245 throw new JMSException("could not enlist resource");
246 }
247 if( trace )
248 log.trace("XAResource '" + res + "' enlisted.");
249 }
250 }
251 //currentTransactionId = connection.spyXAResourceManager.startTx();
252
253 // run the session
254 //session.run();
255 // Call delegate listener
256 delegateListener.onMessage(msg);
257 }
258 catch (Exception e)
259 {
260 log.error("session failed to run; setting rollback only", e);
261
262 if (useLocalTX)
263 {
264 // Use JBossMQ One Phase Commit to commit the TX
265 localRollbackFlag = true;
266 }
267 else
268 {
269 // Mark for tollback TX via TM
270 try
271 {
272 // The transaction will be rolledback in the finally
273 if( trace )
274 log.trace("Using TM to mark TX for rollback.");
275 trans.setRollbackOnly();
276 }
277 catch (Exception x)
278 {
279 log.error("failed to set rollback only", x);
280 }
281 }
282
283 }
284 finally
285 {
286 try
287 {
288 if (useLocalTX)
289 {
290 if( localRollbackFlag == true )
291 {
292 if( trace )
293 log.trace("Using optimized 1p commit to rollback TX.");
294
295 XAResource res = xaSession.getXAResource();
296 res.end(localXid, XAResource.TMSUCCESS);
297 res.rollback(localXid);
298
299 }
300 else
301 {
302 if( trace )
303 log.trace("Using optimized 1p commit to commit TX.");
304
305 XAResource res = xaSession.getXAResource();
306 res.end(localXid, XAResource.TMSUCCESS);
307 res.commit(localXid, true);
308 }
309 }
310 else
311 {
312 // Use the TM to commit the Tx (assert the correct association)
313 Transaction currentTx = tm.getTransaction();
314 if (trans.equals(currentTx) == false)
315 throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
316
317 // Marked rollback
318 if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
319 {
320 if( trace )
321 log.trace("Rolling back JMS transaction");
322 // actually roll it back
323 tm.rollback();
324
325 // NO XASession? then manually rollback.
326 // This is not so good but
327 // it's the best we can do if we have no XASession.
328 if (xaSession == null && serverSessionPool.isTransacted())
329 {
330 session.rollback();
331 }
332 }
333 else if (trans.getStatus() == Status.STATUS_ACTIVE)
334 {
335 // Commit tx
336 // This will happen if
337 // a) everything goes well
338 // b) app. exception was thrown
339 if( trace )
340 log.trace("Commiting the JMS transaction");
341 tm.commit();
342
343 // NO XASession? then manually commit. This is not so good but
344 // it's the best we can do if we have no XASession.
345 if (xaSession == null && serverSessionPool.isTransacted())
346 {
347 session.commit();
348 }
349 }
350 }
351 }
352 catch (Exception e)
353 {
354 log.error("failed to commit/rollback", e);
355 }
356 }
357 if( trace )
358 log.trace("onMessage done");
359 }
360
361 /**
362 * Start the session and begin consuming messages.
363 *
364 * @throws JMSException No listener has been specified.
365 */
366 public void start() throws JMSException
367 {
368 log.trace("starting invokes on server session");
369
370 if (session != null)
371 {
372 try
373 {
374 serverSessionPool.getExecutor().execute(this);
375 }
376 catch (InterruptedException ignore)
377 {
378 }
379 }
380 else
381 {
382 throw new JMSException("No listener has been specified");
383 }
384 }
385
386 /**
387 * Called by the ServerSessionPool when the sessions should be closed.
388 */
389 void close()
390 {
391 if (session != null)
392 {
393 try
394 {
395 session.close();
396 }
397 catch (Exception ignore)
398 {
399 }
400
401 session = null;
402 }
403
404 if (xaSession != null)
405 {
406 try
407 {
408 xaSession.close();
409 }
410 catch (Exception ignore)
411 {
412 }
413 xaSession = null;
414 }
415
416 log.debug("closed");
417 }
418
419 /**
420 * This method is called by the ServerSessionPool when it is ready to be
421 * recycled intot the pool
422 */
423 void recycle()
424 {
425 serverSessionPool.recycle(this);
426 }
427
428 }
429