Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/ra/ServerSessionImpl.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq.ra;
19  
20  import java.lang.reflect.Method;
21  
22  import javax.jms.JMSException;
23  import javax.jms.Message;
24  import javax.jms.MessageListener;
25  import javax.jms.MessageProducer;
26  import javax.jms.ServerSession;
27  import javax.jms.Session;
28  import javax.resource.spi.endpoint.MessageEndpoint;
29  import javax.resource.spi.work.Work;
30  import javax.resource.spi.work.WorkEvent;
31  import javax.resource.spi.work.WorkException;
32  import javax.resource.spi.work.WorkListener;
33  import javax.resource.spi.work.WorkManager;
34  
35  import org.activemq.ActiveMQSession;
36  import org.activemq.ActiveMQSession.DeliveryListener;
37  import org.activemq.util.JMSExceptionHelper;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  
41  /**
42   * @version $Revision: 1.1.1.1 $
43   */
44  public class ServerSessionImpl implements ServerSession, SessionAndProducer, Work, DeliveryListener {
45  
46      public static final Method ON_MESSAGE_METHOD;
47  
48      static {
49          try {
50              ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", new Class[]{Message.class});
51          }
52          catch (Exception e) {
53              throw new ExceptionInInitializerError(e);
54          }
55      }
56  
57      private static int nextLogId=0;
58      synchronized static private int getNextLogId() {
59          return nextLogId++;
60      }
61  
62      private int serverSessionId = getNextLogId();
63      private final Log log = LogFactory.getLog( ServerSessionImpl.class.getName()+":"+serverSessionId );
64      
65      private ActiveMQSession session;
66      private WorkManager workManager;
67      private MessageEndpoint endpoint;
68      private MessageProducer messageProducer;
69      private final ServerSessionPoolImpl pool;
70  
71      private Object runControlMutex = new Object();
72      private boolean runningFlag = false;
73      /** 
74       * True if an error was detected that cause this session to be stale.  When a session 
75       * is stale, it should not be used again for proccessing.
76       */
77      private boolean stale;
78      /**
79       * Does the TX commit need to be managed by the RA?
80       */
81      private final boolean useRAManagedTx;
82      /**
83       * The maximum number of messages to batch
84       */
85      private final int batchSize;
86      /**
87       * The current number of messages in the batch
88       */
89      private int currentBatchSize;
90  
91      public ServerSessionImpl(ServerSessionPoolImpl pool, ActiveMQSession session, WorkManager workManager, MessageEndpoint endpoint, boolean useRAManagedTx, int batchSize) throws JMSException {
92          this.pool = pool;
93          this.session = session;
94          this.workManager = workManager;
95          this.endpoint = endpoint;
96          this.useRAManagedTx = useRAManagedTx;
97          this.session.setMessageListener((MessageListener) endpoint);
98          this.session.setDeliveryListener(this);
99          this.batchSize = batchSize;
100     }
101 
102     public Session getSession() throws JMSException {
103         return session;
104     }
105 
106     public MessageProducer getMessageProducer() throws JMSException {
107         if (messageProducer == null) {
108             messageProducer = getSession().createProducer(null);
109         }
110         return messageProducer;
111     }
112 
113     /**
114      * @see javax.jms.ServerSession#start()
115      */
116     public void start() throws JMSException {
117 
118         synchronized (runControlMutex) {
119             if (runningFlag) {
120                 log.debug("Start request ignored, allready running.");
121                 return;
122             }
123             runningFlag = true;
124         }
125 
126         // We get here because we need to start a async worker.
127         log.debug("Starting run.");
128         try {
129             workManager.scheduleWork(this, WorkManager.INDEFINITE, null,
130                     new WorkListener() {
131                         //The work listener is useful only for debugging...
132                         public void workAccepted(WorkEvent event) {
133                             log.debug("Work accepted: " + event);
134                         }
135 
136                         public void workRejected(WorkEvent event) {
137                             log.debug("Work rejected: " + event);
138                         }
139 
140                         public void workStarted(WorkEvent event) {
141                             log.debug("Work started: " + event);
142                         }
143 
144                         public void workCompleted(WorkEvent event) {
145                             log.debug("Work completed: " + event);
146                         }
147 
148                     });
149         }
150         catch (WorkException e) {
151             throw (JMSException) new JMSException("Start failed: " + e).initCause(e);
152         }
153     }
154 
155     /**
156      * @see java.lang.Runnable#run()
157      */
158     synchronized public void run() {
159         log.debug("Running"); 
160         while (true) {
161             log.debug("run loop start");            
162             try {
163                 SessionAndProducerHelper.register(this);                
164                 currentBatchSize = 0;
165                 session.run();
166             }
167             catch (Throwable e) {
168                 stale=true;
169                 log.debug("Endpoint failed to process message.", e);
170                 log.info("Endpoint failed to process message. Reason: " + e);
171             }            
172             finally {
173                 SessionAndProducerHelper.unregister(this);                
174                 log.debug("run loop end");            
175                 synchronized (runControlMutex) {
176                     // This endpoint may have gone stale due to error
177                     if( stale) {
178                         runningFlag = false;
179                         pool.removeFromPool(this);
180                         break;
181                     }
182                     if( !session.hasUncomsumedMessages() ) {
183                         runningFlag = false;
184                         pool.returnToPool(this);
185                         break;
186                     }                
187                 }
188             }
189         }
190         log.debug("Run finished");
191     }
192 
193 
194     /**
195      * The ActiveMQSession's run method will call back to this method before 
196      * dispactching a message to the MessageListener.
197      */
198     public void beforeDelivery(ActiveMQSession session, Message msg) {
199       if (currentBatchSize == 0) {
200           try {
201             endpoint.beforeDelivery(ON_MESSAGE_METHOD);
202           } catch (Throwable e) {
203               throw new RuntimeException("Endpoint before delivery notification failure", e);
204           }
205         }
206     }
207 
208     /**
209      * The ActiveMQSession's run method will call back to this method after 
210      * dispactching a message to the MessageListener.
211      */
212     public void afterDelivery(ActiveMQSession session, Message msg) {
213       if (++currentBatchSize >= batchSize || !session.hasUncomsumedMessages()) {
214         currentBatchSize = 0;
215           try {
216             endpoint.afterDelivery();
217           } catch (Throwable e) {
218               throw new RuntimeException("Endpoint after delivery notification failure", e);
219           } finally {
220               if( session.getTransactionContext().isInLocalTransaction() ) {
221                   if( !useRAManagedTx ) {
222                       // Sanitiy Check: If the local transaction has not been commited..
223                       // Commit it now.
224                       log.warn("Local transaction had not been commited.  Commiting now.");
225                   }
226                   try {
227                       session.commit();
228                   } catch (JMSException e) {
229                       log.info("Commit failed:", e);
230                   }
231               }
232           }
233       }
234     }
235 
236     /**
237      * @see javax.resource.spi.work.Work#release()
238      */
239     public void release() {
240         log.debug("release called");
241     }
242 
243     /**
244      * @see java.lang.Object#toString()
245      */
246     public String toString() {
247         return "ServerSessionImpl:"+serverSessionId;
248     }
249 
250     public void close() {
251         try {
252             endpoint.release();
253         } catch (Throwable e) {
254             log.debug("Endpoint did not release properly: "+e,e);
255         }
256         try {
257             session.close();
258         } catch (Throwable e) {
259             log.debug("Session did not close properly: "+e,e);
260         }
261     }
262 
263 }