Save This Page
Home » openjdk-7 » net » sf » rvpf » messaging » [javadoc | source]
    1   /**
    2    * Related Values Processing Framework.
    3    *
    4    * Copyright (C) 2003 Serge Brisson.
    5    *
    6    * This software is distributable under LGPL license.
    7    * See details at the bottom of this file.
    8    *
    9    * $Header: /cvsroot/rvpf/RVPF/java/src/net/sf/rvpf/messaging/ForwarderThread.java,v 1.17 2003/10/31 02:47:04 sfb Exp $
   10    */
   11   package net.sf.rvpf.messaging;
   12   
   13   import java.io.Serializable;
   14   import java.util.Properties;
   15   
   16   import net.sf.rvpf.util.Naming;
   17   import net.sf.rvpf.util.Queuing;
   18   import net.sf.rvpf.util.ServiceThread;
   19   import net.sf.rvpf.util.Messaging.Message;
   20   import net.sf.rvpf.util.Queuing.Receiver;
   21   import net.sf.rvpf.util.Queuing.Sender;
   22   
   23   /** Forwarder Thread.
   24    *
   25    * @author Serge Brisson
   26    * @version $Revision: 1.17 $
   27    */
   28   public class ForwarderThread extends ServiceThread {
   29   
   30       // Constructors.
   31   
   32       /** Constructs a Forwder Thread.
   33        */
   34       public ForwarderThread() {
   35           setName("Forwarder");
   36       }
   37   
   38       // Public Instance Methods.
   39   
   40       /** Run within the {@link java.lang.Thread}.
   41        */
   42       public synchronized void run() {
   43           Message message;
   44           Serializable object;
   45   
   46           try {
   47               getLog().info("Running");
   48               while (true) {
   49                   message = receive();
   50                   if (message == null) sleep(this.retryDelayMillis);
   51                   else if (send(message)) commit();
   52                   else {
   53                       rollback();
   54                       sleep(this.retryDelayMillis);
   55                   }
   56               }
   57           } catch (InterruptedException ie) {
   58               getLog().info("Interrupted");
   59           } catch (Exception e) {
   60               getLog().fatal("Thread terminated by Exception", e);
   61               e.printStackTrace();
   62           }
   63       }
   64   
   65       // Protected Instance Methods.
   66   
   67       /**
   68        */
   69       protected void close() {
   70           interrupt();
   71           if (this.input != null) this.input.close();
   72           if (this.output != null) this.output.close();
   73       }
   74   
   75       /**
   76        */
   77       protected boolean setUp() {
   78           if (!super.setUp()) return false;
   79   
   80           this.retryDelayMillis =
   81               getMetadata().getPropertyValue(
   82                   RETRY_DELAY_PROPERTY,
   83                   DEFAULT_RETRY_DELAY) * 1000L;
   84           if (getLog().isDebugEnabled())
   85               getLog().debug("Retry delay millis: " + this.retryDelayMillis);
   86   
   87           this.inputProperties =
   88               Naming.getEnvironment(getMetadata(), null, INPUT_NAMING_PROPERTY);
   89           if (this.inputProperties == null) return false;
   90   
   91           this.outputProperties =
   92               Naming.getEnvironment(getMetadata(), null, OUTPUT_NAMING_PROPERTY);
   93           if (this.outputProperties == null) return false;
   94   
   95           return true;
   96       }
   97   
   98       /**
   99        */
  100       protected void tearDown() {
  101           tearDownInput();
  102           tearDownOutput();
  103   
  104           super.tearDown();
  105       }
  106   
  107       // Private Instance Methods.
  108   
  109       private void commit() throws InterruptedException {
  110           try {
  111               this.input.commit();
  112           } catch (InterruptedException ie) {
  113               throw ie;
  114           } catch (Exception exception) {
  115               this.inputLogger.log(exception);
  116               tearDownInput();
  117           }
  118       }
  119   
  120       private Message receive() throws InterruptedException {
  121           Message message;
  122   
  123           try {
  124               if (this.input == null) {
  125                   setUpInput();
  126                   if (this.input == null)
  127                       throw new Exception("Failed to set up input");
  128                   this.inputLogger.reset();
  129               }
  130   
  131               message = this.receiver.receive();
  132           } catch (InterruptedException ie) {
  133               throw ie;
  134           } catch (Exception exception) {
  135               this.inputLogger.log(exception);
  136               tearDownInput();
  137               return null;
  138           }
  139   
  140           return message;
  141       }
  142   
  143       private void rollback() throws InterruptedException {
  144           try {
  145               this.input.rollback();
  146           } catch (InterruptedException ie) {
  147               throw ie;
  148           } catch (Exception exception) {
  149               this.inputLogger.log(exception);
  150               tearDownInput();
  151           }
  152       }
  153   
  154       private boolean send(Message message) throws InterruptedException {
  155           Serializable object = (Serializable) message.getObject();
  156   
  157           try {
  158               if (this.output == null) {
  159                   setUpOutput();
  160                   if (this.output == null)
  161                       throw new Exception("Failed to set up output");
  162                   this.outputLogger.reset();
  163               }
  164   
  165               sender.send(this.output.createMessage(object));
  166   
  167               if (getLog().isDebugEnabled())
  168                   getLog().debug("Forwarded a message submitted on "
  169                       + message.getTimestamp());
  170           } catch (InterruptedException ie) {
  171               throw ie;
  172           } catch (Exception exception) {
  173               this.outputLogger.log(exception);
  174               tearDownOutput();
  175               return false;
  176           }
  177   
  178           return true;
  179       }
  180   
  181       private void setUpInput() {
  182           this.input = Queuing.create(
  183               getMetadata(),
  184               Naming.create(this.inputProperties));
  185   
  186           if (this.input.setUpFactory(INPUT_CONNECTION_FACTORY_PROPERTY)) {
  187               this.input.setUpConnection(
  188                   INPUT_CONNECTION_USER_PROPERTY,
  189                   INPUT_CONNECTION_PASSWORD_PROPERTY);
  190               this.input.setUpSession(true);
  191   
  192               this.receiver = this.input.createReceiver(INPUT_QUEUE_PROPERTY);
  193               if (this.receiver == null) tearDownInput();
  194           } else tearDownInput();
  195       }
  196   
  197       private void setUpOutput() {
  198           this.output = Queuing.create(
  199               getMetadata(),
  200               Naming.create(this.outputProperties));
  201   
  202           if (this.output.setUpFactory(OUTPUT_CONNECTION_FACTORY_PROPERTY)) {
  203               this.output.setUpConnection(
  204                   OUTPUT_CONNECTION_USER_PROPERTY,
  205                   OUTPUT_CONNECTION_PASSWORD_PROPERTY);
  206               this.output.setUpSession(false);
  207   
  208               this.sender = this.output.createSender(OUTPUT_QUEUE_PROPERTY, true);
  209               if (this.sender == null) tearDownOutput(); 
  210           } else tearDownOutput();
  211       }
  212   
  213       private void tearDownInput() {
  214           if (this.receiver != null) {
  215               this.receiver.close();
  216               this.receiver = null;
  217           }
  218           if (this.input != null) {
  219               this.input.tearDown();
  220               this.input = null;
  221           }
  222       }
  223   
  224       private void tearDownOutput() {
  225           if (this.sender != null) {
  226               this.sender.close();
  227               this.sender = null;
  228           }
  229           if (this.output != null) {
  230               this.output.tearDown();
  231               this.output = null;
  232           }
  233       }
  234   
  235       // Class Constants.
  236   
  237       public static final int DEFAULT_RETRY_DELAY = 60;
  238       public static final String INPUT_CONNECTION_FACTORY_PROPERTY = "forwarder.input.queue.connection.factory";
  239       public static final String INPUT_CONNECTION_PASSWORD_PROPERTY = "forwarder.input.queue.connection.password";
  240       public static final String INPUT_CONNECTION_USER_PROPERTY = "forwarder.input.queue.connection.user";
  241       public static final String INPUT_NAMING_PROPERTY = "forwarder.input.jndi.properties";
  242       public static final String INPUT_QUEUE_PROPERTY = "forwarder.input.queue";
  243       public static final String OUTPUT_CONNECTION_FACTORY_PROPERTY = "forwarder.output.queue.connection.factory";
  244       public static final String OUTPUT_CONNECTION_PASSWORD_PROPERTY = "forwarder.output.queue.connection.password";
  245       public static final String OUTPUT_CONNECTION_USER_PROPERTY = "forwarder.output.queue.connection.user";
  246       public static final String OUTPUT_NAMING_PROPERTY = "forwarder.output.jndi.properties";
  247       public static final String OUTPUT_QUEUE_PROPERTY = "forwarder.output.queue";
  248       public static final String RETRY_DELAY_PROPERTY = "forwarder.retry.delay";
  249   
  250       // Instance Attributes.
  251   
  252       private Queuing input = null;
  253       private ExceptionLogger inputLogger = new ExceptionLogger("Input");
  254       private Properties inputProperties = null;
  255       private Queuing output = null;
  256       private ExceptionLogger outputLogger = new ExceptionLogger("Output");
  257       private Properties outputProperties = null;
  258       private Receiver receiver = null;
  259       private Sender sender = null;
  260       private long retryDelayMillis = 60000;
  261       private int skipsNext = 0;
  262       private int skipsToDo = 0;
  263   
  264       // Nested Classes.
  265   
  266       private class ExceptionLogger {
  267   
  268           // Constructors.
  269   
  270           private ExceptionLogger(String name) {
  271               this.name = name;
  272           }
  273   
  274           // Instance Methods.
  275   
  276           private void log(Exception exception) {
  277               if (this.skipsToDo == 0) {
  278                   getLog().warn(this.name + "exception: " + exception.getMessage());
  279               } else {
  280                    this.skipsToDo = this.skipsNext;
  281                    this.skipsNext <<= 1;
  282                    ++this.skipsNext;
  283               }
  284           }
  285   
  286           private void reset() {
  287               this.skipsToDo = this.skipsNext = 0;
  288           }
  289   
  290           // Instance Attributes.
  291   
  292           private String name;
  293           private int skipsNext = 0;
  294           private int skipsToDo = 0;
  295       }
  296   }
  297   
  298   // $Log: ForwarderThread.java,v $
  299   // Revision 1.17  2003/10/31 02:47:04  sfb
  300   // Added some error protection.
  301   //
  302   
  303   /*
  304    * This is free software; you can redistribute it and/or modify
  305    * it under the terms of the GNU Lesser General Public License
  306    * as published by the Free Software Foundation; either version 2.1
  307    * of the License, or (at your option) any later version.
  308    *
  309    * This software is distributed in the hope that it will be useful,
  310    * but WITHOUT ANY WARRANTY; without even the implied warranty of
  311    * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  312    * Lesser General Public License for more details.
  313    *
  314    * You should have received a copy of the GNU Lesser General Public
  315    * License along with this software; if not, write to the Free Software
  316    * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  317    */

Save This Page
Home » openjdk-7 » net » sf » rvpf » messaging » [javadoc | source]