Save This Page
Home » jboss-5.0.0.CR1-src » org » jboss » mq » il » uil2 » [javadoc | source]
    1   /*
    2    * JBoss, Home of Professional Open Source.
    3    * Copyright 2006, Red Hat Middleware LLC, and individual contributors
    4    * as indicated by the @author tags. See the copyright.txt file in the
    5    * distribution for a full listing of individual contributors.
    6    *
    7    * This is free software; you can redistribute it and/or modify it
    8    * under the terms of the GNU Lesser General Public License as
    9    * published by the Free Software Foundation; either version 2.1 of
   10    * the License, or (at your option) any later version.
   11    *
   12    * This software is distributed in the hope that it will be useful,
   13    * but WITHOUT ANY WARRANTY; without even the implied warranty of
   14    * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
   15    * Lesser General Public License for more details.
   16    *
   17    * You should have received a copy of the GNU Lesser General Public
   18    * License along with this software; if not, write to the Free
   19    * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
   20    * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
   21    */
   22   package org.jboss.mq.il.uil2;
   23   
   24   import java.io.IOException;
   25   import java.io.ObjectInputStream;
   26   import java.io.ObjectOutputStream;
   27   import java.net.InetAddress;
   28   import java.net.Socket;
   29   import java.util.Iterator;
   30   
   31   import javax.jms.JMSException;
   32   
   33   import org.jboss.logging.Logger;
   34   import org.jboss.mq.il.uil2.msgs.BaseMsg;
   35   import org.jboss.util.stream.NotifyingBufferedInputStream;
   36   import org.jboss.util.stream.NotifyingBufferedOutputStream;
   37   
   38   import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
   39   import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
   40   import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
   41   import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
   42   import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
   43   import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
   44   
   45   /** Used to manage the client/server and server/client communication in an
   46    * asynchronous manner.
   47    *
   48    * @todo verify the pooled executor config
   49    *
   50    * @author  Scott.Stark@jboss.org
   51    * @version $Revision: 64067 $
   52    */
   53   public class SocketManager
   54   {
   55      private static Logger log = Logger.getLogger(SocketManager.class);
   56   
   57      private static final int STOPPED = 0;
   58      private static final int STARTED = 1;
   59      private static final int STOPPING = 2;
   60      private static SynchronizedInt taskID = new SynchronizedInt(0);
   61   
   62      /** The socket created by the IL layer */
   63      private Socket socket;
   64      /** The input stream used by the read task */
   65      private ObjectInputStream in;
   66      /** The buffering for output */
   67      NotifyingBufferedInputStream bufferedInput;
   68      /** The output stream used by the write task */
   69      private ObjectOutputStream out;
   70      /** The buffering for output */
   71      NotifyingBufferedOutputStream bufferedOutput;
   72      /** The write task thread */
   73      private Thread writeThread;
   74      /** The read task thread */
   75      private Thread readThread;
   76      /** The thread pool used to service incoming requests */
   77      PooledExecutor pool;
   78      /** The flag used to control the read loop */
   79      private int readState = STOPPED;
   80      /** The flag used to control the write loop */
   81      private int writeState = STOPPED;
   82      /** Used for constrolling the state */
   83      private SynchronizedBoolean running = new SynchronizedBoolean(false);
   84      /** The queue of messages to be processed by the write task */
   85      private LinkedQueue sendQueue;
   86      /** A HashMap<Integer, BaseMsg> that are awaiting a reply */
   87      private ConcurrentHashMap replyMap;
   88      /** The callback handler used for msgs that are not replys */
   89      private SocketManagerHandler handler;
   90      /** The buffer size */
   91      private int bufferSize = 1;
   92      /** The chunk size for notification of stream activity */
   93      private int chunkSize = 0x40000000;
   94      /** The logging trace level which is set in the ctor */
   95      private boolean trace;
   96   
   /** Create a manager for the given socket. Only the send queue and the
    * reply map are set up here; the threads and the object streams are
    * created when start is called.
    *
    * @param s the socket created by the IL layer
    * @throws IOException declared in the signature; nothing in this
    *    constructor body throws it
    */
   public SocketManager(Socket s) throws IOException
   {
      this.trace = log.isTraceEnabled();
      this.replyMap = new ConcurrentHashMap();
      this.sendQueue = new LinkedQueue();
      this.socket = s;
   }
  104   
  105      /** Start the read and write threads using the given thread group and
  106       * names of "UIL2.SocketManager.ReadTask" and "UIL2.SocketManager.WriteTask".
  107       * @param tg the thread group to use for the read and write threads.
  108       */
  109      public void start(ThreadGroup tg)
  110      {
  111         if (trace)
  112            log.trace("start called", new Exception("Start stack trace"));
  113   
  114         InetAddress inetAddr = socket.getInetAddress();
  115         String ipAddress = (inetAddr != null) ? inetAddr.getHostAddress() : "<unknown>";
  116         ipAddress += ":" + socket.getPort();
  117         if (pool == null)
  118         {
  119            // TODO: Check the validity of this config
  120            pool = new PooledExecutor(5);
  121            pool.setMinimumPoolSize(1);
  122            pool.setKeepAliveTime(1000 * 60);
  123            pool.runWhenBlocked();
  124            String id = "SocketManager.MsgPool@"+
  125               Integer.toHexString(System.identityHashCode(this))
  126               + " client=" +  ipAddress;
  127            pool.setThreadFactory(new UILThreadFactory(id));
  128         }
  129   
  130         ReadTask readTask = new ReadTask();
  131         readThread = new Thread(tg, readTask, "UIL2.SocketManager.ReadTask#" + taskID.increment() + " client=" +  ipAddress);
  132         readThread.setDaemon(true);
  133   
  134         WriteTask writeTask = new WriteTask();
  135         writeThread = new Thread(tg, writeTask, "UIL2.SocketManager.WriteTask#" + taskID.increment() + " client=" + ipAddress);
  136         writeThread.setDaemon(true);
  137         
  138         synchronized (running)
  139         {
  140            readState = STARTED;
  141            writeState = STARTED;
  142            running.set(true);
  143         }
  144   
  145         try
  146         {
  147            readThread.start();
  148            writeThread.start();
  149         }
  150         catch (Throwable t)
  151         {
  152            try
  153            {
  154               stop();
  155            }
  156            catch (Throwable ignored)
  157            {
  158            }
  159            
  160            log.warn("Error starting socket manager threads", t);
  161         }
  162      }
  163   
  164      /** Stop the read and write threads by interrupting them.
  165       */
  166      public void stop()
  167      {
  168         synchronized (running)
  169         {
  170            if (trace)
  171               log.trace("stop() " + readThread + " " + writeThread);
  172            if (readState == STARTED)
  173            {
  174               readState = STOPPING;
  175               readThread.interrupt();
  176            }
  177            if (writeState == STARTED)
  178            {
  179               writeState = STOPPING;
  180               writeThread.interrupt();
  181            }
  182            running.set(false);
  183            if (pool != null)
  184            {
  185               pool.shutdownNow();
  186               pool = null;
  187            }
  188            try
  189            {
  190               socket.close();
  191            }
  192            catch (Throwable ignored)
  193            {
  194            }
  195         }
  196      }
  197   
  198      /** Set the callback handler for msgs that were not originated by the
  199       * socket manager. This is any msgs read that was not sent via the
  200       * sendMessage method.
  201       *
  202       * @param handler
  203       */
  204      public void setHandler(SocketManagerHandler handler)
  205      {
  206         this.handler = handler;
  207         if (bufferedInput != null)
  208            bufferedInput.setStreamListener(handler);
  209         if (bufferedOutput != null)
  210            bufferedOutput.setStreamListener(handler);
  211      }
  212   
  213      /**
  214       * Sets the buffer size
  215       *
  216       * @param size the size of the buffer
  217       */
  218      public void setBufferSize(int size)
  219      {
  220         this.bufferSize = size;
  221      }
  222   
  223      /**
  224       * Sets the chunk size
  225       *
  226       * @param size the size of a chunk
  227       */
  228      public void setChunkSize(int size)
  229      {
  230         this.chunkSize = size;
  231      }
  232   
  233      /** Send a two-way message and block the calling thread until the
  234       * msg reply is received. This enques the msg to the sendQueue, places
  235       * the msg in the replyMap and waits on the msg. The msg is notified by the
  236       * read task thread when it finds a msg with a msgID that maps to the
  237       * msg in the msgReply map.
  238       *
  239       * @param msg the request msg to send
  240       * @throws Exception thrown if the reply message has an error value
  241       */
  242      public void sendMessage(BaseMsg msg) throws Exception
  243      {
  244         internalSendMessage(msg, true);
  245         if (msg.error != null)
  246         {
  247            if (trace)
  248               log.trace("sendMessage will throw error", msg.error);
  249            throw msg.error;
  250         }
  251      }
  252   
  253      /** 
  254       * Send a reply.
  255       *
  256       * @param msg the message
  257       * @throws Exception for any error
  258       */
  259      public void sendReply(BaseMsg msg) throws Exception
  260      {
  261         msg.trimReply();
  262         internalSendMessage(msg, false);
  263      }
  264   
  265      /** 
  266       * Send a one-way.
  267       *
  268       * @param msg the message
  269       * @throws Exception for any error
  270       */
  271      public void sendOneWay(BaseMsg msg) throws Exception
  272      {
  273         msg.getMsgID();
  274         internalSendMessage(msg, false);
  275      }
  276   
  277      /** This places the msg into the sendQueue and returns if waitOnReply
  278       * is false, or enques the msg to the sendQueue, places the msg
  279       * in the replyMap and waits on the msg.
  280       *
  281       * @param msg
  282       * @param waitOnReply
  283       * @throws Exception
  284       */
  285      private void internalSendMessage(BaseMsg msg, boolean waitOnReply) throws Exception
  286      {
  287         if (running.get() == false)
  288            throw new IOException("Client is not connected");
  289   
  290         if (waitOnReply)
  291         { // Send a request msg and wait for the reply
  292            synchronized (msg)
  293            {
  294               // Create the request msgID
  295               msg.getMsgID();
  296               if (trace)
  297                  log.trace("Begin internalSendMessage, round-trip msg=" + msg);
  298               // Place the msg into the write queue and reply map
  299               replyMap.put(msg, msg);
  300               sendQueue.put(msg);
  301               // Wait for the msg reply
  302               msg.wait();
  303            }
  304         }
  305         else
  306         { // Send an asynchronous msg, typically a reply
  307            if (trace)
  308               log.trace("Begin internalSendMessage, one-way msg=" + msg);
  309            sendQueue.put(msg);
  310         }
  311         if (trace)
  312            log.trace("End internalSendMessage, msg=" + msg);
  313      }
  314   
  315      /** The task managing the socket read thread
  316       *
  317       */
  318      public class ReadTask implements Runnable
  319      {
  320         public void run()
  321         {
  322            int msgType = 0;
  323            log.debug("Begin ReadTask.run " + Thread.currentThread());
  324            try
  325            {
  326               bufferedInput = new NotifyingBufferedInputStream(socket.getInputStream(), bufferSize, chunkSize, handler);
  327               in = new ObjectInputStream(bufferedInput);
  328               log.debug("Created ObjectInputStream");
  329            }
  330            catch (IOException e)
  331            {
  332               handleStop("Failed to create ObjectInputStream", e);
  333               return;
  334            }
  335   
  336            while (true)
  337            {
  338               try
  339               {
  340                  msgType = in.readByte();
  341                  int msgID = in.readInt();
  342                  if (trace)
  343                     log.trace("Read msgType: " + BaseMsg.toString(msgType) + ", msgID: " + msgID);
  344                  // See if there is a msg awaiting a reply
  345                  BaseMsg key = new BaseMsg(msgType, msgID);
  346                  BaseMsg msg = (BaseMsg) replyMap.remove(key);
  347                  if (msg == null)
  348                  {
  349                     msg = BaseMsg.createMsg(msgType);
  350                     msg.setMsgID(msgID);
  351                     msg.read(in);
  352                     if (trace)
  353                        log.trace("Read new msg: " + msg);
  354   
  355                     // Handle the message
  356                     if (pool == null)
  357                        break;
  358                     msg.setHandler(this);
  359                     pool.execute(msg);
  360                  }
  361                  else
  362                  {
  363                     if (trace)
  364                        log.trace("Found replyMap msg: " + msg);
  365                     msg.setMsgID(msgID);
  366                     try
  367                     {
  368                        msg.read(in);
  369                        if (trace)
  370                           log.trace("Read msg reply: " + msg);
  371                     }
  372                     catch (Throwable e)
  373                     {
  374                        // Forward the error to the waiting message
  375                        msg.setError(e);
  376                        throw e;
  377                     }
  378                     // Always notify the waiting message
  379                     finally
  380                     {
  381                        synchronized (msg)
  382                        {
  383                           msg.notify();
  384                        }
  385                     }
  386                  }
  387               }
  388               catch (ClassNotFoundException e)
  389               {
  390                  handleStop("Failed to read msgType:" + msgType, e);
  391                  break;
  392               }
  393               catch (IOException e)
  394               {
  395                  handleStop("Exiting on IOE", e);
  396                  break;
  397               }
  398               catch (InterruptedException e)
  399               {
  400                  handleStop("Exiting on interrupt", e);
  401                  break;
  402               }
  403               catch (Throwable e)
  404               {
  405                  handleStop("Exiting on unexpected error in read task", e);
  406                  break;
  407               }
  408            }
  409            log.debug("End ReadTask.run " + Thread.currentThread());
  410         }
  411   
  412         /**
  413          * Handle the message or respond with an error
  414          */
  415         public void handleMsg(BaseMsg msg)
  416         {
  417            try
  418            {
  419               handler.handleMsg(msg);
  420            }
  421            catch (Throwable e)
  422            {
  423               if (e instanceof JMSException || running.get() == false)
  424                  log.trace("Failed to handle: " + msg.toString(), e);
  425               else if (e instanceof RuntimeException || e instanceof Error)
  426                  log.error("Failed to handle: " + msg.toString(), e);
  427               else
  428                  log.debug("Failed to handle: " + msg.toString(), e);
  429               msg.setError(e);
  430               try
  431               {
  432                  internalSendMessage(msg, false);
  433               }
  434               catch (Exception ie)
  435               {
  436                  if (running.get())
  437                     log.debug("Failed to send error reply", ie);
  438                  else
  439                     log.trace("Failed to send error reply", ie);
  440               }
  441            }
  442         }
  443   
  444         /**
  445          * Stop the read thread
  446          */
  447         private void handleStop(String error, Throwable e)
  448         {
  449            synchronized (running)
  450            {
  451               readState = STOPPING;
  452               running.set(false);
  453            }
  454   
  455            if (e instanceof IOException || e instanceof InterruptedException)
  456            {
  457               if (trace)
  458                  log.trace(error, e);
  459            }
  460            else
  461               log.debug(error, e);
  462   
  463            replyAll(e);
  464            if (handler != null)
  465            {
  466               handler.asynchFailure(error, e);
  467               handler.close();
  468            }
  469   
  470            synchronized (running)
  471            {
  472               readState = STOPPED;
  473               if (writeState == STARTED)
  474               {
  475                  writeState = STOPPING;
  476                  writeThread.interrupt();
  477               }
  478            }
  479   
  480            try
  481            {
  482               in.close();
  483            }
  484            catch (Exception ignored)
  485            {
  486               if (trace)
  487                  log.trace(ignored.getMessage(), ignored);
  488            }
  489   
  490            try
  491            {
  492               socket.close();
  493            }
  494            catch (Exception ignored)
  495            {
  496               if (trace)
  497                  log.trace(ignored.getMessage(), ignored);
  498            }
  499         }
  500   
  501         private void replyAll(Throwable e)
  502         {
  503            // Clear the interrupted state of the thread
  504            Thread.interrupted();
  505   
  506            for (Iterator iterator = replyMap.keySet().iterator(); iterator.hasNext();)
  507            {
  508               BaseMsg msg = (BaseMsg) iterator.next();
  509               msg.setError(e);
  510               synchronized (msg)
  511               {
  512                  msg.notify();
  513               }
  514               iterator.remove();
  515            }
  516         }
  517      }
  518   
  519      /** The task managing the socket write thread
  520       *
  521       */
  522      public class WriteTask implements Runnable
  523      {
  524         public void run()
  525         {
  526            log.debug("Begin WriteTask.run " + Thread.currentThread());
  527            try
  528            {
  529               bufferedOutput =
  530                  new NotifyingBufferedOutputStream(socket.getOutputStream(), bufferSize, chunkSize, handler);
  531               out = new ObjectOutputStream(bufferedOutput);
  532               log.debug("Created ObjectOutputStream");
  533            }
  534            catch (IOException e)
  535            {
  536               handleStop(null, "Failed to create ObjectOutputStream", e);
  537               return;
  538            }
  539   
  540            while (true)
  541            {
  542               BaseMsg msg = null;
  543               
  544               synchronized (running)
  545               {
  546                  if (writeState != STARTED)
  547                     break;
  548               }
  549               try
  550               {
  551                  msg = (BaseMsg) sendQueue.poll(10000l);
  552                  if (msg == null)
  553                     continue; // Check for stop if no message for 10 seconds
  554                  if (trace)
  555                     log.trace("Write msg: " + msg);
  556                  msg.write(out);
  557                  out.reset();
  558                  out.flush();
  559               }
  560               catch (InterruptedException e)
  561               {
  562                  handleStop(msg, "WriteTask was interrupted", e);
  563                  break;
  564               }
  565               catch (IOException e)
  566               {
  567                  handleStop(msg, "Exiting on IOE", e);
  568                  break;
  569               }
  570               catch (Throwable e)
  571               {
  572                  handleStop(msg, "Failed to write msgType:" + msg, e);
  573                  break;
  574               }
  575            }
  576            log.debug("End WriteTask.run " + Thread.currentThread());
  577         }
  578   
  579         /**
  580          * Stop the write thread
  581          */
  582         private void handleStop(BaseMsg msg, String error, Throwable e)
  583         {
  584            synchronized (running)
  585            {
  586               writeState = STOPPING;
  587               running.set(false);
  588            }
  589   
  590            if (e instanceof InterruptedException || e instanceof IOException)
  591            {
  592               if (trace)
  593                  log.trace(error, e);
  594            }
  595            else
  596               log.debug(error, e);
  597   
  598            if (msg != null)
  599            {
  600               msg.setError(e);
  601               synchronized (msg)
  602               {
  603                  msg.notify();
  604               }
  605            }
  606   
  607            synchronized (running)
  608            {
  609               writeState = STOPPED;
  610               if (readState == STARTED)
  611               {
  612                  readState = STOPPING;
  613                  readThread.interrupt();
  614               }
  615            }
  616   
  617            try
  618            {
  619               out.close();
  620            }
  621            catch (Exception ignored)
  622            {
  623               if (trace)
  624                  log.trace(ignored.getMessage(), ignored);
  625            }
  626   
  627            try
  628            {
  629               socket.close();
  630            }
  631            catch (Exception ignored)
  632            {
  633               if (trace)
  634                  log.trace(ignored.getMessage(), ignored);
  635            }
  636         }
  637      }
  638   
  639      static class UILThreadFactory implements ThreadFactory
  640      {
  641         private String id;
  642         private int count;
  643         
  644         UILThreadFactory(String id)
  645         {
  646            this.id = id;
  647         }
  648         public Thread newThread(Runnable command)
  649         {
  650            synchronized( this )
  651            {
  652               count ++;
  653            }
  654            Thread t = new Thread(command, "UIL2("+id+")#"+count);
  655            return t;
  656         }
  657      }
  658   }

Save This Page
Home » jboss-5.0.0.CR1-src » org » jboss » mq » il » uil2 » [javadoc | source]