1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.catalina.ha.tcp;
19
20 import java.beans.PropertyChangeSupport;
21 import java.io.Serializable;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Map;
27
28 import org.apache.catalina.Container;
29 import org.apache.catalina.Context;
30 import org.apache.catalina.Engine;
31 import org.apache.catalina.Host;
32 import org.apache.catalina.Lifecycle;
33 import org.apache.catalina.LifecycleEvent;
34 import org.apache.catalina.LifecycleException;
35 import org.apache.catalina.LifecycleListener;
36 import org.apache.catalina.Manager;
37 import org.apache.catalina.Valve;
38 import org.apache.catalina.ha.CatalinaCluster;
39 import org.apache.catalina.ha.ClusterListener;
40 import org.apache.catalina.ha.ClusterManager;
41 import org.apache.catalina.ha.ClusterMessage;
42 import org.apache.catalina.ha.ClusterValve;
43 import org.apache.catalina.ha.session.DeltaManager;
44 import org.apache.catalina.ha.util.IDynamicProperty;
45 import org.apache.catalina.tribes.Channel;
46 import org.apache.catalina.tribes.ChannelListener;
47 import org.apache.catalina.tribes.Member;
48 import org.apache.catalina.tribes.MembershipListener;
49 import org.apache.catalina.tribes.group.GroupChannel;
50 import org.apache.catalina.util.LifecycleSupport;
51 import org.apache.catalina.util.StringManager;
52 import org.apache.juli.logging.Log;
53 import org.apache.juli.logging.LogFactory;
54 import org.apache.tomcat.util.IntrospectionUtils;
55 import org.apache.catalina.ha.session.ClusterSessionListener;
56 import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
57 import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
58 import org.apache.catalina.ha.session.JvmRouteBinderValve;
59 import org.apache.catalina.ha.session.JvmRouteSessionIDBinderListener;
60
61 /**
62 * A <b>Cluster </b> implementation using simple multicast. Responsible for
63 * setting up a cluster and provides callers with a valid multicast
64 * receiver/sender.
65 *
66 * FIXME remove install/remove/start/stop context dummys
67 * FIXME wrote testcases
68 *
69 * @author Filip Hanik
70 * @author Remy Maucherat
71 * @author Peter Rossbach
72 * @version $Revision: 586738 $, $Date: 2007-10-20 16:57:18 +0200 (sam., 20 oct. 2007) $
73 */
74 public class SimpleTcpCluster
75 implements CatalinaCluster, Lifecycle, LifecycleListener, IDynamicProperty,
76 MembershipListener, ChannelListener{
77
78 public static Log log = LogFactory.getLog(SimpleTcpCluster.class);
79
80 // ----------------------------------------------------- Instance Variables
81
82 /**
83 * Descriptive information about this component implementation.
84 */
85 protected static final String info = "SimpleTcpCluster/2.2";
86
87 public static final String BEFORE_MEMBERREGISTER_EVENT = "before_member_register";
88
89 public static final String AFTER_MEMBERREGISTER_EVENT = "after_member_register";
90
91 public static final String BEFORE_MANAGERREGISTER_EVENT = "before_manager_register";
92
93 public static final String AFTER_MANAGERREGISTER_EVENT = "after_manager_register";
94
95 public static final String BEFORE_MANAGERUNREGISTER_EVENT = "before_manager_unregister";
96
97 public static final String AFTER_MANAGERUNREGISTER_EVENT = "after_manager_unregister";
98
99 public static final String BEFORE_MEMBERUNREGISTER_EVENT = "before_member_unregister";
100
101 public static final String AFTER_MEMBERUNREGISTER_EVENT = "after_member_unregister";
102
103 public static final String SEND_MESSAGE_FAILURE_EVENT = "send_message_failure";
104
105 public static final String RECEIVE_MESSAGE_FAILURE_EVENT = "receive_message_failure";
106
107 /**
108 * Group channel.
109 */
110 protected Channel channel = new GroupChannel();
111
112
113 /**
114 * Name for logging purpose
115 */
116 protected String clusterImpName = "SimpleTcpCluster";
117
118 /**
119 * The string manager for this package.
120 */
121 protected StringManager sm = StringManager.getManager(Constants.Package);
122
123 /**
124 * The cluster name to join
125 */
126 protected String clusterName ;
127
128 /**
129 * call Channel.heartbeat() at container background thread
130 * @see org.apache.catalina.tribes.group.GroupChannel#heartbeat()
131 */
132 protected boolean heartbeatBackgroundEnabled =false ;
133
134 /**
135 * The Container associated with this Cluster.
136 */
137 protected Container container = null;
138
139 /**
140 * The lifecycle event support for this component.
141 */
142 protected LifecycleSupport lifecycle = new LifecycleSupport(this);
143
144 /**
145 * Has this component been started?
146 */
147 protected boolean started = false;
148
149 /**
150 * The property change support for this component.
151 */
152 protected PropertyChangeSupport support = new PropertyChangeSupport(this);
153
154 /**
155 * The context name <->manager association for distributed contexts.
156 */
157 protected Map managers = new HashMap();
158
159 protected ClusterManager managerTemplate = new DeltaManager();
160
161 private List valves = new ArrayList();
162
163 private org.apache.catalina.ha.ClusterDeployer clusterDeployer;
164
165 /**
166 * Listeners of messages
167 */
168 protected List clusterListeners = new ArrayList();
169
170 /**
171 * Comment for <code>notifyLifecycleListenerOnFailure</code>
172 */
173 private boolean notifyLifecycleListenerOnFailure = false;
174
175 /**
176 * dynamic sender <code>properties</code>
177 */
178 private Map properties = new HashMap();
179
180 private int channelSendOptions = Channel.SEND_OPTIONS_ASYNCHRONOUS;
181
182 // ------------------------------------------------------------- Properties
183
184 public SimpleTcpCluster() {
185 }
186
187 /**
188 * Return descriptive information about this Cluster implementation and the
189 * corresponding version number, in the format
190 * <code><description>/<version></code>.
191 */
192 public String getInfo() {
193 return (info);
194 }
195
196 /**
197 * Return heartbeat enable flag (default false)
198 * @return the heartbeatBackgroundEnabled
199 */
200 public boolean isHeartbeatBackgroundEnabled() {
201 return heartbeatBackgroundEnabled;
202 }
203
204 /**
205 * enabled that container backgroundThread call heartbeat at channel
206 * @param heartbeatBackgroundEnabled the heartbeatBackgroundEnabled to set
207 */
208 public void setHeartbeatBackgroundEnabled(boolean heartbeatBackgroundEnabled) {
209 this.heartbeatBackgroundEnabled = heartbeatBackgroundEnabled;
210 }
211
212 /**
213 * Set the name of the cluster to join, if no cluster with this name is
214 * present create one.
215 *
216 * @param clusterName
217 * The clustername to join
218 */
219 public void setClusterName(String clusterName) {
220 this.clusterName = clusterName;
221 }
222
223 /**
224 * Return the name of the cluster that this Server is currently configured
225 * to operate within.
226 *
227 * @return The name of the cluster associated with this server
228 */
229 public String getClusterName() {
230 if(clusterName == null && container != null)
231 return container.getName() ;
232 return clusterName;
233 }
234
235 /**
236 * Set the Container associated with our Cluster
237 *
238 * @param container
239 * The Container to use
240 */
241 public void setContainer(Container container) {
242 Container oldContainer = this.container;
243 this.container = container;
244 support.firePropertyChange("container", oldContainer, this.container);
245 }
246
247 /**
248 * Get the Container associated with our Cluster
249 *
250 * @return The Container associated with our Cluster
251 */
252 public Container getContainer() {
253 return (this.container);
254 }
255
256 /**
257 * @return Returns the notifyLifecycleListenerOnFailure.
258 */
259 public boolean isNotifyLifecycleListenerOnFailure() {
260 return notifyLifecycleListenerOnFailure;
261 }
262
263 /**
264 * @param notifyListenerOnFailure
265 * The notifyLifecycleListenerOnFailure to set.
266 */
267 public void setNotifyLifecycleListenerOnFailure(
268 boolean notifyListenerOnFailure) {
269 boolean oldNotifyListenerOnFailure = this.notifyLifecycleListenerOnFailure;
270 this.notifyLifecycleListenerOnFailure = notifyListenerOnFailure;
271 support.firePropertyChange("notifyLifecycleListenerOnFailure",
272 oldNotifyListenerOnFailure,
273 this.notifyLifecycleListenerOnFailure);
274 }
275
276 /**
277 * @deprecated use getManagerTemplate().getClass().getName() instead.
278 * @return String
279 */
280 public String getManagerClassName() {
281 return managerTemplate.getClass().getName();
282 }
283
284 /**
285 * @deprecated use nested <Manager> element inside the cluster config instead.
286 * @param managerClassName String
287 */
288 public void setManagerClassName(String managerClassName) {
289 log.warn("setManagerClassName is deprecated, use nested <Manager> element inside the <Cluster> element instead, this request will be ignored.");
290 }
291
292 /**
293 * Add cluster valve
294 * Cluster Valves are only add to container when cluster is started!
295 * @param valve The new cluster Valve.
296 */
297 public void addValve(Valve valve) {
298 if (valve instanceof ClusterValve && (!valves.contains(valve)))
299 valves.add(valve);
300 }
301
302 /**
303 * get all cluster valves
304 * @return current cluster valves
305 */
306 public Valve[] getValves() {
307 return (Valve[]) valves.toArray(new Valve[valves.size()]);
308 }
309
310 /**
311 * Get the cluster listeners associated with this cluster. If this Array has
312 * no listeners registered, a zero-length array is returned.
313 */
314 public ClusterListener[] findClusterListeners() {
315 if (clusterListeners.size() > 0) {
316 ClusterListener[] listener = new ClusterListener[clusterListeners.size()];
317 clusterListeners.toArray(listener);
318 return listener;
319 } else
320 return new ClusterListener[0];
321
322 }
323
324 /**
325 * add cluster message listener and register cluster to this listener
326 *
327 * @see org.apache.catalina.ha.CatalinaCluster#addClusterListener(org.apache.catalina.ha.MessageListener)
328 */
329 public void addClusterListener(ClusterListener listener) {
330 if (listener != null && !clusterListeners.contains(listener)) {
331 clusterListeners.add(listener);
332 listener.setCluster(this);
333 }
334 }
335
336 /**
337 * remove message listener and deregister Cluster from listener
338 *
339 * @see org.apache.catalina.ha.CatalinaCluster#removeClusterListener(org.apache.catalina.ha.MessageListener)
340 */
341 public void removeClusterListener(ClusterListener listener) {
342 if (listener != null) {
343 clusterListeners.remove(listener);
344 listener.setCluster(null);
345 }
346 }
347
348 /**
349 * get current Deployer
350 */
351 public org.apache.catalina.ha.ClusterDeployer getClusterDeployer() {
352 return clusterDeployer;
353 }
354
355 /**
356 * set a new Deployer, must be set before cluster started!
357 */
358 public void setClusterDeployer(
359 org.apache.catalina.ha.ClusterDeployer clusterDeployer) {
360 this.clusterDeployer = clusterDeployer;
361 }
362
363 public void setChannel(Channel channel) {
364 this.channel = channel;
365 }
366
367 public void setManagerTemplate(ClusterManager managerTemplate) {
368 this.managerTemplate = managerTemplate;
369 }
370
371 public void setChannelSendOptions(int channelSendOptions) {
372 this.channelSendOptions = channelSendOptions;
373 }
374
375 /**
376 * has members
377 */
378 protected boolean hasMembers = false;
379 public boolean hasMembers() {
380 return hasMembers;
381 }
382
383 /**
384 * Get all current cluster members
385 * @return all members or empty array
386 */
387 public Member[] getMembers() {
388 return channel.getMembers();
389 }
390
391 /**
392 * Return the member that represents this node.
393 *
394 * @return Member
395 */
396 public Member getLocalMember() {
397 return channel.getLocalMember(true);
398 }
399
400 // ------------------------------------------------------------- dynamic
401 // manager property handling
402
403 /**
404 * JMX hack to direct use at jconsole
405 *
406 * @param name
407 * @param value
408 */
409 public boolean setProperty(String name, String value) {
410 return setProperty(name, (Object) value);
411 }
412
413 /**
414 * set config attributes with reflect and propagate to all managers
415 *
416 * @param name
417 * @param value
418 */
419 public boolean setProperty(String name, Object value) {
420 properties.put(name, value);
421 return false;
422 }
423
424 /**
425 * get current config
426 *
427 * @param key
428 * @return The property
429 */
430 public Object getProperty(String key) {
431 if (log.isTraceEnabled())
432 log.trace(sm.getString("SimpleTcpCluster.getProperty", key));
433 return properties.get(key);
434 }
435
436 /**
437 * Get all properties keys
438 *
439 * @return An iterator over the property names.
440 */
441 public Iterator getPropertyNames() {
442 return properties.keySet().iterator();
443 }
444
445 /**
446 * remove a configured property.
447 *
448 * @param key
449 */
450 public void removeProperty(String key) {
451 properties.remove(key);
452 }
453
454 /**
455 * transfer properties from cluster configuration to subelement bean.
456 * @param prefix
457 * @param bean
458 */
459 protected void transferProperty(String prefix, Object bean) {
460 if (prefix != null) {
461 for (Iterator iter = getPropertyNames(); iter.hasNext();) {
462 String pkey = (String) iter.next();
463 if (pkey.startsWith(prefix)) {
464 String key = pkey.substring(prefix.length() + 1);
465 Object value = getProperty(pkey);
466 IntrospectionUtils.setProperty(bean, key, value.toString());
467 }
468 }
469 }
470 }
471
472 // --------------------------------------------------------- Public Methods
473
474 /**
475 * @return Returns the managers.
476 */
477 public Map getManagers() {
478 return managers;
479 }
480
481 public Channel getChannel() {
482 return channel;
483 }
484
485 public ClusterManager getManagerTemplate() {
486 return managerTemplate;
487 }
488
489 public int getChannelSendOptions() {
490 return channelSendOptions;
491 }
492
493 /**
494 * Create new Manager without add to cluster (comes with start the manager)
495 *
496 * @param name
497 * Context Name of this manager
498 * @see org.apache.catalina.Cluster#createManager(java.lang.String)
499 * @see #addManager(String, Manager)
500 * @see DeltaManager#start()
501 */
502 public synchronized Manager createManager(String name) {
503 if (log.isDebugEnabled()) log.debug("Creating ClusterManager for context " + name + " using class " + getManagerClassName());
504 Manager manager = null;
505 try {
506 manager = managerTemplate.cloneFromTemplate();
507 ((ClusterManager)manager).setName(name);
508 } catch (Exception x) {
509 log.error("Unable to clone cluster manager, defaulting to org.apache.catalina.ha.session.DeltaManager", x);
510 manager = new org.apache.catalina.ha.session.DeltaManager();
511 } finally {
512 if ( manager != null && (manager instanceof ClusterManager)) ((ClusterManager)manager).setCluster(this);
513 }
514 return manager;
515 }
516
517 public void registerManager(Manager manager) {
518
519 if (! (manager instanceof ClusterManager)) {
520 log.warn("Manager [ " + manager + "] does not implement ClusterManager, addition to cluster has been aborted.");
521 return;
522 }
523 ClusterManager cmanager = (ClusterManager) manager ;
524 cmanager.setDistributable(true);
525 // Notify our interested LifecycleListeners
526 lifecycle.fireLifecycleEvent(BEFORE_MANAGERREGISTER_EVENT, manager);
527 String clusterName = getManagerName(cmanager.getName(), manager);
528 cmanager.setName(clusterName);
529 cmanager.setCluster(this);
530 cmanager.setDefaultMode(false);
531
532 managers.put(clusterName, manager);
533 // Notify our interested LifecycleListeners
534 lifecycle.fireLifecycleEvent(AFTER_MANAGERREGISTER_EVENT, manager);
535 }
536
537 /**
538 * remove an application form cluster replication bus
539 *
540 * @see org.apache.catalina.ha.CatalinaCluster#removeManager(java.lang.String,Manager)
541 */
542 public void removeManager(Manager manager) {
543 if (manager != null && manager instanceof ClusterManager ) {
544 ClusterManager cmgr = (ClusterManager) manager;
545 // Notify our interested LifecycleListeners
546 lifecycle.fireLifecycleEvent(BEFORE_MANAGERUNREGISTER_EVENT,manager);
547 managers.remove(getManagerName(cmgr.getName(),manager));
548 cmgr.setCluster(null);
549 // Notify our interested LifecycleListeners
550 lifecycle.fireLifecycleEvent(AFTER_MANAGERUNREGISTER_EVENT, manager);
551 }
552 }
553
554 /**
555 * @param name
556 * @param manager
557 * @return
558 */
559 public String getManagerName(String name, Manager manager) {
560 String clusterName = name ;
561 if ( clusterName == null ) clusterName = manager.getContainer().getName();
562 if(getContainer() instanceof Engine) {
563 Container context = manager.getContainer() ;
564 if(context != null && context instanceof Context) {
565 Container host = ((Context)context).getParent();
566 if(host != null && host instanceof Host && clusterName!=null && !(clusterName.indexOf("#")>=0))
567 clusterName = host.getName() +"#" + clusterName ;
568 }
569 }
570 return clusterName;
571 }
572
573 /*
574 * Get Manager
575 *
576 * @see org.apache.catalina.ha.CatalinaCluster#getManager(java.lang.String)
577 */
578 public Manager getManager(String name) {
579 return (Manager) managers.get(name);
580 }
581
582 // ------------------------------------------------------ Lifecycle Methods
583
584 /**
585 * Execute a periodic task, such as reloading, etc. This method will be
586 * invoked inside the classloading context of this container. Unexpected
587 * throwables will be caught and logged.
588 * @see org.apache.catalina.ha.deploy.FarmWarDeployer#backgroundProcess()
589 * @see org.apache.catalina.tribes.group.GroupChannel#heartbeat()
590 * @see org.apache.catalina.tribes.group.GroupChannel.HeartbeatThread#run()
591 *
592 */
593 public void backgroundProcess() {
594 if (clusterDeployer != null) clusterDeployer.backgroundProcess();
595
596 //send a heartbeat through the channel
597 if ( isHeartbeatBackgroundEnabled() && channel !=null ) channel.heartbeat();
598 }
599
600 /**
601 * Add a lifecycle event listener to this component.
602 *
603 * @param listener
604 * The listener to add
605 */
606 public void addLifecycleListener(LifecycleListener listener) {
607 lifecycle.addLifecycleListener(listener);
608 }
609
610 /**
611 * Get the lifecycle listeners associated with this lifecycle. If this
612 * Lifecycle has no listeners registered, a zero-length array is returned.
613 */
614 public LifecycleListener[] findLifecycleListeners() {
615
616 return lifecycle.findLifecycleListeners();
617
618 }
619
620 /**
621 * Remove a lifecycle event listener from this component.
622 *
623 * @param listener
624 * The listener to remove
625 */
626 public void removeLifecycleListener(LifecycleListener listener) {
627 lifecycle.removeLifecycleListener(listener);
628 }
629
630 /**
631 * Use as base to handle start/stop/periodic Events from host. Currently
632 * only log the messages as trace level.
633 *
634 * @see org.apache.catalina.LifecycleListener#lifecycleEvent(org.apache.catalina.LifecycleEvent)
635 */
636 public void lifecycleEvent(LifecycleEvent lifecycleEvent) {
637 if (log.isTraceEnabled())
638 log.trace(sm.getString("SimpleTcpCluster.event.log", lifecycleEvent.getType(), lifecycleEvent.getData()));
639 }
640
641 // ------------------------------------------------------ public
642
643 /**
644 * Prepare for the beginning of active use of the public methods of this
645 * component. This method should be called after <code>configure()</code>,
646 * and before any of the public methods of the component are utilized. <BR>
647 * Starts the cluster communication channel, this will connect with the
648 * other nodes in the cluster, and request the current session state to be
649 * transferred to this node.
650 *
651 * @exception IllegalStateException
652 * if this component has already been started
653 * @exception LifecycleException
654 * if this component detects a fatal error that prevents this
655 * component from being used
656 */
657 public void start() throws LifecycleException {
658 if (started)
659 throw new LifecycleException(sm.getString("cluster.alreadyStarted"));
660 if (log.isInfoEnabled()) log.info("Cluster is about to start");
661
662 // Notify our interested LifecycleListeners
663 lifecycle.fireLifecycleEvent(BEFORE_START_EVENT, this);
664 try {
665 checkDefaults();
666 registerClusterValve();
667 channel.addMembershipListener(this);
668 channel.addChannelListener(this);
669 channel.start(channel.DEFAULT);
670 if (clusterDeployer != null) clusterDeployer.start();
671 this.started = true;
672 // Notify our interested LifecycleListeners
673 lifecycle.fireLifecycleEvent(AFTER_START_EVENT, this);
674 } catch (Exception x) {
675 log.error("Unable to start cluster.", x);
676 throw new LifecycleException(x);
677 }
678 }
679
680 protected void checkDefaults() {
681 if ( clusterListeners.size() == 0 ) {
682 addClusterListener(new JvmRouteSessionIDBinderListener());
683 addClusterListener(new ClusterSessionListener());
684 }
685 if ( valves.size() == 0 ) {
686 addValve(new JvmRouteBinderValve());
687 addValve(new ReplicationValve());
688 }
689 if ( clusterDeployer != null ) clusterDeployer.setCluster(this);
690 if ( channel == null ) channel = new GroupChannel();
691 if ( channel instanceof GroupChannel && !((GroupChannel)channel).getInterceptors().hasNext()) {
692 channel.addInterceptor(new MessageDispatch15Interceptor());
693 channel.addInterceptor(new TcpFailureDetector());
694 }
695 }
696
697 /**
698 * register all cluster valve to host or engine
699 * @throws Exception
700 * @throws ClassNotFoundException
701 */
702 protected void registerClusterValve() throws Exception {
703 if(container != null ) {
704 for (Iterator iter = valves.iterator(); iter.hasNext();) {
705 ClusterValve valve = (ClusterValve) iter.next();
706 if (log.isDebugEnabled())
707 log.debug("Invoking addValve on " + getContainer()
708 + " with class=" + valve.getClass().getName());
709 if (valve != null) {
710 IntrospectionUtils.callMethodN(getContainer(), "addValve",
711 new Object[] { valve },
712 new Class[] { org.apache.catalina.Valve.class });
713
714 }
715 valve.setCluster(this);
716 }
717 }
718 }
719
720 /**
721 * unregister all cluster valve to host or engine
722 * @throws Exception
723 * @throws ClassNotFoundException
724 */
725 protected void unregisterClusterValve() throws Exception {
726 for (Iterator iter = valves.iterator(); iter.hasNext();) {
727 ClusterValve valve = (ClusterValve) iter.next();
728 if (log.isDebugEnabled())
729 log.debug("Invoking removeValve on " + getContainer()
730 + " with class=" + valve.getClass().getName());
731 if (valve != null) {
732 IntrospectionUtils.callMethodN(getContainer(), "removeValve",
733 new Object[] { valve }, new Class[] { org.apache.catalina.Valve.class });
734 }
735 valve.setCluster(this);
736 }
737 }
738
739 /**
740 * Gracefully terminate the active cluster component.<br/>
741 * This will disconnect the cluster communication channel, stop the
742 * listener and deregister the valves from host or engine.<br/><br/>
743 * <b>Note:</b><br/>The sub elements receiver, sender, membership,
744 * listener or valves are not removed. You can easily start the cluster again.
745 *
746 * @exception IllegalStateException
747 * if this component has not been started
748 * @exception LifecycleException
749 * if this component detects a fatal error that needs to be
750 * reported
751 */
752 public void stop() throws LifecycleException {
753
754 if (!started)
755 throw new IllegalStateException(sm.getString("cluster.notStarted"));
756 // Notify our interested LifecycleListeners
757 lifecycle.fireLifecycleEvent(BEFORE_STOP_EVENT, this);
758
759 if (clusterDeployer != null) clusterDeployer.stop();
760 this.managers.clear();
761 try {
762 if ( clusterDeployer != null ) clusterDeployer.setCluster(null);
763 channel.stop(Channel.DEFAULT);
764 channel.removeChannelListener(this);
765 channel.removeMembershipListener(this);
766 this.unregisterClusterValve();
767 } catch (Exception x) {
768 log.error("Unable to stop cluster valve.", x);
769 }
770 started = false;
771 // Notify our interested LifecycleListeners
772 lifecycle.fireLifecycleEvent(AFTER_STOP_EVENT, this);
773 }
774
775
776
777
778 /**
779 * send message to all cluster members
780 * @param msg message to transfer
781 *
782 * @see org.apache.catalina.ha.CatalinaCluster#send(org.apache.catalina.ha.ClusterMessage)
783 */
784 public void send(ClusterMessage msg) {
785 send(msg, null);
786 }
787
788 /**
789 * send message to all cluster members same cluster domain
790 *
791 * @see org.apache.catalina.ha.CatalinaCluster#send(org.apache.catalina.ha.ClusterMessage)
792 */
793 public void sendClusterDomain(ClusterMessage msg) {
794 send(msg,null);
795 }
796
797
798 /**
799 * send a cluster message to one member
800 *
801 * @param msg message to transfer
802 * @param dest Receiver member
803 * @see org.apache.catalina.ha.CatalinaCluster#send(org.apache.catalina.ha.ClusterMessage,
804 * org.apache.catalina.ha.Member)
805 */
806 public void send(ClusterMessage msg, Member dest) {
807 try {
808 msg.setAddress(getLocalMember());
809 if (dest != null) {
810 if (!getLocalMember().equals(dest)) {
811 channel.send(new Member[] {dest}, msg,channelSendOptions);
812 } else
813 log.error("Unable to send message to local member " + msg);
814 } else {
815 if (channel.getMembers().length>0)
816 channel.send(channel.getMembers(),msg,channelSendOptions);
817 else if (log.isDebugEnabled())
818 log.debug("No members in cluster, ignoring message:"+msg);
819 }
820 } catch (Exception x) {
821 log.error("Unable to send message through cluster sender.", x);
822 }
823 }
824
825 /**
826 * New cluster member is registered
827 *
828 * @see org.apache.catalina.ha.MembershipListener#memberAdded(org.apache.catalina.ha.Member)
829 */
830 public void memberAdded(Member member) {
831 try {
832 hasMembers = channel.hasMembers();
833 if (log.isInfoEnabled()) log.info("Replication member added:" + member);
834 // Notify our interested LifecycleListeners
835 lifecycle.fireLifecycleEvent(BEFORE_MEMBERREGISTER_EVENT, member);
836 // Notify our interested LifecycleListeners
837 lifecycle.fireLifecycleEvent(AFTER_MEMBERREGISTER_EVENT, member);
838 } catch (Exception x) {
839 log.error("Unable to connect to replication system.", x);
840 }
841
842 }
843
844 /**
845 * Cluster member is gone
846 *
847 * @see org.apache.catalina.ha.MembershipListener#memberDisappeared(org.apache.catalina.ha.Member)
848 */
849 public void memberDisappeared(Member member) {
850 try {
851 hasMembers = channel.hasMembers();
852 if (log.isInfoEnabled()) log.info("Received member disappeared:" + member);
853 // Notify our interested LifecycleListeners
854 lifecycle.fireLifecycleEvent(BEFORE_MEMBERUNREGISTER_EVENT, member);
855 // Notify our interested LifecycleListeners
856 lifecycle.fireLifecycleEvent(AFTER_MEMBERUNREGISTER_EVENT, member);
857 } catch (Exception x) {
858 log.error("Unable remove cluster node from replication system.", x);
859 }
860 }
861
862 // --------------------------------------------------------- receiver
863 // messages
864
865 /**
866 * notify all listeners from receiving a new message is not ClusterMessage
867 * emitt Failure Event to LifecylceListener
868 *
869 * @param message
870 * receveived Message
871 */
872 public boolean accept(Serializable msg, Member sender) {
873 return (msg instanceof ClusterMessage);
874 }
875
876
877 public void messageReceived(Serializable message, Member sender) {
878 ClusterMessage fwd = (ClusterMessage)message;
879 fwd.setAddress(sender);
880 messageReceived(fwd);
881 }
882
883 public void messageReceived(ClusterMessage message) {
884
885 long start = 0;
886 if (log.isDebugEnabled() && message != null)
887 log.debug("Assuming clocks are synched: Replication for "
888 + message.getUniqueId() + " took="
889 + (System.currentTimeMillis() - (message).getTimestamp())
890 + " ms.");
891
892 //invoke all the listeners
893 boolean accepted = false;
894 if (message != null) {
895 for (Iterator iter = clusterListeners.iterator(); iter.hasNext();) {
896 ClusterListener listener = (ClusterListener) iter.next();
897 if (listener.accept(message)) {
898 accepted = true;
899 listener.messageReceived(message);
900 }
901 }
902 }
903 if (!accepted && log.isDebugEnabled()) {
904 if (notifyLifecycleListenerOnFailure) {
905 Member dest = message.getAddress();
906 // Notify our interested LifecycleListeners
907 lifecycle.fireLifecycleEvent(RECEIVE_MESSAGE_FAILURE_EVENT,
908 new SendMessageData(message, dest, null));
909 }
910 log.debug("Message " + message.toString() + " from type "
911 + message.getClass().getName()
912 + " transfered but no listener registered");
913 }
914 return;
915 }
916
917 // --------------------------------------------------------- Logger
918
919 public Log getLogger() {
920 return log;
921 }
922
923
924
925
926 // ------------------------------------------------------------- deprecated
927
928 /**
929 *
930 * @see org.apache.catalina.Cluster#setProtocol(java.lang.String)
931 */
932 public void setProtocol(String protocol) {
933 }
934
935 /**
936 * @see org.apache.catalina.Cluster#getProtocol()
937 */
938 public String getProtocol() {
939 return null;
940 }
941 }