Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » cluster » tcp » [javadoc | source]
    1   /*
    2    * Copyright 1999,2004-2005 The Apache Software Foundation.
    3    * 
    4    * Licensed under the Apache License, Version 2.0 (the "License"); you may not
    5    * use this file except in compliance with the License. You may obtain a copy of
    6    * the License at
    7    * 
    8    * http://www.apache.org/licenses/LICENSE-2.0
    9    * 
   10    * Unless required by applicable law or agreed to in writing, software
   11    * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   12    * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   13    * License for the specific language governing permissions and limitations under
   14    * the License.
   15    */
   16   
   17   package org.apache.catalina.cluster.tcp;
   18   
   19   import java.io.ByteArrayOutputStream;
   20   import java.io.IOException;
   21   import java.io.ObjectOutputStream;
   22   import java.util.HashMap;
   23   import java.util.Iterator;
   24   import java.util.Map;
   25   import java.util.zip.GZIPOutputStream;
   26   
   27   import javax.management.MBeanServer;
   28   import javax.management.ObjectName;
   29   
   30   import org.apache.catalina.Container;
   31   import org.apache.catalina.cluster.ClusterMessage;
   32   import org.apache.catalina.cluster.ClusterSender;
   33   import org.apache.catalina.cluster.Member;
   34   import org.apache.catalina.cluster.util.IDynamicProperty;
   35   import org.apache.catalina.core.StandardHost;
   36   import org.apache.catalina.util.StringManager;
   37   import org.apache.tomcat.util.IntrospectionUtils;
   38   
   39   /**
   40    * Transmit message to ohter cluster members create sender from replicationMode
   41    * type 
   42    * FIXME i18n log messages
   43    * FIXME compress data depends on message type and size 
   44    * FIXME send very big messages at some block see FarmWarDeployer!
   45    * TODO pause and resume senders
   46    * 
   47    * @author Peter Rossbach
   48    * @author Filip Hanik
   49    * @version $Revision: 345567 $ $Date: 2005-11-18 16:07:23 -0500 (Fri, 18 Nov 2005) $
   50    */
   51   public class ReplicationTransmitter implements ClusterSender,IDynamicProperty {
   52       private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
   53               .getLog(ReplicationTransmitter.class);
   54   
   55       /**
   56        * The descriptive information about this implementation.
   57        */
   58       private static final String info = "ReplicationTransmitter/3.0";
   59   
   60       /**
   61        * The string manager for this package.
   62        */
   63       protected StringManager sm = StringManager.getManager(Constants.Package);
   64   
   65       private Map map = new HashMap();
   66   
   67       public ReplicationTransmitter() {
   68       }
   69   
   70       /**
   71        * number of transmitted messages>
   72        */
   73       private long nrOfRequests = 0;
   74   
   75       /**
   76        * number of transmitted bytes
   77        */
   78       private long totalBytes = 0;
   79   
   80       /**
   81        * number of failure
   82        */
   83       private long failureCounter = 0;
   84   
   85       /**
   86        * Iteration count for background processing.
   87        */
   88       private int count = 0;
   89   
   90       /**
   91        * Frequency of the check sender keepAlive Socket Status.
   92        */
   93       protected int processSenderFrequency = 2;
   94   
   95       /**
   96        * current sender replication mode
   97        */
   98       private String replicationMode;
   99   
  100       /**
  101        * sender default ackTimeout
  102        */
  103       private long ackTimeout = 15000; //15 seconds by default
  104   
  105       /**
  106        * enabled wait for ack
  107        */
  108       private boolean waitForAck = true;
  109   
  110       /**
  111        * autoConnect sender when next message send
  112        */
  113       private boolean autoConnect = false;
  114   
  115       /**
  116        * Compress message data bytes
  117        */
  118       private boolean compress = false;
  119   
  120       /**
  121        * doTransmitterProcessingStats
  122        */
  123       protected boolean doTransmitterProcessingStats = false;
  124   
  125       /**
  126        * proessingTime
  127        */
  128       protected long processingTime = 0;
  129       
  130       /**
  131        * min proessingTime
  132        */
  133       protected long minProcessingTime = Long.MAX_VALUE ;
  134   
  135       /**
  136        * max proessingTime
  137        */
  138       protected long maxProcessingTime = 0;
  139      
  140       /**
  141        * dynamic sender <code>properties</code>
  142        */
  143       private Map properties = new HashMap();
  144   
  145       /**
  146        * my cluster
  147        */
  148       private SimpleTcpCluster cluster;
  149   
  150       /**
  151        * Transmitter Mbean name
  152        */
  153       private ObjectName objectName;
  154   
  155       // ------------------------------------------------------------- Properties
  156   
  157       /**
  158        * Return descriptive information about this implementation and the
  159        * corresponding version number, in the format
  160        * <code>&lt;description&gt;/&lt;version&gt;</code>.
  161        */
  162       public String getInfo() {
  163           return (info);
  164       }
  165   
  166       /**
  167        * @return Returns the nrOfRequests.
  168        */
  169       public long getNrOfRequests() {
  170           return nrOfRequests;
  171       }
  172   
  173       /**
  174        * @return Returns the totalBytes.
  175        */
  176       public long getTotalBytes() {
  177           return totalBytes;
  178       }
  179   
  180       /**
  181        * @return Returns the failureCounter.
  182        */
  183       public long getFailureCounter() {
  184           return failureCounter;
  185       }
  186   
  187       /**
  188        * current replication mode
  189        * 
  190        * @return The mode
  191        */
  192       public String getReplicationMode() {
  193           return replicationMode;
  194       }
  195   
  196       /**
  197        * set replication Mode (pooled, synchonous, asynchonous, fastasyncqueue)
  198        * 
  199        * @see IDataSenderFactory#validateMode(String)
  200        * @param mode
  201        */
  202       public void setReplicationMode(String mode) {
  203           String msg = IDataSenderFactory.validateMode(mode);
  204           if (msg == null) {
  205               if (log.isDebugEnabled())
  206                   log.debug("Setting replcation mode to " + mode);
  207               this.replicationMode = mode;
  208           } else
  209               throw new IllegalArgumentException(msg);
  210   
  211       }
  212   
  213       /**
  214        * @return Returns the avg processingTime/nrOfRequests.
  215        */
  216       public double getAvgProcessingTime() {
  217           return ((double)processingTime) / nrOfRequests;
  218       }
  219    
  220       /**
  221        * @return Returns the maxProcessingTime.
  222        */
  223       public long getMaxProcessingTime() {
  224           return maxProcessingTime;
  225       }
  226       
  227       /**
  228        * @return Returns the minProcessingTime.
  229        */
  230       public long getMinProcessingTime() {
  231           return minProcessingTime;
  232       }
  233       
  234       /**
  235        * @return Returns the processingTime.
  236        */
  237       public long getProcessingTime() {
  238           return processingTime;
  239       }
  240       
  241       /**
  242        * @return Returns the doTransmitterProcessingStats.
  243        */
  244       public boolean isDoTransmitterProcessingStats() {
  245           return doTransmitterProcessingStats;
  246       }
  247       
  248       /**
  249        * @param doProcessingStats The doTransmitterProcessingStats to set.
  250        */
  251       public void setDoTransmitterProcessingStats(boolean doProcessingStats) {
  252           this.doTransmitterProcessingStats = doProcessingStats;
  253       }
  254    
  255   
  256       /**
  257        * Transmitter ObjectName
  258        * 
  259        * @param name
  260        */
  261       public void setObjectName(ObjectName name) {
  262           objectName = name;
  263       }
  264   
  265       public ObjectName getObjectName() {
  266           return objectName;
  267       }
  268   
  269       /**
  270        * @return Returns the compress.
  271        */
  272       public boolean isCompress() {
  273           return compress;
  274       }
  275   
  276       /**
  277        * @param compressMessageData
  278        *            The compress to set.
  279        */
  280       public void setCompress(boolean compressMessageData) {
  281           this.compress = compressMessageData;
  282       }
  283   
  284       /**
  285        * @return Returns the autoConnect.
  286        */
  287       public boolean isAutoConnect() {
  288           return autoConnect;
  289       }
  290   
  291       /**
  292        * @param autoConnect
  293        *            The autoConnect to set.
  294        */
  295       public void setAutoConnect(boolean autoConnect) {
  296           this.autoConnect = autoConnect;
  297           setProperty("autoConnect", String.valueOf(autoConnect));
  298   
  299       }
  300   
  301       /**
  302        * @return The ack timeout
  303        */
  304       public long getAckTimeout() {
  305           return ackTimeout;
  306       }
  307   
  308       /**
  309        * @param ackTimeout
  310        */
  311       public void setAckTimeout(long ackTimeout) {
  312           this.ackTimeout = ackTimeout;
  313           setProperty("ackTimeout", String.valueOf(ackTimeout));
  314       }
  315   
  316       /**
  317        * @return Returns the waitForAck.
  318        */
  319       public boolean isWaitForAck() {
  320           return waitForAck;
  321       }
  322   
  323       /**
  324        * @param waitForAck
  325        *            The waitForAck to set.
  326        */
  327       public void setWaitForAck(boolean waitForAck) {
  328           this.waitForAck = waitForAck;
  329           setProperty("waitForAck", String.valueOf(waitForAck));
  330       }
  331   
  332       
  333       /**
  334        * @return Returns the processSenderFrequency.
  335        */
  336       public int getProcessSenderFrequency() {
  337           return processSenderFrequency;
  338       }
  339       
  340       /**
  341        * @param processSenderFrequency The processSenderFrequency to set.
  342        */
  343       public void setProcessSenderFrequency(int processSenderFrequency) {
  344           this.processSenderFrequency = processSenderFrequency;
  345       }
  346       
  347       /*
  348        * configured in cluster
  349        * 
  350        * @see org.apache.catalina.cluster.ClusterSender#setCatalinaCluster(org.apache.catalina.cluster.tcp.SimpleTcpCluster)
  351        */
  352       public void setCatalinaCluster(SimpleTcpCluster cluster) {
  353           this.cluster = cluster;
  354   
  355       }
  356   
  357       /**
  358        * @return True if synchronized sender
  359        * @deprecated since version 5.5.7
  360        */
  361       public boolean getIsSenderSynchronized() {
  362           return IDataSenderFactory.SYNC_MODE.equals(replicationMode)
  363                   || IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode);
  364       }
  365   
  366       // ------------------------------------------------------------- dynamic
  367       // sender property handling
  368   
  369       /**
  370        * set config attributes with reflect
  371        * 
  372        * @param name
  373        * @param value
  374        */
  375       public void setProperty(String name, Object value) {
  376           if (log.isTraceEnabled())
  377               log.trace(sm.getString("ReplicationTransmitter.setProperty", name,
  378                       value, properties.get(name)));
  379   
  380           properties.put(name, value);
  381       }
  382   
  383       /**
  384        * get current config
  385        * 
  386        * @param key
  387        * @return The property
  388        */
  389       public Object getProperty(String key) {
  390           if (log.isTraceEnabled())
  391               log.trace(sm.getString("ReplicationTransmitter.getProperty", key));
  392           return properties.get(key);
  393       }
  394   
  395       /**
  396        * Get all properties keys
  397        * 
  398        * @return An iterator over the propery name set
  399        */
  400       public Iterator getPropertyNames() {
  401           return properties.keySet().iterator();
  402       }
  403   
  404       /**
  405        * remove a configured property.
  406        * 
  407        * @param key
  408        */
  409       public void removeProperty(String key) {
  410           properties.remove(key);
  411       }
  412   
  413       // ------------------------------------------------------------- public
  414       
  415       /**
  416        * Send data to one member
  417        * FIXME set filtering messages
  418        * @see org.apache.catalina.cluster.ClusterSender#sendMessage(org.apache.catalina.cluster.ClusterMessage, org.apache.catalina.cluster.Member)
  419        */
  420       public void sendMessage(ClusterMessage message, Member member)
  421               throws java.io.IOException {       
  422           long time = 0 ;
  423           if(doTransmitterProcessingStats) {
  424               time = System.currentTimeMillis();
  425           }
  426           try {
  427               ClusterData data = serialize(message);
  428               String key = getKey(member);
  429               IDataSender sender = (IDataSender) map.get(key);
  430               sendMessageData(data, sender);
  431           } finally {
  432               if (doTransmitterProcessingStats) {
  433                   addProcessingStats(time);
  434               }
  435           }
  436       }
  437       
  438       /**
  439        * Send to all senders at same cluster domain as message from address
  440        * @param message Cluster message to send
  441        * @since 5.5.10
  442        */
  443       public void sendMessageClusterDomain(ClusterMessage message) 
  444            throws java.io.IOException {
  445           long time = 0;
  446           if (doTransmitterProcessingStats) {
  447               time = System.currentTimeMillis();
  448           }
  449           try {
  450               String domain = message.getAddress().getDomain();
  451               if(domain == null)
  452                   throw new RuntimeException("Domain at member not set");
  453               ClusterData data = serialize(message);
  454               IDataSender[] senders = getSenders();
  455               for (int i = 0; i < senders.length; i++) {
  456   
  457                   IDataSender sender = senders[i];
  458                   if(domain.equals(sender.getDomain())) {
  459                       try {
  460                           sendMessageData(data, sender);
  461                       } catch (Exception x) {
  462                           if (!sender.getSuspect()) {
  463                               log.warn("Unable to send replicated message to "
  464                                       + sender + ", is server down?", x);
  465                               sender.setSuspect(true);
  466                           }
  467                       }
  468                   }
  469               }
  470           } finally {
  471               if (doTransmitterProcessingStats) {
  472                   addProcessingStats(time);
  473               }
  474           }
  475       
  476       }
  477   
  478       /**
  479        * send message to all senders (broadcast)
  480        * @see org.apache.catalina.cluster.ClusterSender#sendMessage(org.apache.catalina.cluster.ClusterMessage)
  481        */
  482       public void sendMessage(ClusterMessage message)
  483               throws java.io.IOException {
  484           long time = 0;
  485           if (doTransmitterProcessingStats) {
  486               time = System.currentTimeMillis();
  487           }
  488           try {
  489               ClusterData data = serialize(message);
  490               IDataSender[] senders = getSenders();
  491               for (int i = 0; i < senders.length; i++) {
  492   
  493                   IDataSender sender = senders[i];
  494                   try {
  495                       sendMessageData(data, sender);
  496                   } catch (Exception x) {
  497                       if (!sender.getSuspect()) {
  498                           log.warn("Unable to send replicated message to "
  499                                   + sender + ", is server down?", x);
  500                           sender.setSuspect(true);
  501                       }
  502                   }
  503               }
  504           } finally {
  505               if (doTransmitterProcessingStats) {
  506                   addProcessingStats(time);
  507               }
  508           }
  509       }
  510   
  511       /**
  512        * start the sender and register transmitter mbean
  513        * 
  514        * @see org.apache.catalina.cluster.ClusterSender#start()
  515        */
  516       public void start() throws java.io.IOException {
  517           if (cluster != null) {
  518               ObjectName clusterName = cluster.getObjectName();
  519               ObjectName transmitterName = null ;
  520               try {
  521                   MBeanServer mserver = cluster.getMBeanServer();
  522                   Container container = cluster.getContainer();
  523                   String name = clusterName.getDomain() + ":type=ClusterSender";
  524                   if (container instanceof StandardHost) {
  525                       name += ",host=" + clusterName.getKeyProperty("host");
  526                   }
  527                   transmitterName = new ObjectName(name);
  528                   if (mserver.isRegistered(transmitterName)) {
  529                       if (log.isWarnEnabled())
  530                           log.warn(sm.getString(
  531                                   "cluster.mbean.register.allready",
  532                                   transmitterName));
  533                       return;
  534                   }
  535                   setObjectName(transmitterName);
  536                   mserver.registerMBean(cluster.getManagedBean(this),
  537                           getObjectName());
  538                   if(log.isInfoEnabled())
  539                       log.info(sm.getString("ReplicationTransmitter.started",
  540                               clusterName, transmitterName));
  541   
  542               } catch (Exception e) {
  543                   log.warn(e);
  544               }
  545           }
  546       }
  547   
  548       /*
  549        * stop the sender and deregister mbeans (transmitter, senders)
  550        * 
  551        * @see org.apache.catalina.cluster.ClusterSender#stop()
  552        */
  553       public synchronized void stop() {
  554           Iterator i = map.entrySet().iterator();
  555           while (i.hasNext()) {
  556               IDataSender sender = (IDataSender) ((java.util.Map.Entry) i.next())
  557                       .getValue();
  558               try {
  559                   unregisterSenderMBean(sender);
  560                   sender.disconnect();
  561               } catch (Exception x) {
  562               }
  563               i.remove();
  564           }
  565           if (cluster != null && getObjectName() != null) {
  566               try {
  567                   MBeanServer mserver = cluster.getMBeanServer();
  568                   mserver.unregisterMBean(getObjectName());
  569               } catch (Exception e) {
  570                   log.error(e);
  571               }
  572               if(log.isInfoEnabled())
  573                   log.info(sm.getString("ReplicationTransmitter.stopped",
  574                           cluster.getObjectName(), getObjectName()));
  575           }
  576   
  577       }
  578   
  579       /**
  580        * Call transmitter to check for sender socket status
  581        * 
  582        * @see SimpleTcpCluster#backgroundProcess()
  583        */
  584       public void backgroundProcess() {
  585           count = (count + 1) % processSenderFrequency;
  586           if (count == 0) {
  587               checkKeepAlive();
  588           }
  589       }
  590   
  591       /**
  592        * Check all DataSender Socket to close socket at keepAlive mode
  593        * @see DataSender#checkKeepAlive()
  594        */
  595       public void checkKeepAlive() {
  596           if (map.size() > 0) {
  597               java.util.Iterator iter = map.entrySet().iterator();
  598               while (iter.hasNext()) {
  599                   IDataSender sender = (IDataSender) ((java.util.Map.Entry) iter
  600                           .next()).getValue();
  601                   if (sender != null)
  602                       sender.checkKeepAlive();
  603               }
  604           }
  605       }
  606   
  607       /**
  608        * get all current senders
  609        * 
  610        * @return The senders
  611        */
  612       public IDataSender[] getSenders() {
  613           java.util.Iterator iter = map.entrySet().iterator();
  614           IDataSender[] array = new IDataSender[map.size()];
  615           int i = 0;
  616           while (iter.hasNext()) {
  617               IDataSender sender = (IDataSender) ((java.util.Map.Entry) iter
  618                       .next()).getValue();
  619               if (sender != null)
  620                   array[i] = sender;
  621               i++;
  622           }
  623           return array;
  624       }
  625   
  626       /**
  627        * get all current senders
  628        * 
  629        * @return The sender object names
  630        */
  631       public ObjectName[] getSenderObjectNames() {
  632           java.util.Iterator iter = map.entrySet().iterator();
  633           ObjectName array[] = new ObjectName[map.size()];
  634           int i = 0;
  635           while (iter.hasNext()) {
  636               IDataSender sender = (IDataSender) ((java.util.Map.Entry) iter
  637                       .next()).getValue();
  638               if (sender != null)
  639                   array[i] = getSenderObjectName(sender);
  640               i++;
  641           }
  642           return array;
  643       }
  644   
  645       /**
  646        * Reset sender statistics
  647        */
  648       public synchronized void resetStatistics() {
  649           nrOfRequests = 0;
  650           totalBytes = 0;
  651           failureCounter = 0;
  652           processingTime = 0;
  653           minProcessingTime = Long.MAX_VALUE;
  654           maxProcessingTime = 0;
  655       }
  656   
  657       /**
  658        * add new cluster member and create sender ( s. replicationMode) transfer
  659        * current properties to sender
  660        * 
  661        * @see org.apache.catalina.cluster.ClusterSender#add(org.apache.catalina.cluster.Member)
  662        */
  663       public synchronized void add(Member member) {
  664           try {
  665               String key = getKey(member);
  666               if (!map.containsKey(key)) {
  667                   IDataSender sender = IDataSenderFactory.getIDataSender(
  668                           replicationMode, member);
  669                   transferSenderProperty(sender);
  670                   map.put(key, sender);
  671                   registerSenderMBean(member, sender);
  672               }
  673           } catch (java.io.IOException x) {
  674               log.error("Unable to create and add a IDataSender object.", x);
  675           }
  676       }
  677   
  678       /**
  679        * remove sender from transmitter. ( deregister mbean and disconnect sender )
  680        * 
  681        * @see org.apache.catalina.cluster.ClusterSender#remove(org.apache.catalina.cluster.Member)
  682        */
  683       public synchronized void remove(Member member) {
  684           String key = getKey(member);
  685           IDataSender toberemoved = (IDataSender) map.get(key);
  686           if (toberemoved == null)
  687               return;
  688           unregisterSenderMBean(toberemoved);
  689           toberemoved.disconnect();
  690           map.remove(key);
  691   
  692       }
  693   
  694       // ------------------------------------------------------------- protected
  695   
  696       /**
  697        * calc number of requests and transfered bytes. Log stats all 100 requets
  698        * 
  699        * @param length
  700        */
  701       protected synchronized void addStats(int length) {
  702           nrOfRequests++;
  703           totalBytes += length;
  704           if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) {
  705               log.debug("Nr of bytes sent=" + totalBytes + " over "
  706                       + nrOfRequests + "; avg=" + (totalBytes / nrOfRequests)
  707                       + " bytes/request; failures=" + failureCounter);
  708           }
  709   
  710       }
  711   
  712       /**
  713        * Transfer all properties from transmitter to concrete sender
  714        * 
  715        * @param sender
  716        */
  717       protected void transferSenderProperty(IDataSender sender) {
  718           for (Iterator iter = getPropertyNames(); iter.hasNext();) {
  719               String pkey = (String) iter.next();
  720               Object value = getProperty(pkey);
  721               IntrospectionUtils.setProperty(sender, pkey, value.toString());
  722           }
  723       }
  724   
  725       /**
  726        * set unique key to find sender
  727        * 
  728        * @param member
  729        * @return concat member.host:member.port
  730        */
  731       protected String getKey(Member member) {
  732           return member.getHost() + ":" + member.getPort();
  733       }
  734   
  735       /**
  736        * unregsister sendern Mbean
  737        * 
  738        * @see #getSenderObjectName(IDataSender)
  739        * @param sender
  740        */
  741       protected void unregisterSenderMBean(IDataSender sender) {
  742           try {
  743               MBeanServer mserver = cluster.getMBeanServer();
  744               if (mserver != null) {
  745                   mserver.unregisterMBean(getSenderObjectName(sender));
  746               }
  747           } catch (Exception e) {
  748               log.warn(e);
  749           }
  750       }
  751   
  752       /**
  753        * register MBean and check it exist (big problem!)
  754        * 
  755        * @param member
  756        * @param sender
  757        */
  758       protected void registerSenderMBean(Member member, IDataSender sender) {
  759           if (member != null && cluster != null) {
  760               try {
  761                   MBeanServer mserver = cluster.getMBeanServer();
  762                   ObjectName senderName = getSenderObjectName(sender);
  763                   if (mserver.isRegistered(senderName)) {
  764                       if (log.isWarnEnabled())
  765                           log.warn(sm.getString(
  766                                   "cluster.mbean.register.allready", senderName));
  767                       return;
  768                   }
  769                   mserver.registerMBean(cluster.getManagedBean(sender),
  770                           senderName);
  771               } catch (Exception e) {
  772                   log.warn(e);
  773               }
  774           }
  775       }
  776   
  777       
  778       /**
  779        * build sender ObjectName (
  780        * engine.domain:type=IDataSender,host="host",senderAddress="receiver.address",senderPort="port" )
  781        * 
  782        * @param sender
  783        * @return The sender object name
  784        */
  785       protected ObjectName getSenderObjectName(IDataSender sender) {
  786           ObjectName senderName = null;
  787           try {
  788               ObjectName clusterName = cluster.getObjectName();
  789               Container container = cluster.getContainer();
  790               String name = clusterName.getDomain() + ":type=IDataSender";
  791               if (container instanceof StandardHost) {
  792                   name += ",host=" + clusterName.getKeyProperty("host");
  793               }
  794               senderName = new ObjectName(name + ",senderAddress="
  795                       + sender.getAddress().getHostAddress() + ",senderPort="
  796                       + sender.getPort());
  797           } catch (Exception e) {
  798               log.warn(e);
  799           }
  800           return senderName;
  801       }
  802   
  803       /**
  804        * serialize message and add timestamp
  805        * @see GZIPOutputStream
  806        * @param msg cluster message
  807        * @return cluster message as byte array
  808        * @throws IOException
  809        * @since 5.5.10
  810        */
  811       protected ClusterData serialize(ClusterMessage msg) throws IOException {
  812           msg.setTimestamp(System.currentTimeMillis());
  813           ByteArrayOutputStream outs = new ByteArrayOutputStream();
  814           ObjectOutputStream out;
  815           GZIPOutputStream gout = null;
  816           ClusterData data = new ClusterData();
  817           data.setType(msg.getClass().getName());
  818           data.setUniqueId(msg.getUniqueId());
  819           data.setTimestamp(msg.getTimestamp());
  820           data.setCompress(msg.getCompress());
  821           data.setResend(msg.getResend());
  822           // FIXME add Stats how much comress and uncompress messages and bytes are transfered
  823           if ((isCompress() && msg.getCompress() != ClusterMessage.FLAG_FORBIDDEN)
  824                   || msg.getCompress() == ClusterMessage.FLAG_ALLOWED) {
  825               gout = new GZIPOutputStream(outs);
  826               out = new ObjectOutputStream(gout);
  827           } else {
  828               out = new ObjectOutputStream(outs);
  829           }
  830           out.writeObject(msg);
  831           // flush out the gzip stream to byte buffer
  832           if(gout != null) {
  833               gout.flush();
  834               gout.close();
  835           }
  836           data.setMessage(outs.toByteArray());
  837           return data;
  838       }
  839    
  840   
  841       /**
  842        * Send message to concrete sender. If autoConnect is true, check is
  843        * connection broken and the reconnect the complete sender.
  844        * <ul>
  845        * <li>failure the suspect flag is set true. After successfully sending the
  846        * suspect flag is set to false.</li>
  847        * <li>Stats is only update after sussesfull sending</li>
  848        * </ul>
  849        * 
  850        * @param data message Data
  851        * @param sender concrete message sender
  852        * @throws java.io.IOException If an error occurs
  853        */
  854       protected void sendMessageData(ClusterData data,
  855               IDataSender sender) throws java.io.IOException {
  856           if (sender == null)
  857               throw new java.io.IOException(
  858                       "Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
  859           try {
  860               // deprecated not needed DataSender#pushMessage can handle connection
  861               if (autoConnect) {
  862                   synchronized(sender) {
  863                       if(!sender.isConnected())
  864                           sender.connect();
  865                   }
  866               }
  867               sender.sendMessage(data);
  868               sender.setSuspect(false);
  869               addStats(data.getMessage().length);
  870           } catch (Exception x) {
  871               if (log.isWarnEnabled()) {
  872                   if (!sender.getSuspect()) {
  873                       log.warn("Unable to send replicated message, is server down?",x);
  874                   }
  875               }
  876               sender.setSuspect(true);
  877               failureCounter++;
  878           }
  879   
  880       }
  881       /**
  882        * Add processing stats times
  883        * @param startTime
  884        */
  885       protected void addProcessingStats(long startTime) {
  886           long time = System.currentTimeMillis() - startTime ;
  887           if(time < minProcessingTime)
  888               minProcessingTime = time ;
  889           if( time > maxProcessingTime)
  890               maxProcessingTime = time ;
  891           processingTime += time ;
  892       }
  893    
  894    
  895   }

Save This Page
Home » apache-tomcat-6.0.16-src » org.apache » catalina » cluster » tcp » [javadoc | source]