1 /*
2 * JBoss, Home of Professional Open Source.
3 * Copyright 2006, Red Hat Middleware LLC, and individual contributors
4 * as indicated by the @author tags. See the copyright.txt file in the
5 * distribution for a full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */
22 package org.jboss.resource.adapter.jms.inflow;
23
24 import java.util.ArrayList;
25
26 import javax.jms.Connection;
27 import javax.jms.ConnectionConsumer;
28 import javax.jms.JMSException;
29 import javax.jms.Queue;
30 import javax.jms.ServerSession;
31 import javax.jms.ServerSessionPool;
32 import javax.jms.Topic;
33
34 import org.jboss.logging.Logger;
35
36 /**
37 * A generic jms session pool.
38 *
39 * @author <a href="adrian@jboss.com">Adrian Brock</a>
40 * @version $Revision: 72163 $
41 */
42 public class JmsServerSessionPool implements ServerSessionPool
43 {
44 /** The logger */
45 private static final Logger log = Logger.getLogger(JmsServerSessionPool.class);
46
47 /** The activation */
48 JmsActivation activation;
49
50 /** The consumer */
51 ConnectionConsumer consumer;
52
53 /** The server sessions */
54 ArrayList serverSessions = new ArrayList();
55
56 /** Whether the pool is stopped */
57 boolean stopped = false;
58
59 /** The number of sessions */
60 int sessionCount = 0;
61
62
63 /**
64 * Create a new session pool
65 *
66 * @param activation the jms activation
67 */
68 public JmsServerSessionPool(JmsActivation activation)
69 {
70 this.activation = activation;
71 }
72
73 /**
74 * @return the activation
75 */
76 public JmsActivation getActivation()
77 {
78 return activation;
79 }
80
81 /**
82 * Start the server session pool
83 *
84 * @throws Exeption for any error
85 */
86 public void start() throws Exception
87 {
88 setupSessions();
89 setupConsumer();
90 }
91
92 /**
93 * Stop the server session pool
94 */
95 public void stop()
96 {
97 teardownConsumer();
98 teardownSessions();
99 }
100
101 public ServerSession getServerSession() throws JMSException
102 {
103 boolean trace = log.isTraceEnabled();
104 if (trace)
105 log.trace("getServerSession");
106
107 ServerSession result = null;
108
109 try
110 {
111 synchronized (serverSessions)
112 {
113 while (true)
114 {
115 int sessionsSize = serverSessions.size();
116
117 if (stopped)
118 throw new Exception("Cannot get a server session after the pool is stopped");
119
120 else if (sessionsSize > 0)
121 {
122 result = (ServerSession) serverSessions.remove(sessionsSize-1);
123 break;
124 }
125
126 else
127 {
128 try
129 {
130 serverSessions.wait();
131 }
132 catch (InterruptedException ignored)
133 {
134 }
135 }
136 }
137 }
138 }
139 catch (Throwable t)
140 {
141 throw new JMSException("Unable to get a server session " + t);
142 }
143
144 if (trace)
145 log.trace("Returning server session " + result);
146
147 return result;
148 }
149
150 /**
151 * Return the server session
152 *
153 * @param session the session
154 */
155 protected void returnServerSession(JmsServerSession session)
156 {
157 synchronized (serverSessions)
158 {
159 if (stopped)
160 {
161 session.teardown();
162 --sessionCount;
163 }
164 else
165 serverSessions.add(session);
166 serverSessions.notifyAll();
167 }
168 }
169
170 /**
171 * Setup the sessions
172 *
173 * @throws Exeption for any error
174 */
175 protected void setupSessions() throws Exception
176 {
177 JmsActivationSpec spec = activation.getActivationSpec();
178 ArrayList clonedSessions = null;
179
180 // Create the sessions
181 synchronized (serverSessions)
182 {
183 for (int i = 0; i < spec.getMaxSessionInt(); ++i)
184 {
185 JmsServerSession session = new JmsServerSession(this);
186 serverSessions.add(session);
187 }
188 sessionCount = serverSessions.size();
189 clonedSessions = (ArrayList) serverSessions.clone();
190
191 }
192
193 // Start the sessions
194 for (int i = 0; i < clonedSessions.size(); ++ i)
195 {
196 JmsServerSession session = (JmsServerSession) clonedSessions.get(i);
197 session.setup();
198 }
199 }
200
201 /**
202 * Stop the sessions
203 */
204 protected void teardownSessions()
205 {
206 synchronized (serverSessions)
207 {
208 // Disallow any new sessions
209 stopped = true;
210 serverSessions.notifyAll();
211
212 // Stop inactive sessions
213 for (int i = 0; i < serverSessions.size(); ++i)
214 {
215 JmsServerSession session = (JmsServerSession) serverSessions.get(i);
216 session.teardown();
217 --sessionCount;
218 }
219
220 serverSessions.clear();
221
222 if (activation.getActivationSpec().isForceClearOnShutdown())
223 {
224 int attempts = 0;
225 int forceClearAttempts = activation.getActivationSpec().getForceClearAttempts();
226 long forceClearInterval = activation.getActivationSpec().getForceClearOnShutdownInterval();
227
228 log.trace(this + " force clear behavior in effect. Waiting for " + forceClearInterval + " milliseconds for " + forceClearAttempts + " attempts.");
229
230 while((sessionCount > 0) && (attempts < forceClearAttempts))
231 {
232 try
233 {
234 int currentSessions = sessionCount;
235 serverSessions.wait(forceClearInterval);
236 // Number of session didn't change
237 if (sessionCount == currentSessions)
238 {
239 ++attempts;
240 log.trace(this + " clear attempt failed " + attempts);
241 }
242 }
243 catch(InterruptedException ignore)
244 {
245 }
246
247 }
248 }
249 else
250 {
251 // Wait for inuse sessions
252 while (sessionCount > 0)
253 {
254 try
255 {
256 serverSessions.wait();
257 }
258 catch (InterruptedException ignore)
259 {
260 }
261 }
262 }
263 }
264 }
265
266 /**
267 * Setup the connection consumer
268 *
269 * @throws Exeption for any error
270 */
271 protected void setupConsumer() throws Exception
272 {
273 Connection connection = activation.getConnection();
274 JmsActivationSpec spec = activation.getActivationSpec();
275 String selector = spec.getMessageSelector();
276 int maxMessages = spec.getMaxMessagesInt();
277 if (spec.isTopic())
278 {
279 Topic topic = (Topic) activation.getDestination();
280 String subscriptionName = spec.getSubscriptionName();
281 if (spec.isDurable())
282 consumer = connection.createDurableConnectionConsumer(topic, subscriptionName, selector, this, maxMessages);
283 else
284 consumer = connection.createConnectionConsumer(topic, selector, this, maxMessages);
285 }
286 else
287 {
288 Queue queue = (Queue) activation.getDestination();
289 consumer = connection.createConnectionConsumer(queue, selector, this, maxMessages);
290 }
291 log.debug("Created consumer " + consumer);
292 }
293
294 /**
295 * Stop the connection consumer
296 */
297 protected void teardownConsumer()
298 {
299 try
300 {
301 if (consumer != null)
302 {
303 log.debug("Closing the " + consumer);
304 consumer.close();
305 }
306 }
307 catch (Throwable t)
308 {
309 log.debug("Error closing the consumer " + consumer, t);
310 }
311 }
312
313 }