Save This Page
Home » oscache-2.4.1-full » com.opensymphony.oscache.plugins » clustersupport » [javadoc | source]
    1   /*
    2    * Copyright (c) 2002-2003 by OpenSymphony
    3    * All rights reserved.
    4    */
    5   package com.opensymphony.oscache.plugins.clustersupport;
    6   
    7   import com.opensymphony.oscache.base.Cache;
    8   import com.opensymphony.oscache.base.Config;
    9   import com.opensymphony.oscache.base.FinalizationException;
   10   import com.opensymphony.oscache.base.InitializationException;
   11   
   12   import org.apache.commons.logging.Log;
   13   import org.apache.commons.logging.LogFactory;
   14   
   15   import org.jgroups.Address;
   16   import org.jgroups.Channel;
   17   
   18   import org.jgroups.blocks.NotificationBus;
   19   
   20   import java.io.Serializable;
   21   
   22   /**
   23    * <p>A concrete implementation of the {@link AbstractBroadcastingListener} based on
   24    * the JavaGroups library. This Class uses JavaGroups to broadcast cache flush
   25    * messages across a cluster.</p>
   26    *
   27    * <p>One of the following properties should be configured in <code>oscache.properties</code> for
   28    * this listener:
   29    * <ul>
   30    * <li><b>cache.cluster.multicast.ip</b> - The multicast IP that JavaGroups should use for broadcasting</li>
   31    * <li><b>cache.cluster.properties</b> - The JavaGroups channel properties to use. Allows for precise
   32    * control over the behaviour of JavaGroups</li>
   33    * </ul>
   34    * Please refer to the clustering documentation for further details on the configuration of this listener.</p>
   35    *
   36    * @author <a href="&#109;a&#105;&#108;&#116;&#111;:chris&#64;swebtec.&#99;&#111;&#109;">Chris Miller</a>
   37    */
   38   public class JavaGroupsBroadcastingListener extends AbstractBroadcastingListener implements NotificationBus.Consumer {
   39       private final static Log log = LogFactory.getLog(JavaGroupsBroadcastingListener.class);
   40       private static final String BUS_NAME = "OSCacheBus";
   41       private static final String CHANNEL_PROPERTIES = "cache.cluster.properties";
   42       private static final String MULTICAST_IP_PROPERTY = "cache.cluster.multicast.ip";
   43   
   44       /**
   45       * The first half of the default channel properties. They default channel properties are:
   46       * <pre>
   47       * UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;\
   48       * mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\
   49       * PING(timeout=2000;num_initial_members=3):\
   50       * MERGE2(min_interval=5000;max_interval=10000):\
   51       * FD_SOCK:VERIFY_SUSPECT(timeout=1500):\
   52       * pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\
   53       * UNICAST(timeout=300,600,1200,2400):\
   54       * pbcast.STABLE(desired_avg_gossip=20000):\
   55       * FRAG(frag_size=8096;down_thread=false;up_thread=false):\
   56       * pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
   57       * </pre>
   58       *
   59       * Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
   60       */
   61       private static final String DEFAULT_CHANNEL_PROPERTIES_PRE = "UDP(mcast_addr=";
   62   
   63       /**
   64       * The second half of the default channel properties. They default channel properties are:
   65       * <pre>
   66       * UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;\
   67       * mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\
   68       * PING(timeout=2000;num_initial_members=3):\
   69       * MERGE2(min_interval=5000;max_interval=10000):\
   70       * FD_SOCK:VERIFY_SUSPECT(timeout=1500):\
   71       * pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\
   72       * UNICAST(timeout=300,600,1200,2400):\
   73       * pbcast.STABLE(desired_avg_gossip=20000):\
   74       * FRAG(frag_size=8096;down_thread=false;up_thread=false):\
   75       * pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
   76       * </pre>
   77       *
   78       * Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
   79       */
   80       private static final String DEFAULT_CHANNEL_PROPERTIES_POST = ";mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" + "PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):" + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):UNICAST(timeout=300,600,1200,2400):pbcast.STABLE(desired_avg_gossip=20000):" + "FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)";
   81       private static final String DEFAULT_MULTICAST_IP = "231.12.21.132";
   82       private NotificationBus bus;
   83   
   84       /**
   85       * Initializes the broadcasting listener by starting up a JavaGroups notification
   86       * bus instance to handle incoming and outgoing messages.
   87       *
   88       * @param config An OSCache configuration object.
   89       * @throws com.opensymphony.oscache.base.InitializationException If this listener has
   90       * already been initialized.
   91       */
   92       public synchronized void initialize(Cache cache, Config config) throws InitializationException {
   93           super.initialize(cache, config);
   94   
   95           String properties = config.getProperty(CHANNEL_PROPERTIES);
   96           String multicastIP = config.getProperty(MULTICAST_IP_PROPERTY);
   97   
   98           if ((properties == null) && (multicastIP == null)) {
   99               multicastIP = DEFAULT_MULTICAST_IP;
  100           }
  101   
  102           if (properties == null) {
  103               properties = DEFAULT_CHANNEL_PROPERTIES_PRE + multicastIP.trim() + DEFAULT_CHANNEL_PROPERTIES_POST;
  104           } else {
  105               properties = properties.trim();
  106           }
  107   
  108           if (log.isInfoEnabled()) {
  109               log.info("Starting a new JavaGroups broadcasting listener with properties=" + properties);
  110           }
  111   
  112           try {
  113               bus = new NotificationBus(BUS_NAME, properties);
  114               bus.start();
  115               bus.getChannel().setOpt(Channel.LOCAL, new Boolean(false));
  116               bus.setConsumer(this);
  117               log.info("JavaGroups clustering support started successfully");
  118           } catch (Exception e) {
  119               throw new InitializationException("Initialization failed: " + e);
  120           }
  121       }
  122   
  123       /**
  124       * Shuts down the JavaGroups being managed by this listener. This
  125       * occurs once the cache is shut down and this listener is no longer
  126       * in use.
  127       *
  128       * @throws com.opensymphony.oscache.base.FinalizationException
  129       */
  130       public synchronized void finialize() throws FinalizationException {
  131           if (log.isInfoEnabled()) {
  132               log.info("JavaGroups shutting down...");
  133           }
  134   
  135           // It's possible that the notification bus is null (CACHE-154)
  136           if (bus != null) {
  137               bus.stop();
  138               bus = null;
  139           } else {
  140               log.warn("Notification bus wasn't initialized or finialize was invoked before!");
  141           }
  142   
  143           if (log.isInfoEnabled()) {
  144               log.info("JavaGroups shutdown complete.");
  145           }
  146       }
  147   
  148       /**
  149       * Uses JavaGroups to broadcast the supplied notification message across the cluster.
  150       *
  151       * @param message The cluster nofication message to broadcast.
  152       */
  153       protected void sendNotification(ClusterNotification message) {
  154           bus.sendNotification(message);
  155       }
  156   
  157       /**
  158       * Handles incoming notification messages from JavaGroups. This method should
  159       * never be called directly.
  160       *
  161       * @param serializable The incoming message object. This must be a {@link ClusterNotification}.
  162       */
  163       public void handleNotification(Serializable serializable) {
  164           if (!(serializable instanceof ClusterNotification)) {
  165               log.error("An unknown cluster notification message received (class=" + serializable.getClass().getName() + "). Notification ignored.");
  166   
  167               return;
  168           }
  169   
  170           handleClusterNotification((ClusterNotification) serializable);
  171       }
  172   
  173       /**
  174       * We are not using the caching, so we just return something that identifies
  175       * us. This method should never be called directly.
  176       */
  177       public Serializable getCache() {
  178           return "JavaGroupsBroadcastingListener: " + bus.getLocalAddress();
  179       }
  180   
  181       /**
  182       * A callback that is fired when a new member joins the cluster. This
  183       * method should never be called directly.
  184       *
  185       * @param address The address of the member who just joined.
  186       */
  187       public void memberJoined(Address address) {
  188           if (log.isInfoEnabled()) {
  189               log.info("A new member at address '" + address + "' has joined the cluster");
  190           }
  191       }
  192   
  193       /**
  194       * A callback that is fired when an existing member leaves the cluster.
  195       * This method should never be called directly.
  196       *
  197       * @param address The address of the member who left.
  198       */
  199       public void memberLeft(Address address) {
  200           if (log.isInfoEnabled()) {
  201               log.info("Member at address '" + address + "' left the cluster");
  202           }
  203       }
  204   }

Save This Page
Home » oscache-2.4.1-full » com.opensymphony.oscache.plugins » clustersupport » [javadoc | source]