1 /*
2 * JBoss, Home of Professional Open Source.
3 * Copyright 2006, Red Hat Middleware LLC, and individual contributors
4 * as indicated by the @author tags. See the copyright.txt file in the
5 * distribution for a full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */
22 package org.jboss.mq.il.uil2;
23
24 import java.io.IOException;
25 import java.io.ObjectInputStream;
26 import java.io.ObjectOutputStream;
27 import java.net.InetAddress;
28 import java.net.Socket;
29 import java.util.Iterator;
30
31 import javax.jms.JMSException;
32
33 import org.jboss.logging.Logger;
34 import org.jboss.mq.il.uil2.msgs.BaseMsg;
35 import org.jboss.util.stream.NotifyingBufferedInputStream;
36 import org.jboss.util.stream.NotifyingBufferedOutputStream;
37
38 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
39 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
40 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
41 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
42 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
43 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
44
45 /** Used to manage the client/server and server/client communication in an
46 * asynchrounous manner.
47 *
48 * @todo verify the pooled executor config
49 *
50 * @author Scott.Stark@jboss.org
51 * @version $Revision: 64067 $
52 */
53 public class SocketManager
54 {
55 private static Logger log = Logger.getLogger(SocketManager.class);
56
57 private static final int STOPPED = 0;
58 private static final int STARTED = 1;
59 private static final int STOPPING = 2;
60 private static SynchronizedInt taskID = new SynchronizedInt(0);
61
62 /** The socket created by the IL layer */
63 private Socket socket;
64 /** The input stream used by the read task */
65 private ObjectInputStream in;
66 /** The buffering for output */
67 NotifyingBufferedInputStream bufferedInput;
68 /** The output stream used by the write task */
69 private ObjectOutputStream out;
70 /** The buffering for output */
71 NotifyingBufferedOutputStream bufferedOutput;
72 /** The write task thread */
73 private Thread writeThread;
74 /** The read task thread */
75 private Thread readThread;
76 /** The thread pool used to service incoming requests */
77 PooledExecutor pool;
78 /** The flag used to control the read loop */
79 private int readState = STOPPED;
80 /** The flag used to control the write loop */
81 private int writeState = STOPPED;
82 /** Used for constrolling the state */
83 private SynchronizedBoolean running = new SynchronizedBoolean(false);
84 /** The queue of messages to be processed by the write task */
85 private LinkedQueue sendQueue;
86 /** A HashMap<Integer, BaseMsg> that are awaiting a reply */
87 private ConcurrentHashMap replyMap;
88 /** The callback handler used for msgs that are not replys */
89 private SocketManagerHandler handler;
90 /** The buffer size */
91 private int bufferSize = 1;
92 /** The chunk size for notification of stream activity */
93 private int chunkSize = 0x40000000;
94 /** The logging trace level which is set in the ctor */
95 private boolean trace;
96
97 public SocketManager(Socket s) throws IOException
98 {
99 socket = s;
100 sendQueue = new LinkedQueue();
101 replyMap = new ConcurrentHashMap();
102 trace = log.isTraceEnabled();
103 }
104
105 /** Start the read and write threads using the given thread group and
106 * names of "UIL2.SocketManager.ReadTask" and "UIL2.SocketManager.WriteTask".
107 * @param tg the thread group to use for the read and write threads.
108 */
109 public void start(ThreadGroup tg)
110 {
111 if (trace)
112 log.trace("start called", new Exception("Start stack trace"));
113
114 InetAddress inetAddr = socket.getInetAddress();
115 String ipAddress = (inetAddr != null) ? inetAddr.getHostAddress() : "<unknown>";
116 ipAddress += ":" + socket.getPort();
117 if (pool == null)
118 {
119 // TODO: Check the validity of this config
120 pool = new PooledExecutor(5);
121 pool.setMinimumPoolSize(1);
122 pool.setKeepAliveTime(1000 * 60);
123 pool.runWhenBlocked();
124 String id = "SocketManager.MsgPool@"+
125 Integer.toHexString(System.identityHashCode(this))
126 + " client=" + ipAddress;
127 pool.setThreadFactory(new UILThreadFactory(id));
128 }
129
130 ReadTask readTask = new ReadTask();
131 readThread = new Thread(tg, readTask, "UIL2.SocketManager.ReadTask#" + taskID.increment() + " client=" + ipAddress);
132 readThread.setDaemon(true);
133
134 WriteTask writeTask = new WriteTask();
135 writeThread = new Thread(tg, writeTask, "UIL2.SocketManager.WriteTask#" + taskID.increment() + " client=" + ipAddress);
136 writeThread.setDaemon(true);
137
138 synchronized (running)
139 {
140 readState = STARTED;
141 writeState = STARTED;
142 running.set(true);
143 }
144
145 try
146 {
147 readThread.start();
148 writeThread.start();
149 }
150 catch (Throwable t)
151 {
152 try
153 {
154 stop();
155 }
156 catch (Throwable ignored)
157 {
158 }
159
160 log.warn("Error starting socket manager threads", t);
161 }
162 }
163
164 /** Stop the read and write threads by interrupting them.
165 */
166 public void stop()
167 {
168 synchronized (running)
169 {
170 if (trace)
171 log.trace("stop() " + readThread + " " + writeThread);
172 if (readState == STARTED)
173 {
174 readState = STOPPING;
175 readThread.interrupt();
176 }
177 if (writeState == STARTED)
178 {
179 writeState = STOPPING;
180 writeThread.interrupt();
181 }
182 running.set(false);
183 if (pool != null)
184 {
185 pool.shutdownNow();
186 pool = null;
187 }
188 try
189 {
190 socket.close();
191 }
192 catch (Throwable ignored)
193 {
194 }
195 }
196 }
197
198 /** Set the callback handler for msgs that were not originated by the
199 * socket manager. This is any msgs read that was not sent via the
200 * sendMessage method.
201 *
202 * @param handler
203 */
204 public void setHandler(SocketManagerHandler handler)
205 {
206 this.handler = handler;
207 if (bufferedInput != null)
208 bufferedInput.setStreamListener(handler);
209 if (bufferedOutput != null)
210 bufferedOutput.setStreamListener(handler);
211 }
212
213 /**
214 * Sets the buffer size
215 *
216 * @param size the size of the buffer
217 */
218 public void setBufferSize(int size)
219 {
220 this.bufferSize = size;
221 }
222
223 /**
224 * Sets the chunk size
225 *
226 * @param size the size of a chunk
227 */
228 public void setChunkSize(int size)
229 {
230 this.chunkSize = size;
231 }
232
233 /** Send a two-way message and block the calling thread until the
234 * msg reply is received. This enques the msg to the sendQueue, places
235 * the msg in the replyMap and waits on the msg. The msg is notified by the
236 * read task thread when it finds a msg with a msgID that maps to the
237 * msg in the msgReply map.
238 *
239 * @param msg the request msg to send
240 * @throws Exception thrown if the reply message has an error value
241 */
242 public void sendMessage(BaseMsg msg) throws Exception
243 {
244 internalSendMessage(msg, true);
245 if (msg.error != null)
246 {
247 if (trace)
248 log.trace("sendMessage will throw error", msg.error);
249 throw msg.error;
250 }
251 }
252
253 /**
254 * Send a reply.
255 *
256 * @param msg the message
257 * @throws Exception for any error
258 */
259 public void sendReply(BaseMsg msg) throws Exception
260 {
261 msg.trimReply();
262 internalSendMessage(msg, false);
263 }
264
265 /**
266 * Send a one-way.
267 *
268 * @param msg the message
269 * @throws Exception for any error
270 */
271 public void sendOneWay(BaseMsg msg) throws Exception
272 {
273 msg.getMsgID();
274 internalSendMessage(msg, false);
275 }
276
277 /** This places the msg into the sendQueue and returns if waitOnReply
278 * is false, or enques the msg to the sendQueue, places the msg
279 * in the replyMap and waits on the msg.
280 *
281 * @param msg
282 * @param waitOnReply
283 * @throws Exception
284 */
285 private void internalSendMessage(BaseMsg msg, boolean waitOnReply) throws Exception
286 {
287 if (running.get() == false)
288 throw new IOException("Client is not connected");
289
290 if (waitOnReply)
291 { // Send a request msg and wait for the reply
292 synchronized (msg)
293 {
294 // Create the request msgID
295 msg.getMsgID();
296 if (trace)
297 log.trace("Begin internalSendMessage, round-trip msg=" + msg);
298 // Place the msg into the write queue and reply map
299 replyMap.put(msg, msg);
300 sendQueue.put(msg);
301 // Wait for the msg reply
302 msg.wait();
303 }
304 }
305 else
306 { // Send an asynchronous msg, typically a reply
307 if (trace)
308 log.trace("Begin internalSendMessage, one-way msg=" + msg);
309 sendQueue.put(msg);
310 }
311 if (trace)
312 log.trace("End internalSendMessage, msg=" + msg);
313 }
314
315 /** The task managing the socket read thread
316 *
317 */
318 public class ReadTask implements Runnable
319 {
320 public void run()
321 {
322 int msgType = 0;
323 log.debug("Begin ReadTask.run " + Thread.currentThread());
324 try
325 {
326 bufferedInput = new NotifyingBufferedInputStream(socket.getInputStream(), bufferSize, chunkSize, handler);
327 in = new ObjectInputStream(bufferedInput);
328 log.debug("Created ObjectInputStream");
329 }
330 catch (IOException e)
331 {
332 handleStop("Failed to create ObjectInputStream", e);
333 return;
334 }
335
336 while (true)
337 {
338 try
339 {
340 msgType = in.readByte();
341 int msgID = in.readInt();
342 if (trace)
343 log.trace("Read msgType: " + BaseMsg.toString(msgType) + ", msgID: " + msgID);
344 // See if there is a msg awaiting a reply
345 BaseMsg key = new BaseMsg(msgType, msgID);
346 BaseMsg msg = (BaseMsg) replyMap.remove(key);
347 if (msg == null)
348 {
349 msg = BaseMsg.createMsg(msgType);
350 msg.setMsgID(msgID);
351 msg.read(in);
352 if (trace)
353 log.trace("Read new msg: " + msg);
354
355 // Handle the message
356 if (pool == null)
357 break;
358 msg.setHandler(this);
359 pool.execute(msg);
360 }
361 else
362 {
363 if (trace)
364 log.trace("Found replyMap msg: " + msg);
365 msg.setMsgID(msgID);
366 try
367 {
368 msg.read(in);
369 if (trace)
370 log.trace("Read msg reply: " + msg);
371 }
372 catch (Throwable e)
373 {
374 // Forward the error to the waiting message
375 msg.setError(e);
376 throw e;
377 }
378 // Always notify the waiting message
379 finally
380 {
381 synchronized (msg)
382 {
383 msg.notify();
384 }
385 }
386 }
387 }
388 catch (ClassNotFoundException e)
389 {
390 handleStop("Failed to read msgType:" + msgType, e);
391 break;
392 }
393 catch (IOException e)
394 {
395 handleStop("Exiting on IOE", e);
396 break;
397 }
398 catch (InterruptedException e)
399 {
400 handleStop("Exiting on interrupt", e);
401 break;
402 }
403 catch (Throwable e)
404 {
405 handleStop("Exiting on unexpected error in read task", e);
406 break;
407 }
408 }
409 log.debug("End ReadTask.run " + Thread.currentThread());
410 }
411
412 /**
413 * Handle the message or respond with an error
414 */
415 public void handleMsg(BaseMsg msg)
416 {
417 try
418 {
419 handler.handleMsg(msg);
420 }
421 catch (Throwable e)
422 {
423 if (e instanceof JMSException || running.get() == false)
424 log.trace("Failed to handle: " + msg.toString(), e);
425 else if (e instanceof RuntimeException || e instanceof Error)
426 log.error("Failed to handle: " + msg.toString(), e);
427 else
428 log.debug("Failed to handle: " + msg.toString(), e);
429 msg.setError(e);
430 try
431 {
432 internalSendMessage(msg, false);
433 }
434 catch (Exception ie)
435 {
436 if (running.get())
437 log.debug("Failed to send error reply", ie);
438 else
439 log.trace("Failed to send error reply", ie);
440 }
441 }
442 }
443
444 /**
445 * Stop the read thread
446 */
447 private void handleStop(String error, Throwable e)
448 {
449 synchronized (running)
450 {
451 readState = STOPPING;
452 running.set(false);
453 }
454
455 if (e instanceof IOException || e instanceof InterruptedException)
456 {
457 if (trace)
458 log.trace(error, e);
459 }
460 else
461 log.debug(error, e);
462
463 replyAll(e);
464 if (handler != null)
465 {
466 handler.asynchFailure(error, e);
467 handler.close();
468 }
469
470 synchronized (running)
471 {
472 readState = STOPPED;
473 if (writeState == STARTED)
474 {
475 writeState = STOPPING;
476 writeThread.interrupt();
477 }
478 }
479
480 try
481 {
482 in.close();
483 }
484 catch (Exception ignored)
485 {
486 if (trace)
487 log.trace(ignored.getMessage(), ignored);
488 }
489
490 try
491 {
492 socket.close();
493 }
494 catch (Exception ignored)
495 {
496 if (trace)
497 log.trace(ignored.getMessage(), ignored);
498 }
499 }
500
501 private void replyAll(Throwable e)
502 {
503 // Clear the interrupted state of the thread
504 Thread.interrupted();
505
506 for (Iterator iterator = replyMap.keySet().iterator(); iterator.hasNext();)
507 {
508 BaseMsg msg = (BaseMsg) iterator.next();
509 msg.setError(e);
510 synchronized (msg)
511 {
512 msg.notify();
513 }
514 iterator.remove();
515 }
516 }
517 }
518
519 /** The task managing the socket write thread
520 *
521 */
522 public class WriteTask implements Runnable
523 {
524 public void run()
525 {
526 log.debug("Begin WriteTask.run " + Thread.currentThread());
527 try
528 {
529 bufferedOutput =
530 new NotifyingBufferedOutputStream(socket.getOutputStream(), bufferSize, chunkSize, handler);
531 out = new ObjectOutputStream(bufferedOutput);
532 log.debug("Created ObjectOutputStream");
533 }
534 catch (IOException e)
535 {
536 handleStop(null, "Failed to create ObjectOutputStream", e);
537 return;
538 }
539
540 while (true)
541 {
542 BaseMsg msg = null;
543
544 synchronized (running)
545 {
546 if (writeState != STARTED)
547 break;
548 }
549 try
550 {
551 msg = (BaseMsg) sendQueue.poll(10000l);
552 if (msg == null)
553 continue; // Check for stop if no message for 10 seconds
554 if (trace)
555 log.trace("Write msg: " + msg);
556 msg.write(out);
557 out.reset();
558 out.flush();
559 }
560 catch (InterruptedException e)
561 {
562 handleStop(msg, "WriteTask was interrupted", e);
563 break;
564 }
565 catch (IOException e)
566 {
567 handleStop(msg, "Exiting on IOE", e);
568 break;
569 }
570 catch (Throwable e)
571 {
572 handleStop(msg, "Failed to write msgType:" + msg, e);
573 break;
574 }
575 }
576 log.debug("End WriteTask.run " + Thread.currentThread());
577 }
578
579 /**
580 * Stop the write thread
581 */
582 private void handleStop(BaseMsg msg, String error, Throwable e)
583 {
584 synchronized (running)
585 {
586 writeState = STOPPING;
587 running.set(false);
588 }
589
590 if (e instanceof InterruptedException || e instanceof IOException)
591 {
592 if (trace)
593 log.trace(error, e);
594 }
595 else
596 log.debug(error, e);
597
598 if (msg != null)
599 {
600 msg.setError(e);
601 synchronized (msg)
602 {
603 msg.notify();
604 }
605 }
606
607 synchronized (running)
608 {
609 writeState = STOPPED;
610 if (readState == STARTED)
611 {
612 readState = STOPPING;
613 readThread.interrupt();
614 }
615 }
616
617 try
618 {
619 out.close();
620 }
621 catch (Exception ignored)
622 {
623 if (trace)
624 log.trace(ignored.getMessage(), ignored);
625 }
626
627 try
628 {
629 socket.close();
630 }
631 catch (Exception ignored)
632 {
633 if (trace)
634 log.trace(ignored.getMessage(), ignored);
635 }
636 }
637 }
638
639 static class UILThreadFactory implements ThreadFactory
640 {
641 private String id;
642 private int count;
643
644 UILThreadFactory(String id)
645 {
646 this.id = id;
647 }
648 public Thread newThread(Runnable command)
649 {
650 synchronized( this )
651 {
652 count ++;
653 }
654 Thread t = new Thread(command, "UIL2("+id+")#"+count);
655 return t;
656 }
657 }
658 }