Source code: com/presumo/jms/JmsServer.java
1 /**
2 * This file is part of Presumo.
3 *
4 * Presumo is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * Presumo is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with Presumo; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 *
18 *
19 * Copyright (c) 2001, 2002 Dan Greff
20 */
21 package com.presumo.jms;
22
23 import com.presumo.util.config.Configuration;
24 import com.presumo.util.config.Preferences;
25 import com.presumo.jms.persistence.PersistentQueue;
26 import com.presumo.jms.plugin.implementation.MemoryMessageQueue;
27 import com.presumo.jms.plugin.MessageQueue;
28 import com.presumo.jms.plugin.transport.ServerTransport;
29 import com.presumo.jms.plugin.transport.Transport;
30 import com.presumo.jms.resources.Resources;
31 import com.presumo.jms.router.RemoteSession;
32 import com.presumo.jms.router.ConnectionListener;
33 import com.presumo.jms.router.Router;
34
35 import com.presumo.util.log.Logger;
36 import com.presumo.util.log.LoggerFactory;
37
38 import java.io.BufferedReader;
39 import java.io.InputStreamReader;
40 import java.io.IOException;
41 import java.io.File;
42 import java.io.FileInputStream;
43
44 import java.util.HashMap;
45 import java.util.Map;
46 import java.util.Set;
47 import java.util.StringTokenizer;
48 import java.util.Properties;
49
50 import javax.jms.JMSException;
51
52 /**
53 * <p>
54 * Encapsulation of a Presumo JMS server. Any application may start
55 * a JMS server within their application with the following steps.
56 * <ol>
57 * <li>Instantiate the server with the constructor that is appropriate
58 * to your application's needs.
59 * <li>Start the server with <code>startup()</code>
60 * <li>Start every type of server listener that is needed. To start
61 * a tcp socket listener on <code>127.0.0.1:2323</code> call:
62 * <ul><li>
63 * <code>startServerTransport("tcp://127.0.0.1:2323")</code>.
64 * </li></ul>
65 * <li>Connect the server to other servers if needed. To connect
66 * to a server at 192.168.1.1 running a TCP listener on port 2323
67 * you would call:
68 * <ul><li>
69 * <code>startClientTransport("tcp://192.168.1.1:2323")</code>
70 * </li></ul>
71 * You may connect to multiple servers.
72 * </ol>
73 * <p>
74 * Currently, only TCP/IP connections are supported.
75 * <p>
76 * If you want to use Presumo for intra-JVM communication you do not
77 * need to create an instance of this class. All message consumers
78 * and producers created from the same <code>javax.jms.Connection</code>
79 * can communicate with each other without a server. Only producers
80 * and consumers created from different <code>javax.jms.Connection's</code>
81 * will need a server to communicate.
82 * </p>
83 *
84 * @author Dan Greff
85 */
86 public class JmsServer
87 {
88
89 /**
90 * The directory to store messages queues which will contain persistent
91 * messages. If set to <code>null</code> the JmsServer will not
92 * write anything to disk and persistent messages will not be much more
93 * reliable than non-persistent messages.
94 */
95 protected final String persistentDir;
96
97 /**
98 * If using persistent messages, the files created by Presumo will have
99 * this prefix.
100 */
101 protected final String persistentPrefix;
102
103 /**
104 * Once the persistent queue size on disk (in bytes) is greater than
105 * this value it will be compressed and internal fragmentation caused
106 * by deleted messages will be removed.
107 */
108 protected final int persistentLogSize;
109
110 private final Logger logger;
111 private Map serverMap;
112 private Map clientMap;
113 private Router router;
114
115 /////////////////////////////////////////////////////////////////////////
116 // Constructors //
117 /////////////////////////////////////////////////////////////////////////
118
119 /**
120 * Create a server with no mechanisms to transactionally store messages
121 * on the disk. All persistent and non-persistent messages will be kept
122 * in memory. You should only use the server if you are not using
123 * persistent messages.
124 */
125 public JmsServer()
126 {
127 this(null);
128 }
129
130 /**
131 * Create a server which will create transactional queues for the
132 * storage of persistent messages in the given directory. The
133 * directory must be ths same between application reboots. It
134 * is erroneous to use a temporary directory since the queues
135 * must servive a reboot.
136 *
137 * @param persistentDir Directory which persistent queues will be
138 * written to. Application must have write permissions.
139 */
140 public JmsServer(String persistentDir)
141 {
142 this(persistentDir, "PresumoJms");
143 }
144
145
146
147 /**
148 * Create a server which will create transactional queues for the
149 * storage of persistent messages in the given directory. The
150 * directory must be ths same between application reboots. It
151 * is erroneous to use a temporary directory since the queues
152 * must servive a reboot.
153 * <p>
154 * You may change the prefix of all files created by Presumo
155 * using this constructor. The default value is "PresumoJms".
156 * <p>
157 * The prefix must be the same across reboots and cannot change.
158 * If you change the prefix name, all stored messages using
159 * the previous prefix name will be lost.
160 * </p>
161 *
162 * @param persistentDir Directory which persistent queues will be
163 * written to. Application must have write permissions.
164 * @param persistentPrefix Prefix to use for all persistent filenames.
165 */
166 public JmsServer(String persistentDir,
167 String persistentPrefix)
168 {
169 this(persistentDir, persistentPrefix, 100000);
170 }
171
172
173 /**
174 * Create a server which will create transactional queues for the
175 * storage of persistent messages in the given directory. The
176 * directory must be ths same between application reboots. It
177 * is erroneous to use a temporary directory since the queues
178 * must servive a reboot.
179 * <p>
180 * You may change the prefix of all files created by Presumo
181 * using this constructor. The default value is "PresumoJms".
182 * <p>
183 * The prefix must be the same across reboots and cannot change.
184 * If you change the prefix name, all stored messages using
185 * the previous prefix name will be lost.
186 * <p>
187 * You may also specify the max size in bytes the log file size
188 * will get before the persistent mechanism resolves the contents
189 * to a more permanent file.
190 * </p>
191 * The default value is 100,000 bytes which is a good number for
192 * small messages (less than 1K) but might need to be larger
193 * for larger messages.
194 *
195 * @param persistentDir Directory which persistent queues will be
196 * written to. Application must have write permissions.
197 * @param persistentPrefix Prefix to use for all persistent filenames.
198 * @param persistentLogSize Max size in bytes of the persistent log file.
199 */
200 public JmsServer(String persistentDir,
201 String persistentPrefix,
202 int persistentLogSize)
203 {
204 this.persistentDir = persistentDir;
205 this.persistentPrefix = persistentPrefix;
206 this.persistentLogSize = persistentLogSize;
207
208 serverMap = new HashMap();
209 clientMap = new HashMap();
210 logger = LoggerFactory.getLogger(JmsServer.class, Resources.getBundle());
211 }
212
213
214 ///////////////////////////////////////////////////////////////////////////
215 // Public Methods //
216 ///////////////////////////////////////////////////////////////////////////
217
218
219 /**
220 * Starts the server, but you must specifically create server listners
221 * or connections to other servers via <code>startServerTransport()</code>
222 * and <code>startClientTransport()</code>.
223 *
224 * @see startServerTransport
225 * @see startClientTransport
226 */
227 public synchronized void startup() throws JMSException
228 {
229 logger.entry("startup");
230
231 if (router == null) {
232 try {
233 MessageQueue queue = null;
234
235 if (persistentDir == null) {
236 queue = new MemoryMessageQueue();
237 } else {
238 File dir = new File(persistentDir);
239 PersistentQueue pqueue =
240 new PersistentQueue(dir, persistentPrefix, persistentLogSize);
241 pqueue.open();
242 queue = pqueue;
243 }
244
245 router = new Router(queue);
246
247 } catch (IOException ioe) {
248 JMSException jmsex = new JMSException("Unable to initialize queue");
249 jmsex.setLinkedException(ioe);
250 throw jmsex;
251 }
252 }
253
254 logger.exit("startup");
255 }
256
257 /**
258 * Stops the server. The server will disconnect with all other servers it
259 * is connected to and will disconnect all clients.
260 */
261 public synchronized void shutdown()
262 {
263 logger.entry("shutdown");
264
265 // Clone the information to an array, since the connection information
266 // structures will change as we are shutting down.
267 Set serverSet = serverMap.keySet();
268 Set clientSet = clientMap.keySet();
269
270 String [] servers = new String[serverSet.size()];
271 String [] clients = new String[clientSet.size()];
272
273 serverSet.toArray(servers);
274 clientSet.toArray(clients);
275
276 int i;
277 for (i=0; i < servers.length; ++i) {
278 stopServerTransport(servers[i]);
279 }
280
281 for (i=0; i < clients.length; ++i) {
282 stopClientTransport(clients[i]);
283 }
284
285 router.closeRouter();
286 router = null;
287
288 logger.exit("shutdown");
289 }
290
291
292 /**
293 * Start a built-in server transport mechanism to listen for and
294 * accept connections from client transport mechanismism of the
295 * same protocal.
296 * </p>
297 * <p>
298 * The format of the passed in <code>url</code> should be:
299 * </p>
300 * <code>protocal://inetaddress:port</code>
301 * <ul>
302 * <li><b>protocal</b> - Identifier for built-in protocal.
303 * <li><b>inetaddress</b> - Address to run the protocal on. The loopback
304 * address is usually correct here (i.e. 127.0.0.1 or localhost). The one
305 * exception is a multi-homed machine.
306 * <li><b>port</b> - Port the protocal should listen on.
307 * </ul>
308 * </p>
309 * Currently the only built-in protocal is "tcp" for a TCP/IP
310 * implementation.
311 *
312 * You must start the server before creating connections to it.
313 *
314 * @see startup
315 */
316 public synchronized void startServerTransport(String url) throws JMSException
317 {
318 logger.entry("startServerTransport", url);
319
320 // Clean up when there are more supported protocals
321 if (! url.toLowerCase().startsWith("tcp")) {
322 throw new JMSException(Resources.getResourceString("PJMSE0004", url));
323 }
324 else if (serverMap.containsKey(url)) {
325 throw new JMSException(Resources.getResourceString("PJMSE0002", url));
326 }
327 else {
328 ServerTransport transport = new
329 com.presumo.jms.plugin.implementation.transport.tcp.ServerTransportImpl();
330 transport.setURL(url);
331 transport.setRouter(router);
332 transport.start();
333 serverMap.put(url, transport);
334 }
335
336 logger.exit("startServerTransport");
337 }
338
339 /**
340 * Stop the given server transport.
341 */
342 public synchronized void stopServerTransport(String url)
343 {
344 logger.entry("stopServerTransport", url);
345
346 ServerTransport transport = (ServerTransport) serverMap.get(url);
347
348 if (transport != null) {
349 transport.close();
350 serverMap.remove(url);
351 }
352
353 logger.exit("stopServerTransport");
354 }
355
356
357 /**
358 * Start a client transport to connect to another server.
359 */
360 public synchronized void startClientTransport(String url)
361 throws JMSException
362 {
363 logger.entry("startClientTransport", url);
364
365 // Clean up when there are more supported protocals
366 if (! url.toLowerCase().startsWith("tcp")) {
367 throw new JMSException(Resources.getResourceString("PJMSE0004", url));
368 }
369 else if (clientMap.containsKey(url)) {
370 throw new JMSException(Resources.getResourceString("PJMSE0002", url));
371 }
372 else {
373
374 String host = getHost(url);
375 int port = getPort(url);
376
377 try {
378 Transport transport = new
379 com.presumo.jms.plugin.implementation.transport.tcp.TransportImpl(host, port);
380 ConnectionListener cl = new ConnectionListener() {
381 public void connectionLost(RemoteSession session) {
382 // todo:: handle lost connections gracefully
383 }
384 };
385 RemoteSession session = new RemoteSession(router, transport, cl);
386 session.start();
387
388 clientMap.put(url, transport);
389 } catch (java.io.IOException ioe) {
390 JMSException jmsex = new JMSException("Unable to connect to server " + host+":"+port);
391 jmsex.setLinkedException(ioe);
392 throw jmsex;
393 }
394 }
395
396 logger.exit("startClientTransport");
397 }
398
399 /**
400 * Stop a client transpot connected to another server.
401 */
402 public void stopClientTransport(String url)
403 {
404 logger.entry("stopClientTransport", url);
405
406 Transport transport = (Transport) clientMap.get(url);
407
408 if (transport != null) {
409 transport.close();
410 clientMap.remove(url);
411 }
412
413 logger.exit("stopClientTransport");
414 }
415
416
417 ///////////////////////////////////////////////////////////////////////////
418 // Protected Methods //
419 ///////////////////////////////////////////////////////////////////////////
420
421 /**
422 * Augment the basic functionality by shutting down the server and
423 * releasing all resources before being garbage collected.
424 */
425 protected void finalize() throws Throwable
426 {
427 shutdown();
428 super.finalize();
429 }
430
431 ///////////////////////////////////////////////////////////////////////////
432 // Private Methods //
433 ///////////////////////////////////////////////////////////////////////////
434
435 private String getHost(String url) throws JMSException
436 {
437 logger.entry("getHost", url);
438
439 int loc = url.lastIndexOf('/');
440 int loc2 = url.lastIndexOf(':');
441
442 if (loc == -1 || loc2 == -1) {
443 throw new JMSException("Malformed URL: " + url);
444 }
445
446 String retval = url.substring(loc+1, loc2);
447 logger.exit("getHost", retval);
448 return retval;
449 }
450
451 private int getPort(String url) throws JMSException
452 {
453 logger.entry("getPort", url);
454
455 int loc = url.lastIndexOf(':');
456 if (loc == -1) {
457 throw new JMSException("Malformed URL: " + url);
458 }
459
460 int retval = 0;
461 try {
462 retval = Integer.parseInt(url.substring(loc+1));
463 } catch (NumberFormatException nfe) {
464 JMSException jmsex = new JMSException("Malformed URL: " + url);
465 jmsex.setLinkedException(nfe);
466 throw jmsex;
467 }
468
469 logger.exit("getPort", new Integer(retval));
470 return retval;
471 }
472
473
474 ///////////////////////////////////////////////////////////////////////////
475 // Static Methods //
476 ///////////////////////////////////////////////////////////////////////////
477
478 /**
479 * Used to invoke a JVM with just the Presumo JMS server running within it.
480 * <ul><li>
481 * Usage: java com.presumo.jms.JmsServer [config.properties]
482 * </li></ul>
483 * Where config.properties is the filename of a property file containing
484 * configuration for the server. If the property file is not given the
485 * server will attempt to start on localhost:2323 with no persistent
486 * mechanisms.
487 */
488 public static void main(String [] args) throws Exception
489 {
490 if (args.length > 1) {
491 System.err.println("Usage: JmsServer <server.properties>");
492 System.exit(-1);
493 }
494
495 try {
496 //
497 // Read in the test properties
498 //
499 Properties props = new Properties();
500 if (args.length == 1) {
501 FileInputStream fis = null;
502 try {
503 fis = new FileInputStream(args[0]);
504 props.load(fis);
505 } finally {
506 if (fis != null) fis.close();
507 }
508 }
509
510 String persistentDir = props.getProperty("PersistentDirectory");
511 String serverTransports = props.getProperty("ServerTransports",
512 "tcp://localhost:2323");
513 String clientTransports = props.getProperty("ClientTransports");
514
515
516 // Start the server
517 //
518 JmsServer server = new JmsServer(persistentDir);
519 server.startup();
520
521 if (serverTransports != null && serverTransports.length() > 0) {
522 StringTokenizer tokens =
523 new StringTokenizer(serverTransports.trim(), ";");
524 while(tokens.hasMoreTokens()) {
525 String serverTransport = tokens.nextToken().trim();
526 server.startServerTransport(serverTransport);
527 }
528 }
529
530 if (clientTransports != null && clientTransports.length() > 0) {
531 StringTokenizer tokens =
532 new StringTokenizer(clientTransports.trim(), ";");
533 while(tokens.hasMoreTokens()) {
534 String clientTransport = tokens.nextToken().trim();
535 server.startClientTransport(clientTransport);
536 }
537 }
538
539 System.out.println(Resources.getResourceString("STARTUP_COMPLETE")+"\n");
540
541
542 // Wait for the user to request a shutdown.
543 //
544 BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
545 String choice = "";
546 while(! choice.toLowerCase().equals("exit")) {
547 System.out.println(Resources.getResourceString("SHUTDOWN_INSTRUCTIONS"));
548 choice = input.readLine();
549 }
550
551 // Shutdown the server
552 //
553 server.shutdown();
554 System.out.println(Resources.getResourceString("SHUTDOWN_COMPLETE"));
555
556 } catch (Throwable t) {
557 System.err.println("The following error occured while starting the server:");
558 t.printStackTrace();
559 System.exit(-1);
560 }
561
562 }
563
564 }