1 /*
2 * Copyright (c) 2002-2003 by OpenSymphony
3 * All rights reserved.
4 */
5 package com.opensymphony.oscache.plugins.clustersupport;
6
7 import com.opensymphony.oscache.base.Cache;
8 import com.opensymphony.oscache.base.Config;
9 import com.opensymphony.oscache.base.FinalizationException;
10 import com.opensymphony.oscache.base.InitializationException;
11
12 import org.apache.commons.logging.Log;
13 import org.apache.commons.logging.LogFactory;
14
15 import org.jgroups.Address;
16 import org.jgroups.Channel;
17
18 import org.jgroups.blocks.NotificationBus;
19
20 import java.io.Serializable;
21
22 /**
23 * <p>A concrete implementation of the {@link AbstractBroadcastingListener} based on
24 * the JavaGroups library. This Class uses JavaGroups to broadcast cache flush
25 * messages across a cluster.</p>
26 *
27 * <p>One of the following properties should be configured in <code>oscache.properties</code> for
28 * this listener:
29 * <ul>
30 * <li><b>cache.cluster.multicast.ip</b> - The multicast IP that JavaGroups should use for broadcasting</li>
31 * <li><b>cache.cluster.properties</b> - The JavaGroups channel properties to use. Allows for precise
32 * control over the behaviour of JavaGroups</li>
33 * </ul>
34 * Please refer to the clustering documentation for further details on the configuration of this listener.</p>
35 *
36 * @author <a href="mailto:chris@swebtec.com">Chris Miller</a>
37 */
38 public class JavaGroupsBroadcastingListener extends AbstractBroadcastingListener implements NotificationBus.Consumer {
39 private final static Log log = LogFactory.getLog(JavaGroupsBroadcastingListener.class);
40 private static final String BUS_NAME = "OSCacheBus";
41 private static final String CHANNEL_PROPERTIES = "cache.cluster.properties";
42 private static final String MULTICAST_IP_PROPERTY = "cache.cluster.multicast.ip";
43
44 /**
45 * The first half of the default channel properties. They default channel properties are:
46 * <pre>
47 * UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;\
48 * mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\
49 * PING(timeout=2000;num_initial_members=3):\
50 * MERGE2(min_interval=5000;max_interval=10000):\
51 * FD_SOCK:VERIFY_SUSPECT(timeout=1500):\
52 * pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\
53 * UNICAST(timeout=300,600,1200,2400):\
54 * pbcast.STABLE(desired_avg_gossip=20000):\
55 * FRAG(frag_size=8096;down_thread=false;up_thread=false):\
56 * pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
57 * </pre>
58 *
59 * Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
60 */
61 private static final String DEFAULT_CHANNEL_PROPERTIES_PRE = "UDP(mcast_addr=";
62
63 /**
64 * The second half of the default channel properties. They default channel properties are:
65 * <pre>
66 * UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;\
67 * mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\
68 * PING(timeout=2000;num_initial_members=3):\
69 * MERGE2(min_interval=5000;max_interval=10000):\
70 * FD_SOCK:VERIFY_SUSPECT(timeout=1500):\
71 * pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\
72 * UNICAST(timeout=300,600,1200,2400):\
73 * pbcast.STABLE(desired_avg_gossip=20000):\
74 * FRAG(frag_size=8096;down_thread=false;up_thread=false):\
75 * pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
76 * </pre>
77 *
78 * Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
79 */
80 private static final String DEFAULT_CHANNEL_PROPERTIES_POST = ";mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" + "PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):" + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):UNICAST(timeout=300,600,1200,2400):pbcast.STABLE(desired_avg_gossip=20000):" + "FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)";
81 private static final String DEFAULT_MULTICAST_IP = "231.12.21.132";
82 private NotificationBus bus;
83
84 /**
85 * Initializes the broadcasting listener by starting up a JavaGroups notification
86 * bus instance to handle incoming and outgoing messages.
87 *
88 * @param config An OSCache configuration object.
89 * @throws com.opensymphony.oscache.base.InitializationException If this listener has
90 * already been initialized.
91 */
92 public synchronized void initialize(Cache cache, Config config) throws InitializationException {
93 super.initialize(cache, config);
94
95 String properties = config.getProperty(CHANNEL_PROPERTIES);
96 String multicastIP = config.getProperty(MULTICAST_IP_PROPERTY);
97
98 if ((properties == null) && (multicastIP == null)) {
99 multicastIP = DEFAULT_MULTICAST_IP;
100 }
101
102 if (properties == null) {
103 properties = DEFAULT_CHANNEL_PROPERTIES_PRE + multicastIP.trim() + DEFAULT_CHANNEL_PROPERTIES_POST;
104 } else {
105 properties = properties.trim();
106 }
107
108 if (log.isInfoEnabled()) {
109 log.info("Starting a new JavaGroups broadcasting listener with properties=" + properties);
110 }
111
112 try {
113 bus = new NotificationBus(BUS_NAME, properties);
114 bus.start();
115 bus.getChannel().setOpt(Channel.LOCAL, new Boolean(false));
116 bus.setConsumer(this);
117 log.info("JavaGroups clustering support started successfully");
118 } catch (Exception e) {
119 throw new InitializationException("Initialization failed: " + e);
120 }
121 }
122
123 /**
124 * Shuts down the JavaGroups being managed by this listener. This
125 * occurs once the cache is shut down and this listener is no longer
126 * in use.
127 *
128 * @throws com.opensymphony.oscache.base.FinalizationException
129 */
130 public synchronized void finialize() throws FinalizationException {
131 if (log.isInfoEnabled()) {
132 log.info("JavaGroups shutting down...");
133 }
134
135 // It's possible that the notification bus is null (CACHE-154)
136 if (bus != null) {
137 bus.stop();
138 bus = null;
139 } else {
140 log.warn("Notification bus wasn't initialized or finialize was invoked before!");
141 }
142
143 if (log.isInfoEnabled()) {
144 log.info("JavaGroups shutdown complete.");
145 }
146 }
147
148 /**
149 * Uses JavaGroups to broadcast the supplied notification message across the cluster.
150 *
151 * @param message The cluster nofication message to broadcast.
152 */
153 protected void sendNotification(ClusterNotification message) {
154 bus.sendNotification(message);
155 }
156
157 /**
158 * Handles incoming notification messages from JavaGroups. This method should
159 * never be called directly.
160 *
161 * @param serializable The incoming message object. This must be a {@link ClusterNotification}.
162 */
163 public void handleNotification(Serializable serializable) {
164 if (!(serializable instanceof ClusterNotification)) {
165 log.error("An unknown cluster notification message received (class=" + serializable.getClass().getName() + "). Notification ignored.");
166
167 return;
168 }
169
170 handleClusterNotification((ClusterNotification) serializable);
171 }
172
173 /**
174 * We are not using the caching, so we just return something that identifies
175 * us. This method should never be called directly.
176 */
177 public Serializable getCache() {
178 return "JavaGroupsBroadcastingListener: " + bus.getLocalAddress();
179 }
180
181 /**
182 * A callback that is fired when a new member joins the cluster. This
183 * method should never be called directly.
184 *
185 * @param address The address of the member who just joined.
186 */
187 public void memberJoined(Address address) {
188 if (log.isInfoEnabled()) {
189 log.info("A new member at address '" + address + "' has joined the cluster");
190 }
191 }
192
193 /**
194 * A callback that is fired when an existing member leaves the cluster.
195 * This method should never be called directly.
196 *
197 * @param address The address of the member who left.
198 */
199 public void memberLeft(Address address) {
200 if (log.isInfoEnabled()) {
201 log.info("Member at address '" + address + "' left the cluster");
202 }
203 }
204 }