Save This Page
Home » j2ssh-0.2.9-src » com.sshtools.j2ssh.transport » [javadoc | source]
    1   /*
    2    *  SSHTools - Java SSH2 API
    3    *
    4    *  Copyright (C) 2002-2003 Lee David Painter and Contributors.
    5    *
    6    *  Contributions made by:
    7    *
    8    *  Brett Smith
    9    *  Richard Pernavas
   10    *  Erwin Bolwidt
   11    *
   12    *  This program is free software; you can redistribute it and/or
   13    *  modify it under the terms of the GNU General Public License
   14    *  as published by the Free Software Foundation; either version 2
   15    *  of the License, or (at your option) any later version.
   16    *
   17    *  This program is distributed in the hope that it will be useful,
   18    *  but WITHOUT ANY WARRANTY; without even the implied warranty of
   19    *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   20    *  GNU General Public License for more details.
   21    *
   22    *  You should have received a copy of the GNU General Public License
   23    *  along with this program; if not, write to the Free Software
   24    *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
   25    */
   26   package com.sshtools.j2ssh.transport;
   27   
   28   import com.sshtools.j2ssh.io.ByteArrayReader;
   29   
   30   import org.apache.commons.logging.Log;
   31   import org.apache.commons.logging.LogFactory;
   32   
   33   import java.util.ArrayList;
   34   import java.util.HashMap;
   35   import java.util.Iterator;
   36   import java.util.List;
   37   import java.util.Map;
   38   import java.util.Vector;
   39   
   40   
   41   /**
   42    * <p>
   43    * This class implements a message store that can be used to provide a blocking
   44    * mechanism for transport protocol messages.
   45    * </p>
   46    *
   47    * @author Lee David Painter
   48    * @version $Revision: 1.42 $
   49    *
   50    * @since 0.2.0
   51    */
   52   public final class SshMessageStore {
   53       private static Log log = LogFactory.getLog(SshMessageStore.class);
   54   
   55       // List to hold messages as they are received
   56       private List messages = new ArrayList();
   57       private Map register = new HashMap();
   58       private boolean isClosed = false;
   59       private int[] singleIdFilter = new int[1];
   60       private int interrupt = 5000;
   61       private Vector listeners = new Vector();
   62   
   63       /**
   64        * <p>
   65        * Contructs the message store.
   66        * </p>
   67        *
   68        * @since 0.2.0
   69        */
   70       public SshMessageStore() {
   71       }
   72   
   73       /**
   74        * <p>
   75        * Evaluate whether the message store is closed.
   76        * </p>
   77        *
   78        * @return
   79        *
   80        * @since 0.2.0
   81        */
   82       public boolean isClosed() {
   83           return isClosed;
   84       }
   85   
   86       public void addMessageListener(SshMessageListener listener) {
   87           synchronized (listeners) {
   88               listeners.add(listener);
   89           }
   90       }
   91   
   92       /**
   93        * <p>
   94        * Get a message from the store. This method will block until a message
   95        * with an id matching the supplied filter arrives, or the message store
   96        * closes. The message is removed from the store.
   97        * </p>
   98        *
   99        * @param messageIdFilter an array of message ids that are acceptable
  100        *
  101        * @return the next available message
  102        *
  103        * @throws MessageStoreEOFException if the message store is closed
  104        * @throws InterruptedException if the thread was interrupted
  105        *
  106        * @since 0.2.0
  107        */
  108       public synchronized SshMessage getMessage(int[] messageIdFilter)
  109           throws MessageStoreEOFException, InterruptedException {
  110           try {
  111               return getMessage(messageIdFilter, 0);
  112           } catch (MessageNotAvailableException e) {
  113               // This should never happen but throw just in case
  114               throw new MessageStoreEOFException();
  115           }
  116       }
  117   
  118       /**
  119        * <p>
  120        * Get a message from the store. This method will block until a message
  121        * with an id matching the supplied filter arrives, the specified timeout
  122        * is reached or the message store closes. The message is removed from the
  123        * store.
  124        * </p>
  125        *
  126        * @param messageIdFilter an array of message ids that are acceptable.
  127        * @param timeout the maximum number of milliseconds to block before
  128        *        returning.
  129        *
  130        * @return the next available message
  131        *
  132        * @throws MessageStoreEOFException if the message store is closed
  133        * @throws MessageNotAvailableException if the message is not available
  134        *         after a timeout
  135        * @throws InterruptedException if the thread is interrupted
  136        *
  137        * @since 0.2.0
  138        */
  139       public synchronized SshMessage getMessage(int[] messageIdFilter, int timeout)
  140           throws MessageStoreEOFException, MessageNotAvailableException, 
  141               InterruptedException {
  142           if ((messages.size() <= 0) && isClosed) {
  143               throw new MessageStoreEOFException();
  144           }
  145   
  146           if (messageIdFilter == null) {
  147               return nextMessage();
  148           }
  149   
  150           SshMessage msg;
  151           boolean firstPass = true;
  152   
  153           if (timeout < 0) {
  154               timeout = 0;
  155           }
  156   
  157           while ((messages.size() > 0) || !isClosed) {
  158               // lookup the message
  159               msg = lookupMessage(messageIdFilter, true);
  160   
  161               if (msg != null) {
  162                   return msg;
  163               } else {
  164                   // If this is the second time and there's no message, then throw
  165                   if (!firstPass && (timeout > 0)) {
  166                       throw new MessageNotAvailableException();
  167                   }
  168               }
  169   
  170               // Now wait
  171               if (!isClosed) {
  172                   wait((timeout == 0) ? interrupt : timeout);
  173               }
  174   
  175               firstPass = false;
  176           }
  177   
  178           throw new MessageStoreEOFException();
  179       }
  180   
  181       /**
  182        * <p>
  183        * Get a message from the store. This method will block until a message
  184        * with an id matching the supplied id arrives, or the message store
  185        * closes. The message is removed from the store.
  186        * </p>
  187        *
  188        * @param messageId the id of the message requried
  189        *
  190        * @return the next available message with the id supplied
  191        *
  192        * @throws MessageStoreEOFException if the message store closed
  193        * @throws InterruptedException if the thread is interrupted
  194        *
  195        * @since 0.2.0
  196        */
  197       public synchronized SshMessage getMessage(int messageId)
  198           throws MessageStoreEOFException, InterruptedException {
  199           try {
  200               return getMessage(messageId, 0);
  201           } catch (MessageNotAvailableException e) {
  202               // This should never happen by throw jsut in case
  203               throw new MessageStoreEOFException();
  204           }
  205       }
  206   
  207       /**
  208        * <p>
  209        * Get a message from the store. This method will block until a message
  210        * with an id matching the supplied id arrives,the specified timeout is
  211        * reached or the message store closes. The message will be removed from
  212        * the store.
  213        * </p>
  214        *
  215        * @param messageId the id of the message requried
  216        * @param timeout the maximum number of milliseconds to block before
  217        *        returning.
  218        *
  219        * @return the next available message with the id supplied
  220        *
  221        * @throws MessageStoreEOFException if the message store closed
  222        * @throws InterruptedException if the thread is interrupted
  223        * @throws InterruptedException
  224        *
  225        * @since 0.2.0
  226        */
  227       public synchronized SshMessage getMessage(int messageId, int timeout)
  228           throws MessageStoreEOFException, MessageNotAvailableException, 
  229               InterruptedException {
  230           singleIdFilter[0] = messageId;
  231   
  232           return getMessage(singleIdFilter, timeout);
  233       }
  234   
  235       /**
  236        * <p>
  237        * Evaluate whether the store has any messages.
  238        * </p>
  239        *
  240        * @return true if messages exist, otherwise false
  241        *
  242        * @since 0.2.0
  243        */
  244       public boolean hasMessages() {
  245           return messages.size() > 0;
  246       }
  247   
  248       /**
  249        * <p>
  250        * Returns the number of messages contained within this message store.
  251        * </p>
  252        *
  253        * @return the number of messages
  254        *
  255        * @since 0.2.0
  256        */
  257       public int size() {
  258           return messages.size();
  259       }
  260   
  261       /**
  262        * <p>
  263        * Determines if the message id is a registered message of this store.
  264        * </p>
  265        *
  266        * @param messageId the message id
  267        *
  268        * @return true if the message id is registered, otherwise false
  269        *
  270        * @since 0.2.0
  271        */
  272       public boolean isRegisteredMessage(Integer messageId) {
  273           return register.containsKey(messageId);
  274       }
  275   
  276       /**
  277        * <p>
  278        * Adds a raw message to the store and processes the data into a registered
  279        * message.
  280        * </p>
  281        *
  282        * @param msgdata the raw message data to process
  283        *
  284        * @throws MessageNotRegisteredException if the message id of the raw data
  285        *         is not a registered message
  286        * @throws InvalidMessageException if the message is invalid
  287        *
  288        * @since 0.2.0
  289        */
  290       public void addMessage(byte[] msgdata)
  291           throws MessageNotRegisteredException, InvalidMessageException {
  292           Integer messageId = new Integer(msgdata[5]);
  293   
  294           if (!isRegisteredMessage(messageId)) {
  295               throw new MessageNotRegisteredException(messageId);
  296           }
  297   
  298           Class cls = (Class) register.get(SshMessage.getMessageId(msgdata));
  299   
  300           try {
  301               SshMessage msg = (SshMessage) cls.newInstance();
  302               msg.fromByteArray(new ByteArrayReader(msgdata));
  303               addMessage(msg);
  304           } catch (IllegalAccessException iae) {
  305               throw new InvalidMessageException(
  306                   "Illegal access for implementation class " + cls.getName());
  307           } catch (InstantiationException ie) {
  308               throw new InvalidMessageException("Instantiation failed for class " +
  309                   cls.getName());
  310           }
  311       }
  312   
  313       /**
  314        * <p>
  315        * Add a formed message to the store.
  316        * </p>
  317        *
  318        * @param msg the message to add to the store
  319        *
  320        * @throws MessageNotRegisteredException if the message type is not
  321        *         registered with the store
  322        *
  323        * @since 0.2.0
  324        */
  325       public synchronized void addMessage(SshMessage msg)
  326           throws MessageNotRegisteredException {
  327           // Add the message
  328           messages.add(messages.size(), msg);
  329   
  330           synchronized (listeners) {
  331               if (listeners.size() > 0) {
  332                   for (Iterator it = listeners.iterator(); it.hasNext();) {
  333                       ((SshMessageListener) it.next()).messageReceived(msg);
  334                   }
  335               }
  336           }
  337   
  338           // Notify the threads
  339           notifyAll();
  340       }
  341   
  342       /**
  343        * <p>
  344        * Closes the store. This will cause any blocking operations on the message
  345        * store to return.
  346        * </p>
  347        *
  348        * @since 0.2.0
  349        */
  350       public synchronized void close() {
  351           isClosed = true;
  352   
  353           // We need to notify all anyway as if there are messages still available
  354           // it should not affect the waiting threads as they are waiting for their
  355           // own messages to be received because non were avaialable in the first place
  356           //if (messages.size()<=0) {
  357           notifyAll();
  358   
  359           //}
  360       }
  361   
  362       /**
  363        * <p>
  364        * Get the next message in the store or wait until a new message arrives.
  365        * The message is removed from the store.
  366        * </p>
  367        *
  368        * @return the next available message.
  369        *
  370        * @throws MessageStoreEOFException if the message store is closed
  371        * @throws InterruptedException if the thread is interrupted
  372        *
  373        * @since 0.2.0
  374        */
  375       public synchronized SshMessage nextMessage()
  376           throws MessageStoreEOFException, InterruptedException {
  377           if ((messages.size() <= 0) && isClosed) {
  378               throw new MessageStoreEOFException();
  379           }
  380   
  381           // If there are no messages available then wait untill there are.
  382           while ((messages.size() <= 0) && !isClosed) {
  383               wait(interrupt);
  384           }
  385   
  386           if (messages.size() > 0) {
  387               return (SshMessage) messages.remove(0);
  388           } else {
  389               throw new MessageStoreEOFException();
  390           }
  391       }
  392   
  393       /**
  394        *
  395        */
  396       public synchronized void breakWaiting() {
  397           notifyAll();
  398       }
  399   
  400       /**
  401        * <p>
  402        * Get a message from the store without removing or blocking if the message
  403        * does not exist.
  404        * </p>
  405        *
  406        * @param messageIdFilter the id of the message requried
  407        *
  408        * @return the next available message with the id supplied
  409        *
  410        * @throws MessageStoreEOFException if the message store closed
  411        * @throws MessageNotAvailableException if the message is not available
  412        * @throws InterruptedException if the thread is interrupted
  413        *
  414        * @since 0.2.0
  415        */
  416       public synchronized SshMessage peekMessage(int[] messageIdFilter)
  417           throws MessageStoreEOFException, MessageNotAvailableException, 
  418               InterruptedException {
  419           return peekMessage(messageIdFilter, 0);
  420       }
  421   
  422       /**
  423        * <p>
  424        * Get a message from the store without removing it; only blocking for the
  425        * number of milliseconds specified in the timeout field. If timeout is
  426        * zero, the method will not block.
  427        * </p>
  428        *
  429        * @param messageIdFilter an array of acceptable message ids
  430        * @param timeout the number of milliseconds to wait
  431        *
  432        * @return the next available message of the acceptable message ids
  433        *
  434        * @throws MessageStoreEOFException if the message store is closed
  435        * @throws MessageNotAvailableException if the message is not available
  436        * @throws InterruptedException if the thread is interrupted
  437        *
  438        * @since 0.2.0
  439        */
  440       public synchronized SshMessage peekMessage(int[] messageIdFilter,
  441           int timeout)
  442           throws MessageStoreEOFException, MessageNotAvailableException, 
  443               InterruptedException {
  444           SshMessage msg;
  445   
  446           // Do a straight lookup
  447           msg = lookupMessage(messageIdFilter, false);
  448   
  449           if (msg != null) {
  450               return msg;
  451           }
  452   
  453           // If were willing to wait the wait and look again
  454           if (timeout > 0) {
  455               if (log.isDebugEnabled()) {
  456                   log.debug("No message so waiting for " +
  457                       String.valueOf(timeout) + " milliseconds");
  458               }
  459   
  460               wait(timeout);
  461               msg = lookupMessage(messageIdFilter, false);
  462   
  463               if (msg != null) {
  464                   return msg;
  465               }
  466           }
  467   
  468           // Nothing even after a wait so throw the relevant exception
  469           if (isClosed) {
  470               throw new MessageStoreEOFException();
  471           } else {
  472               throw new MessageNotAvailableException();
  473           }
  474       }
  475   
  476       private SshMessage lookupMessage(int[] messageIdFilter, boolean remove) {
  477           SshMessage msg;
  478   
  479           for (int x = 0; x < messages.size(); x++) {
  480               msg = (SshMessage) messages.get(x);
  481   
  482               // Determine whether its one of the filtered messages
  483               for (int i = 0; i < messageIdFilter.length; i++) {
  484                   if (msg.getMessageId() == messageIdFilter[i]) {
  485                       if (remove) {
  486                           messages.remove(msg);
  487                       }
  488   
  489                       return msg;
  490                   }
  491               }
  492           }
  493   
  494           return null;
  495       }
  496   
  497       /**
  498        * <p>
  499        * Get a message from the store without removing it.
  500        * </p>
  501        *
  502        * @param messageId the acceptable message id
  503        *
  504        * @return the next available message.
  505        *
  506        * @throws MessageStoreEOFException if the message store is closed.
  507        * @throws MessageNotAvailableException if the message is not available.
  508        * @throws InterruptedException if the thread is interrupted
  509        *
  510        * @since 0.2.0
  511        */
  512       public synchronized SshMessage peekMessage(int messageId)
  513           throws MessageStoreEOFException, MessageNotAvailableException, 
  514               InterruptedException {
  515           return peekMessage(messageId, 0);
  516       }
  517   
  518       /**
  519        * <p>
  520        * Removes a message from the message store.
  521        * </p>
  522        *
  523        * @param msg the message to remove
  524        *
  525        * @since 0.2.0
  526        */
  527       public synchronized void removeMessage(SshMessage msg) {
  528           messages.remove(msg);
  529       }
  530   
  531       /**
  532        * <p>
  533        * Get a message from the store without removing it, only blocking for the
  534        * number of milliseconds specified in the timeout field.
  535        * </p>
  536        *
  537        * @param messageId the acceptable message id
  538        * @param timeout the timeout setting in milliseconds
  539        *
  540        * @return the next available message
  541        *
  542        * @throws MessageStoreEOFException if the message store is closed
  543        * @throws MessageNotAvailableException if the message is not available
  544        * @throws InterruptedException if the thread is interrupted
  545        *
  546        * @since 0.2.0
  547        */
  548       public synchronized SshMessage peekMessage(int messageId, int timeout)
  549           throws MessageStoreEOFException, MessageNotAvailableException, 
  550               InterruptedException {
  551           singleIdFilter[0] = messageId;
  552   
  553           return peekMessage(singleIdFilter, timeout);
  554       }
  555   
  556       /**
  557        * <p>
  558        * Register a message implementation with the store.
  559        * </p>
  560        *
  561        * @param messageId the id of the message
  562        * @param implementor the class of the implementation
  563        *
  564        * @since 0.2.0
  565        */
  566       public void registerMessage(int messageId, Class implementor) {
  567           Integer id = new Integer(messageId);
  568           register.put(id, implementor);
  569       }
  570   
  571       /**
  572        * <p>
  573        * Returns an Object array (Integers) of the registered message ids.
  574        * </p>
  575        *
  576        * @return the registered message id array
  577        *
  578        * @since 0.2.0
  579        */
  580       public Object[] getRegisteredMessageIds() {
  581           return register.keySet().toArray();
  582       }
  583   
  584       /**
  585        * <p>
  586        * Create a formed message from raw message data.
  587        * </p>
  588        *
  589        * @param msgdata the raw message data
  590        *
  591        * @return the formed message
  592        *
  593        * @throws MessageNotRegisteredException if the message is not a registered
  594        *         message
  595        * @throws InvalidMessageException if the message is invalid
  596        *
  597        * @since 0.2.0
  598        */
  599       public SshMessage createMessage(byte[] msgdata)
  600           throws MessageNotRegisteredException, InvalidMessageException {
  601           Integer messageId = SshMessage.getMessageId(msgdata);
  602   
  603           if (!isRegisteredMessage(messageId)) {
  604               throw new MessageNotRegisteredException(messageId);
  605           }
  606   
  607           Class cls = (Class) register.get(SshMessage.getMessageId(msgdata));
  608   
  609           try {
  610               SshMessage msg = (SshMessage) cls.newInstance();
  611               msg.fromByteArray(new ByteArrayReader(msgdata));
  612   
  613               return msg;
  614           } catch (IllegalAccessException iae) {
  615               throw new InvalidMessageException(
  616                   "Illegal access for implementation class " + cls.getName());
  617           } catch (InstantiationException ie) {
  618               throw new InvalidMessageException("Instantiation failed for class " +
  619                   cls.getName());
  620           }
  621       }
  622   }

Save This Page
Home » j2ssh-0.2.9-src » com.sshtools.j2ssh.transport » [javadoc | source]