1 /*
2 * Copyright 1999,2004-2005 The Apache Software Foundation.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.apache.catalina.cluster.tcp;
18
19 import java.beans.PropertyChangeSupport;
20 import java.io.IOException;
21 import java.net.URL;
22 import java.util.ArrayList;
23 import java.util.Date;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28
29 import javax.management.MBeanServer;
30 import javax.management.MBeanServerFactory;
31 import javax.management.ObjectName;
32 import javax.management.modelmbean.ModelMBean;
33
34 import org.apache.catalina.Container;
35 import org.apache.catalina.Context;
36 import org.apache.catalina.Engine;
37 import org.apache.catalina.Host;
38 import org.apache.catalina.Lifecycle;
39 import org.apache.catalina.LifecycleEvent;
40 import org.apache.catalina.LifecycleException;
41 import org.apache.catalina.LifecycleListener;
42 import org.apache.catalina.Manager;
43 import org.apache.catalina.Valve;
44 import org.apache.catalina.cluster.CatalinaCluster;
45 import org.apache.catalina.cluster.ClusterManager;
46 import org.apache.catalina.cluster.ClusterMessage;
47 import org.apache.catalina.cluster.ClusterReceiver;
48 import org.apache.catalina.cluster.ClusterSender;
49 import org.apache.catalina.cluster.ClusterValve;
50 import org.apache.catalina.cluster.Member;
51 import org.apache.catalina.cluster.MembershipListener;
52 import org.apache.catalina.cluster.MembershipService;
53 import org.apache.catalina.cluster.MessageListener;
54 import org.apache.catalina.cluster.mcast.McastService;
55 import org.apache.catalina.cluster.session.ClusterSessionListener;
56 import org.apache.catalina.cluster.session.DeltaManager;
57 import org.apache.catalina.cluster.util.IDynamicProperty;
58 import org.apache.catalina.core.StandardEngine;
59 import org.apache.catalina.core.StandardHost;
60 import org.apache.catalina.util.LifecycleSupport;
61 import org.apache.catalina.util.StringManager;
62 import org.apache.commons.logging.Log;
63 import org.apache.commons.logging.LogFactory;
64 import org.apache.commons.modeler.ManagedBean;
65 import org.apache.commons.modeler.Registry;
66 import org.apache.tomcat.util.IntrospectionUtils;
67
68 /**
69 * A <b>Cluster </b> implementation using simple multicast. Responsible for
70 * setting up a cluster and provides callers with a valid multicast
71 * receiver/sender.
72 *
73 * FIXME remove install/remove/start/stop context dummys
74 * FIXME wrote testcases
75 *
76 * @author Filip Hanik
77 * @author Remy Maucherat
78 * @author Peter Rossbach
79 * @version $Revision: 345224 $, $Date: 2005-11-17 06:26:31 -0500 (Thu, 17 Nov 2005) $
80 */
81 public class SimpleTcpCluster implements CatalinaCluster, Lifecycle,
82 MembershipListener, LifecycleListener, IDynamicProperty {
83
84 public static Log log = LogFactory.getLog(SimpleTcpCluster.class);
85
86 // ----------------------------------------------------- Instance Variables
87
88 /**
89 * Descriptive information about this component implementation.
90 */
91 protected static final String info = "SimpleTcpCluster/2.2";
92
93 public static final String BEFORE_MEMBERREGISTER_EVENT = "before_member_register";
94
95 public static final String AFTER_MEMBERREGISTER_EVENT = "after_member_register";
96
97 public static final String BEFORE_MANAGERREGISTER_EVENT = "before_manager_register";
98
99 public static final String AFTER_MANAGERREGISTER_EVENT = "after_manager_register";
100
101 public static final String BEFORE_MANAGERUNREGISTER_EVENT = "before_manager_unregister";
102
103 public static final String AFTER_MANAGERUNREGISTER_EVENT = "after_manager_unregister";
104
105 public static final String BEFORE_MEMBERUNREGISTER_EVENT = "before_member_unregister";
106
107 public static final String AFTER_MEMBERUNREGISTER_EVENT = "after_member_unregister";
108
109 public static final String SEND_MESSAGE_FAILURE_EVENT = "send_message_failure";
110
111 public static final String RECEIVE_MESSAGE_FAILURE_EVENT = "receive_message_failure";
112
113 /**
114 * the service that provides the membership
115 */
116 protected MembershipService membershipService = null;
117
118 /**
119 * Name for logging purpose
120 */
121 protected String clusterImpName = "SimpleTcpCluster";
122
123 /**
124 * The string manager for this package.
125 */
126 protected StringManager sm = StringManager.getManager(Constants.Package);
127
128 /**
129 * The cluster name to join
130 */
131 protected String clusterName ;
132
133 /**
134 * The Container associated with this Cluster.
135 */
136 protected Container container = null;
137
138 /**
139 * The lifecycle event support for this component.
140 */
141 protected LifecycleSupport lifecycle = new LifecycleSupport(this);
142
143 /**
144 * Globale MBean Server
145 */
146 private MBeanServer mserver = null;
147
148 /**
149 * Current Catalina Registry
150 */
151 private Registry registry = null;
152
153 /**
154 * Has this component been started?
155 */
156 protected boolean started = false;
157
158 /**
159 * The property change support for this component.
160 */
161 protected PropertyChangeSupport support = new PropertyChangeSupport(this);
162
163 /**
164 * The context name <->manager association for distributed contexts.
165 */
166 protected Map managers = new HashMap();
167
168 //sort members by alive time
169 protected MemberComparator memberComparator = new MemberComparator();
170
171 private String managerClassName = "org.apache.catalina.cluster.session.DeltaManager";
172
173 /**
174 * Sender to send data with
175 */
176 private org.apache.catalina.cluster.ClusterSender clusterSender;
177
178 /**
179 * Receiver to register call back with
180 */
181 private org.apache.catalina.cluster.ClusterReceiver clusterReceiver;
182
183 private List valves = new ArrayList();
184
185 private org.apache.catalina.cluster.ClusterDeployer clusterDeployer;
186
187 private boolean defaultMode = true ;
188
189 /**
190 * Listeners of messages
191 */
192 protected List clusterListeners = new ArrayList();
193
194 /**
195 * Comment for <code>notifyLifecycleListenerOnFailure</code>
196 */
197 private boolean notifyLifecycleListenerOnFailure = false;
198
199 private ObjectName objectName = null;
200
201 /**
202 * dynamic sender <code>properties</code>
203 */
204 private Map properties = new HashMap();
205
206 /**
207 * The cluster log device name to log at level info
208 */
209 private String clusterLogName = "org.apache.catalina.cluster.tcp.SimpleTcpCluster";
210
211 private boolean doClusterLog = false;
212
213 private Log clusterLog = null;
214
215 // ------------------------------------------------------------- Properties
216
217 public SimpleTcpCluster() {
218 }
219
220 /**
221 * Return descriptive information about this Cluster implementation and the
222 * corresponding version number, in the format
223 * <code><description>/<version></code>.
224 */
225 public String getInfo() {
226 return (info);
227 }
228
229 /**
230 * Set the name of the cluster to join, if no cluster with this name is
231 * present create one.
232 *
233 * @param clusterName
234 * The clustername to join
235 */
236 public void setClusterName(String clusterName) {
237 this.clusterName = clusterName;
238 }
239
240 /**
241 * Return the name of the cluster that this Server is currently configured
242 * to operate within.
243 *
244 * @return The name of the cluster associated with this server
245 */
246 public String getClusterName() {
247 if(clusterName == null && container != null)
248 return container.getName() ;
249 return clusterName;
250 }
251
252 /**
253 * Set the Container associated with our Cluster
254 *
255 * @param container
256 * The Container to use
257 */
258 public void setContainer(Container container) {
259 Container oldContainer = this.container;
260 this.container = container;
261 support.firePropertyChange("container", oldContainer, this.container);
262 }
263
264 /**
265 * Get the Container associated with our Cluster
266 *
267 * @return The Container associated with our Cluster
268 */
269 public Container getContainer() {
270 return (this.container);
271 }
272
273 /**
274 * @return Returns the notifyLifecycleListenerOnFailure.
275 */
276 public boolean isNotifyLifecycleListenerOnFailure() {
277 return notifyLifecycleListenerOnFailure;
278 }
279
280 /**
281 * @param notifyListenerOnFailure
282 * The notifyLifecycleListenerOnFailure to set.
283 */
284 public void setNotifyLifecycleListenerOnFailure(
285 boolean notifyListenerOnFailure) {
286 boolean oldNotifyListenerOnFailure = this.notifyLifecycleListenerOnFailure;
287 this.notifyLifecycleListenerOnFailure = notifyListenerOnFailure;
288 support.firePropertyChange("notifyLifecycleListenerOnFailure",
289 oldNotifyListenerOnFailure,
290 this.notifyLifecycleListenerOnFailure);
291 }
292
293 /**
294 * @return Returns the defaultMode.
295 */
296 public boolean isDefaultMode() {
297 return defaultMode;
298 }
299
300 /**
301 * @param defaultMode The defaultMode to set.
302 */
303 public void setDefaultMode(boolean defaultMode) {
304 this.defaultMode = defaultMode;
305 }
306
307 public String getManagerClassName() {
308 if(managerClassName != null)
309 return managerClassName;
310 return (String)getProperty("manager.className");
311 }
312
313 public void setManagerClassName(String managerClassName) {
314 this.managerClassName = managerClassName;
315 }
316
317 public ClusterSender getClusterSender() {
318 return clusterSender;
319 }
320
321 public void setClusterSender(ClusterSender clusterSender) {
322 this.clusterSender = clusterSender;
323 }
324
325 public ClusterReceiver getClusterReceiver() {
326 return clusterReceiver;
327 }
328
329 public void setClusterReceiver(ClusterReceiver clusterReceiver) {
330 this.clusterReceiver = clusterReceiver;
331 }
332
333 public MembershipService getMembershipService() {
334 return membershipService;
335 }
336
337 public void setMembershipService(MembershipService membershipService) {
338 this.membershipService = membershipService;
339 }
340
341 /**
342 * Add cluster valve
343 * Cluster Valves are only add to container when cluster is started!
344 * @param valve The new cluster Valve.
345 */
346 public void addValve(Valve valve) {
347 if (valve instanceof ClusterValve)
348 valves.add(valve);
349 }
350
351 /**
352 * get all cluster valves
353 * @return current cluster valves
354 */
355 public Valve[] getValves() {
356 return (Valve[]) valves.toArray(new Valve[valves.size()]);
357 }
358
359 /**
360 * Get the cluster listeners associated with this cluster. If this Array has
361 * no listeners registered, a zero-length array is returned.
362 */
363 public MessageListener[] findClusterListeners() {
364 if (clusterListeners.size() > 0) {
365 MessageListener[] listener = new MessageListener[clusterListeners
366 .size()];
367 clusterListeners.toArray(listener);
368 return listener;
369 } else
370 return new MessageListener[0];
371
372 }
373
374 /**
375 * add cluster message listener and register cluster to this listener
376 *
377 * @see org.apache.catalina.cluster.CatalinaCluster#addClusterListener(org.apache.catalina.cluster.MessageListener)
378 */
379 public void addClusterListener(MessageListener listener) {
380 if (listener != null && !clusterListeners.contains(listener)) {
381 clusterListeners.add(listener);
382 listener.setCluster(this);
383 }
384 }
385
386 /**
387 * remove message listener and deregister Cluster from listener
388 *
389 * @see org.apache.catalina.cluster.CatalinaCluster#removeClusterListener(org.apache.catalina.cluster.MessageListener)
390 */
391 public void removeClusterListener(MessageListener listener) {
392 if (listener != null) {
393 clusterListeners.remove(listener);
394 listener.setCluster(null);
395 }
396 }
397
398 public org.apache.catalina.cluster.ClusterDeployer getClusterDeployer() {
399 return clusterDeployer;
400 }
401
402 public void setClusterDeployer(
403 org.apache.catalina.cluster.ClusterDeployer clusterDeployer) {
404 this.clusterDeployer = clusterDeployer;
405 }
406
407 /**
408 * Get all current cluster members
409 * @return all members or empty array
410 */
411 public Member[] getMembers() {
412 Member[] members = membershipService.getMembers();
413 if(members != null) {
414 //sort by alive time
415 java.util.Arrays.sort(members, memberComparator);
416 } else
417 members = new Member[0];
418 return members;
419 }
420
421 /**
422 * Return the member that represents this node.
423 *
424 * @return Member
425 */
426 public Member getLocalMember() {
427 return membershipService.getLocalMember();
428 }
429
430 // ------------------------------------------------------------- dynamic
431 // manager property handling
432
433 /**
434 * JMX hack to direct use at jconsole
435 *
436 * @param name
437 * @param value
438 */
439 public void setProperty(String name, String value) {
440 setProperty(name, (Object) value);
441 }
442
443 /**
444 * set config attributes with reflect and propagate to all managers
445 *
446 * @param name
447 * @param value
448 */
449 public void setProperty(String name, Object value) {
450 if (log.isTraceEnabled())
451 log.trace(sm.getString("SimpleTcpCluster.setProperty", name, value,
452 properties.get(name)));
453
454 properties.put(name, value);
455 if(started) {
456 // FIXME Hmm, is that correct when some DeltaManagers are direct configured inside Context?
457 // Why we not support it for other elements, like sender, receiver or membership?
458 // Must we restart element after change?
459 if (name.startsWith("manager")) {
460 String key = name.substring("manager".length() + 1);
461 String pvalue = value.toString();
462 for (Iterator iter = managers.values().iterator(); iter.hasNext();) {
463 Manager manager = (Manager) iter.next();
464 if(manager instanceof DeltaManager && ((ClusterManager) manager).isDefaultMode()) {
465 IntrospectionUtils.setProperty(manager, key, pvalue );
466 }
467 }
468 }
469 }
470 }
471
472 /**
473 * get current config
474 *
475 * @param key
476 * @return The property
477 */
478 public Object getProperty(String key) {
479 if (log.isTraceEnabled())
480 log.trace(sm.getString("SimpleTcpCluster.getProperty", key));
481 return properties.get(key);
482 }
483
484 /**
485 * Get all properties keys
486 *
487 * @return An iterator over the property names.
488 */
489 public Iterator getPropertyNames() {
490 return properties.keySet().iterator();
491 }
492
493 /**
494 * remove a configured property.
495 *
496 * @param key
497 */
498 public void removeProperty(String key) {
499 properties.remove(key);
500 }
501
502 /**
503 * transfer properties from cluster configuration to subelement bean.
504 * @param prefix
505 * @param bean
506 */
507 protected void transferProperty(String prefix, Object bean) {
508 if (prefix != null) {
509 for (Iterator iter = getPropertyNames(); iter.hasNext();) {
510 String pkey = (String) iter.next();
511 if (pkey.startsWith(prefix)) {
512 String key = pkey.substring(prefix.length() + 1);
513 Object value = getProperty(pkey);
514 IntrospectionUtils.setProperty(bean, key, value.toString());
515 }
516 }
517 }
518 }
519
520 // --------------------------------------------------------- Public Methods
521
522 /**
523 * @return Returns the managers.
524 */
525 public Map getManagers() {
526 return managers;
527 }
528
529 /**
530 * Create new Manager without add to cluster (comes with start the manager)
531 *
532 * @param name
533 * Context Name of this manager
534 * @see org.apache.catalina.Cluster#createManager(java.lang.String)
535 * @see #addManager(String, Manager)
536 * @see DeltaManager#start()
537 */
538 public synchronized Manager createManager(String name) {
539 if (log.isDebugEnabled())
540 log.debug("Creating ClusterManager for context " + name
541 + " using class " + getManagerClassName());
542 Manager manager = null;
543 try {
544 manager = (Manager) getClass().getClassLoader().loadClass(
545 getManagerClassName()).newInstance();
546 } catch (Exception x) {
547 log.error("Unable to load class for replication manager", x);
548 manager = new org.apache.catalina.cluster.session.DeltaManager();
549 } finally {
550 if(manager != null) {
551 manager.setDistributable(true);
552 if (manager instanceof ClusterManager) {
553 ClusterManager cmanager = (ClusterManager) manager ;
554 cmanager.setDefaultMode(true);
555 cmanager.setName(getManagerName(name,manager));
556 cmanager.setCluster(this);
557 }
558 }
559 }
560 return manager;
561 }
562
563 /**
564 * remove an application form cluster replication bus
565 *
566 * @see org.apache.catalina.cluster.CatalinaCluster#removeManager(java.lang.String,Manager)
567 */
568 public void removeManager(String name,Manager manager) {
569 if (manager != null) {
570 // Notify our interested LifecycleListeners
571 lifecycle.fireLifecycleEvent(BEFORE_MANAGERUNREGISTER_EVENT,
572 manager);
573 managers.remove(getManagerName(name,manager));
574 if (manager instanceof ClusterManager)
575 ((ClusterManager) manager).setCluster(null);
576 // Notify our interested LifecycleListeners
577 lifecycle
578 .fireLifecycleEvent(AFTER_MANAGERUNREGISTER_EVENT, manager);
579 }
580 }
581
582 /**
583 * add an application to cluster replication bus
584 *
585 * @param name
586 * of the context
587 * @param manager
588 * manager to register
589 * @see org.apache.catalina.cluster.CatalinaCluster#addManager(java.lang.String,
590 * org.apache.catalina.Manager)
591 */
592 public void addManager(String name, Manager manager) {
593 if (!manager.getDistributable()) {
594 log.warn("Manager with name " + name
595 + " is not distributable, can't add as cluster manager");
596 return;
597 }
598 // Notify our interested LifecycleListeners
599 lifecycle.fireLifecycleEvent(BEFORE_MANAGERREGISTER_EVENT, manager);
600 String clusterName = getManagerName(name, manager);
601 if (manager instanceof ClusterManager) {
602 ClusterManager cmanager = (ClusterManager) manager ;
603 cmanager.setName(clusterName);
604 cmanager.setCluster(this);
605 if(cmanager.isDefaultMode())
606 transferProperty("manager",cmanager);
607 }
608 managers.put(clusterName, manager);
609 // Notify our interested LifecycleListeners
610 lifecycle.fireLifecycleEvent(AFTER_MANAGERREGISTER_EVENT, manager);
611 }
612
613 /**
614 * @param name
615 * @param manager
616 * @return
617 */
618 private String getManagerName(String name, Manager manager) {
619 String clusterName = name ;
620 if(getContainer() instanceof Engine) {
621 Container context = manager.getContainer() ;
622 if(context != null && context instanceof Context) {
623 Container host = ((Context)context).getParent();
624 if(host != null && host instanceof Host)
625 clusterName = host.getName() + name ;
626 }
627 }
628 return clusterName;
629 }
630
631 /*
632 * Get Manager
633 *
634 * @see org.apache.catalina.cluster.CatalinaCluster#getManager(java.lang.String)
635 */
636 public Manager getManager(String name) {
637 return (Manager) managers.get(name);
638 }
639
640
641 // ------------------------------------------------------ Lifecycle Methods
642
643 /**
644 * Execute a periodic task, such as reloading, etc. This method will be
645 * invoked inside the classloading context of this container. Unexpected
646 * throwables will be caught and logged.
647 * @see org.apache.catalina.cluster.deploy.FarmWarDeployer#backgroundProcess()
648 * @see ReplicationTransmitter#backgroundProcess()
649 */
650 public void backgroundProcess() {
651 if (clusterDeployer != null)
652 clusterDeployer.backgroundProcess();
653 if (clusterSender != null)
654 clusterSender.backgroundProcess();
655 }
656
657 /**
658 * Add a lifecycle event listener to this component.
659 *
660 * @param listener
661 * The listener to add
662 */
663 public void addLifecycleListener(LifecycleListener listener) {
664 lifecycle.addLifecycleListener(listener);
665 }
666
667 /**
668 * Get the lifecycle listeners associated with this lifecycle. If this
669 * Lifecycle has no listeners registered, a zero-length array is returned.
670 */
671 public LifecycleListener[] findLifecycleListeners() {
672
673 return lifecycle.findLifecycleListeners();
674
675 }
676
677 /**
678 * Remove a lifecycle event listener from this component.
679 *
680 * @param listener
681 * The listener to remove
682 */
683 public void removeLifecycleListener(LifecycleListener listener) {
684 lifecycle.removeLifecycleListener(listener);
685 }
686
687 /**
688 * Use as base to handle start/stop/periodic Events from host. Currently
689 * only log the messages as trace level.
690 *
691 * @see org.apache.catalina.LifecycleListener#lifecycleEvent(org.apache.catalina.LifecycleEvent)
692 */
693 public void lifecycleEvent(LifecycleEvent lifecycleEvent) {
694 if (log.isTraceEnabled())
695 log.trace(sm.getString("SimpleTcpCluster.event.log", lifecycleEvent
696 .getType(), lifecycleEvent.getData()));
697 }
698
699 // ------------------------------------------------------ public
700
701 /**
702 * Prepare for the beginning of active use of the public methods of this
703 * component. This method should be called after <code>configure()</code>,
704 * and before any of the public methods of the component are utilized. <BR>
705 * Starts the cluster communication channel, this will connect with the
706 * other nodes in the cluster, and request the current session state to be
707 * transferred to this node.
708 *
709 * @exception IllegalStateException
710 * if this component has already been started
711 * @exception LifecycleException
712 * if this component detects a fatal error that prevents this
713 * component from being used
714 */
715 public void start() throws LifecycleException {
716 if (started)
717 throw new LifecycleException(sm.getString("cluster.alreadyStarted"));
718 if (log.isInfoEnabled())
719 log.info("Cluster is about to start");
720 getClusterLog();
721 // Notify our interested LifecycleListeners
722 lifecycle.fireLifecycleEvent(BEFORE_START_EVENT, this);
723 try {
724 if(isDefaultMode() && valves.size() == 0) {
725 createDefaultClusterValves() ;
726 }
727 registerClusterValve();
728 registerMBeans();
729 // setup the default cluster session listener (DeltaManager support)
730 if(isDefaultMode() && clusterListeners.size() == 0) {
731 createDefaultClusterListener();
732 }
733 // setup the default cluster Receiver
734 if(isDefaultMode() && clusterReceiver == null) {
735 createDefaultClusterReceiver();
736 }
737 // setup the default cluster sender
738 if(isDefaultMode() && clusterSender == null) {
739 createDefaultClusterSender();
740 }
741 // start the receiver.
742 if(clusterReceiver != null) {
743 clusterReceiver.setSendAck(clusterSender.isWaitForAck());
744 clusterReceiver.setCompress(clusterSender.isCompress());
745 clusterReceiver.setCatalinaCluster(this);
746 clusterReceiver.start();
747 }
748
749 // start the sender.
750 if(clusterSender != null && clusterReceiver != null) {
751 clusterSender.setCatalinaCluster(this);
752 clusterSender.start();
753 }
754
755 // start the membership service.
756 if(isDefaultMode() && membershipService == null) {
757 createDefaultMembershipService();
758 }
759
760 if(membershipService != null && clusterReceiver != null) {
761 membershipService.setLocalMemberProperties(clusterReceiver
762 .getHost(), clusterReceiver.getPort());
763 membershipService.addMembershipListener(this);
764 membershipService.setCatalinaCluster(this);
765 membershipService.start();
766 // start the deployer.
767 try {
768 if (clusterDeployer != null) {
769 clusterDeployer.setCluster(this);
770 clusterDeployer.start();
771 }
772 } catch (Throwable x) {
773 log.fatal("Unable to retrieve the container deployer. Cluster deployment disabled.",x);
774 }
775 }
776 this.started = true;
777 // Notify our interested LifecycleListeners
778 lifecycle.fireLifecycleEvent(AFTER_START_EVENT, this);
779 } catch (Exception x) {
780 log.error("Unable to start cluster.", x);
781 throw new LifecycleException(x);
782 }
783 }
784
785 /**
786 * Create default membership service:
787 * <pre>
788 * <Membership
789 * className="org.apache.catalina.cluster.mcast.McastService"
790 * mcastAddr="228.0.0.4"
791 * mcastPort="8012"
792 * mcastFrequency="1000"
793 * mcastDropTime="30000"/>
794 * </pre>
795 */
796 protected void createDefaultMembershipService() {
797 if (log.isInfoEnabled()) {
798 log.info(sm.getString(
799 "SimpleTcpCluster.default.addMembershipService",
800 getClusterName()));
801 }
802
803 McastService mService= new McastService();
804 mService.setMcastAddr("228.0.0.4");
805 mService.setMcastPort(8012);
806 mService.setMcastFrequency(1000);
807 mService.setMcastDropTime(30000);
808 transferProperty("service",mService);
809 setMembershipService(mService);
810 }
811
812
813 /**
814 * Create default cluster sender
815 * <pre>
816 * <Sender
817 * className="org.apache.catalina.cluster.tcp.ReplicationTransmitter"
818 * replicationMode="fastasyncqueue"
819 * doTransmitterProcessingStats="true"
820 * doProcessingStats="true"/>
821 * </pre>
822 */
823 protected void createDefaultClusterSender() {
824 if (log.isInfoEnabled()) {
825 log.info(sm.getString(
826 "SimpleTcpCluster.default.addClusterSender",
827 getClusterName()));
828 }
829 ReplicationTransmitter sender= new ReplicationTransmitter();
830 sender.setReplicationMode("fastasyncqueue");
831 sender.setDoTransmitterProcessingStats(true);
832 sender.setProperty("doProcessingStats", "true");
833 transferProperty("sender",sender);
834 setClusterSender(sender);
835 }
836
837 /**
838 * Create default receiver:
839 * <pre>
840 * <Receiver
841 * className="org.apache.catalina.cluster.tcp.SocketReplicationListener"
842 * tcpListenAddress="auto"
843 * tcpListenPort="8015"
844 * tcpListenMaxPort="8019"
845 * doReceivedProcessingStats="true"
846 * />
847 * </pre>
848 */
849 protected void createDefaultClusterReceiver() {
850 if (log.isInfoEnabled()) {
851 log.info(sm.getString(
852 "SimpleTcpCluster.default.addClusterReceiver",
853 getClusterName()));
854 }
855 SocketReplicationListener receiver= new SocketReplicationListener();
856 receiver.setTcpListenAddress("auto");
857 receiver.setDoReceivedProcessingStats(true);
858 receiver.setTcpListenPort(8015);
859 receiver.setTcpListenMaxPort(8019);
860 transferProperty("receiver",receiver);
861 setClusterReceiver(receiver);
862
863 }
864
865 /**
866 * Create default session cluster listener:
867 * <pre>
868 * <ClusterListener
869 * className="org.apache.catalina.cluster.session.ClusterSessionListener" />
870 * </pre>
871 */
872 protected void createDefaultClusterListener() {
873 if (log.isInfoEnabled()) {
874 log.info(sm.getString(
875 "SimpleTcpCluster.default.addClusterListener",
876 getClusterName()));
877 }
878 ClusterSessionListener listener = new ClusterSessionListener();
879 transferProperty("listener",listener);
880 addClusterListener(listener);
881
882 }
883
884 /**
885 * Create default ReplicationValve
886 * <pre>
887 * <Valve
888 * className="org.apache.catalina.cluster.tcp.ReplicationValve"
889 * filter=".*\.gif;.*\.js;.*\.css;.*\.png;.*\.jpeg;.*\.jpg;.*\.htm;.*\.html;.*\.txt;"
890 * primaryIndicator="true" />
891 * </pre>
892 */
893 protected void createDefaultClusterValves() {
894 if (log.isInfoEnabled()) {
895 log.info(sm.getString(
896 "SimpleTcpCluster.default.addClusterValves",
897 getClusterName()));
898 }
899 ReplicationValve valve= new ReplicationValve() ;
900 valve.setFilter(".*\\.gif;.*\\.js;.*\\.css;.*\\.png;.*\\.jpeg;.*\\.jpg;.*\\.htm;.*\\.html;.*\\.txt;");
901 valve.setPrimaryIndicator(true);
902 transferProperty("valve",valve);
903 addValve(valve);
904
905 }
906
907 /**
908 * register all cluster valve to host or engine
909 * @throws Exception
910 * @throws ClassNotFoundException
911 */
912 protected void registerClusterValve() throws Exception {
913 for (Iterator iter = valves.iterator(); iter.hasNext();) {
914 ClusterValve valve = (ClusterValve) iter.next();
915 if (log.isDebugEnabled())
916 log.debug("Invoking addValve on " + getContainer()
917 + " with class=" + valve.getClass().getName());
918 if (valve != null) {
919 IntrospectionUtils.callMethodN(getContainer(), "addValve",
920 new Object[] { valve }, new Class[] { org.apache.catalina.Valve.class });
921
922 }
923 valve.setCluster(this);
924 }
925 }
926
927 /**
928 * unregister all cluster valve to host or engine
929 * @throws Exception
930 * @throws ClassNotFoundException
931 */
932 protected void unregisterClusterValve() throws Exception {
933 for (Iterator iter = valves.iterator(); iter.hasNext();) {
934 ClusterValve valve = (ClusterValve) iter.next();
935 if (log.isDebugEnabled())
936 log.debug("Invoking removeValve on " + getContainer()
937 + " with class=" + valve.getClass().getName());
938 if (valve != null) {
939 IntrospectionUtils.callMethodN(getContainer(), "removeValve",
940 new Object[] { valve }, new Class[] { org.apache.catalina.Valve.class });
941 }
942 valve.setCluster(this);
943 }
944 }
945
946 /**
947 * Gracefully terminate the active cluster component.<br/>
948 * This will disconnect the cluster communication channel, stop the
949 * listener and deregister the valves from host or engine.<br/><br/>
950 * <b>Note:</b><br/>The sub elements receiver, sender, membership,
951 * listener or valves are not removed. You can easily start the cluster again.
952 *
953 * @exception IllegalStateException
954 * if this component has not been started
955 * @exception LifecycleException
956 * if this component detects a fatal error that needs to be
957 * reported
958 */
959 public void stop() throws LifecycleException {
960
961 if (!started)
962 throw new IllegalStateException(sm.getString("cluster.notStarted"));
963 // Notify our interested LifecycleListeners
964 lifecycle.fireLifecycleEvent(BEFORE_STOP_EVENT, this);
965
966 if (clusterDeployer != null) {
967 clusterDeployer.stop();
968 }
969 // FIXME remove registered managers!!
970 if(membershipService != null) {
971 membershipService.stop();
972 membershipService.removeMembershipListener();
973 }
974 if(clusterSender != null) {
975 try {
976 clusterSender.stop();
977 } catch (Exception x) {
978 log.error("Unable to stop cluster sender.", x);
979 }
980 }
981 if(clusterReceiver != null ){
982 try {
983 clusterReceiver.stop();
984 clusterReceiver.setCatalinaCluster(null);
985 } catch (Exception x) {
986 log.error("Unable to stop cluster receiver.", x);
987 }
988 }
989 unregisterMBeans();
990 try {
991 unregisterClusterValve();
992 } catch (Exception x) {
993 log.error("Unable to stop cluster valve.", x);
994 }
995 started = false;
996 // Notify our interested LifecycleListeners
997 lifecycle.fireLifecycleEvent(AFTER_STOP_EVENT, this);
998 clusterLog = null ;
999 }
1000
1001 /**
1002 * send message to all cluster members same cluster domain
1003 *
1004 * @see org.apache.catalina.cluster.CatalinaCluster#send(org.apache.catalina.cluster.ClusterMessage)
1005 */
1006 public void sendClusterDomain(ClusterMessage msg) {
1007 long start = 0;
1008 if (doClusterLog)
1009 start = System.currentTimeMillis();
1010 try {
1011 msg.setAddress(membershipService.getLocalMember());
1012 clusterSender.sendMessageClusterDomain(msg);
1013 } catch (Exception x) {
1014 if (notifyLifecycleListenerOnFailure) {
1015 // Notify our interested LifecycleListeners
1016 lifecycle.fireLifecycleEvent(SEND_MESSAGE_FAILURE_EVENT,
1017 new SendMessageData(msg, null, x));
1018 }
1019 log.error("Unable to send message through cluster sender.", x);
1020 }
1021 if (doClusterLog)
1022 logSendMessage(msg, start, null);
1023 }
1024
1025
1026 /**
1027 * send message to all cluster members
1028 * @param msg message to transfer
1029 *
1030 * @see org.apache.catalina.cluster.CatalinaCluster#send(org.apache.catalina.cluster.ClusterMessage)
1031 */
1032 public void send(ClusterMessage msg) {
1033 send(msg, null);
1034 }
1035
1036 /**
1037 * send a cluster message to one member (very usefull JMX method for remote scripting)
1038 *
1039 * @param msg message to transfer
1040 * @param dest Receiver member with name
1041 * @see org.apache.catalina.cluster.CatalinaCluster#send(org.apache.catalina.cluster.ClusterMessage,
1042 * org.apache.catalina.cluster.Member)
1043 * @see McastService#findMemberByName(String)
1044 */
1045 public void sendToMember(ClusterMessage msg, String dest) {
1046 Member member = getMembershipService().findMemberByName(dest);
1047 if (member != null) {
1048 send(msg, member);
1049 } else {
1050 log.error("sendToMember: member " + dest + " not found!");
1051 }
1052 }
1053
1054 /**
1055 * send a cluster message to one member
1056 *
1057 * @param msg message to transfer
1058 * @param dest Receiver member
1059 * @see org.apache.catalina.cluster.CatalinaCluster#send(org.apache.catalina.cluster.ClusterMessage,
1060 * org.apache.catalina.cluster.Member)
1061 */
1062 public void send(ClusterMessage msg, Member dest) {
1063 long start = 0;
1064 if (doClusterLog)
1065 start = System.currentTimeMillis();
1066 try {
1067 msg.setAddress(membershipService.getLocalMember());
1068 if (dest != null) {
1069 if (!membershipService.getLocalMember().equals(dest)) {
1070 clusterSender.sendMessage(msg, dest);
1071 } else
1072 log.error("Unable to send message to local member " + msg);
1073 } else {
1074 clusterSender.sendMessage(msg);
1075 }
1076 } catch (Exception x) {
1077 if (notifyLifecycleListenerOnFailure) {
1078 // Notify our interested LifecycleListeners
1079 lifecycle.fireLifecycleEvent(SEND_MESSAGE_FAILURE_EVENT,
1080 new SendMessageData(msg, dest, x));
1081 }
1082 log.error("Unable to send message through cluster sender.", x);
1083 }
1084 if (doClusterLog)
1085 logSendMessage(msg, start, dest);
1086 }
1087
1088 /**
1089 * New cluster member is registered
1090 *
1091 * @see org.apache.catalina.cluster.MembershipListener#memberAdded(org.apache.catalina.cluster.Member)
1092 */
1093 public void memberAdded(Member member) {
1094 try {
1095 if (log.isInfoEnabled())
1096 log.info("Replication member added:" + member);
1097 // Notify our interested LifecycleListeners
1098 lifecycle.fireLifecycleEvent(BEFORE_MEMBERREGISTER_EVENT, member);
1099 clusterSender.add(member);
1100 // Notify our interested LifecycleListeners
1101 lifecycle.fireLifecycleEvent(AFTER_MEMBERREGISTER_EVENT, member);
1102 } catch (Exception x) {
1103 log.error("Unable to connect to replication system.", x);
1104 }
1105
1106 }
1107
1108 /**
1109 * Cluster member is gone
1110 *
1111 * @see org.apache.catalina.cluster.MembershipListener#memberDisappeared(org.apache.catalina.cluster.Member)
1112 */
1113 public void memberDisappeared(Member member) {
1114 if (log.isInfoEnabled())
1115 log.info("Received member disappeared:" + member);
1116 try {
1117 // Notify our interested LifecycleListeners
1118 lifecycle.fireLifecycleEvent(BEFORE_MEMBERUNREGISTER_EVENT, member);
1119 clusterSender.remove(member);
1120 // Notify our interested LifecycleListeners
1121 lifecycle.fireLifecycleEvent(AFTER_MEMBERUNREGISTER_EVENT, member);
1122 } catch (Exception x) {
1123 log.error("Unable remove cluster node from replication system.", x);
1124 }
1125
1126 }
1127
1128 // --------------------------------------------------------- receiver
1129 // messages
1130
1131 /**
1132 * notify all listeners from receiving a new message is not ClusterMessage
1133 * emitt Failure Event to LifecylceListener
1134 *
1135 * @param message
1136 * receveived Message
1137 */
1138 public void receive(ClusterMessage message) {
1139
1140 long start = 0;
1141 if (doClusterLog)
1142 start = System.currentTimeMillis();
1143 if (log.isDebugEnabled() && message != null)
1144 log.debug("Assuming clocks are synched: Replication for "
1145 + message.getUniqueId() + " took="
1146 + (System.currentTimeMillis() - (message).getTimestamp())
1147 + " ms.");
1148
1149 //invoke all the listeners
1150 boolean accepted = false;
1151 if (message != null) {
1152 for (Iterator iter = clusterListeners.iterator(); iter.hasNext();) {
1153 MessageListener listener = (MessageListener) iter.next();
1154 if (listener.accept(message)) {
1155 accepted = true;
1156 listener.messageReceived(message);
1157 }
1158 }
1159 }
1160 if (!accepted && log.isDebugEnabled()) {
1161 if (notifyLifecycleListenerOnFailure) {
1162 Member dest = message.getAddress();
1163 // Notify our interested LifecycleListeners
1164 lifecycle.fireLifecycleEvent(RECEIVE_MESSAGE_FAILURE_EVENT,
1165 new SendMessageData(message, dest, null));
1166 }
1167 log.debug("Message " + message.toString() + " from type "
1168 + message.getClass().getName()
1169 + " transfered but no listener registered");
1170 }
1171 if (doClusterLog)
1172 logReceiveMessage(message, start, accepted);
1173 }
1174
1175 // --------------------------------------------------------- Logger
1176
1177 /**
1178 * @return Returns the clusterLogName.
1179 */
1180 public String getClusterLogName() {
1181 return clusterLogName;
1182 }
1183
1184 /**
1185 * @param clusterLogName The clusterLogName to set.
1186 */
1187 public void setClusterLogName(String clusterLogName) {
1188 this.clusterLogName = clusterLogName;
1189 }
1190
1191 /**
1192 * @return Returns the doClusterLog.
1193 */
1194 public boolean isDoClusterLog() {
1195 return doClusterLog;
1196 }
1197
1198 /**
1199 * @param doClusterLog The doClusterLog to set.
1200 */
1201 public void setDoClusterLog(boolean doClusterLog) {
1202 this.doClusterLog = doClusterLog;
1203 }
1204 public Log getLogger() {
1205 return log;
1206 }
1207
1208 public Log getClusterLog() {
1209 if (clusterLog == null && clusterLogName != null
1210 && !"".equals(clusterLogName))
1211 clusterLog = LogFactory.getLog(clusterLogName);
1212
1213 return clusterLog;
1214 }
1215
1216 /**
1217 * log received message to cluster transfer log
1218 * @param message
1219 * @param start
1220 * @param accepted
1221 */
1222 protected void logReceiveMessage(ClusterMessage message, long start,
1223 boolean accepted) {
1224 if (clusterLog != null && clusterLog.isInfoEnabled()) {
1225 clusterLog.info(sm.getString("SimpleTcpCluster.log.receive", new Object[] {
1226 new Date(start),
1227 new Long(System.currentTimeMillis() - start),
1228 message.getAddress().getHost(),
1229 new Integer(message.getAddress().getPort()),
1230 message.getUniqueId(), new Boolean(accepted) }));
1231 }
1232 }
1233
1234 /**
1235 * log sended message to cluster transfer log
1236 * @param message
1237 * @param start
1238 * @param dest
1239 */
1240 protected void logSendMessage(ClusterMessage message, long start,
1241 Member dest) {
1242 if (clusterLog != null && clusterLog.isInfoEnabled()) {
1243 if (dest != null) {
1244 clusterLog.info(sm.getString("SimpleTcpCluster.log.send",
1245 new Object[] { new Date(start),
1246 new Long(System.currentTimeMillis() - start),
1247 dest.getHost(), new Integer(dest.getPort()),
1248 message.getUniqueId() }));
1249 } else {
1250 clusterLog.info(sm.getString("SimpleTcpCluster.log.send.all",
1251 new Object[] { new Date(start),
1252 new Long(System.currentTimeMillis() - start),
1253 message.getUniqueId() }));
1254 }
1255 }
1256 }
1257
1258 // --------------------------------------------- JMX MBeans
1259
1260 /**
1261 * register Means at cluster.
1262 */
1263 protected void registerMBeans() {
1264 try {
1265 getMBeanServer();
1266 String domain = mserver.getDefaultDomain();
1267 String name = ":type=Cluster";
1268 if (container instanceof StandardHost) {
1269 domain = ((StandardHost) container).getDomain();
1270 name += ",host=" + container.getName();
1271 } else {
1272 if (container instanceof StandardEngine) {
1273 domain = ((StandardEngine) container).getDomain();
1274 }
1275 }
1276 ObjectName clusterName = new ObjectName(domain + name);
1277
1278 if (mserver.isRegistered(clusterName)) {
1279 if (log.isWarnEnabled())
1280 log.warn(sm.getString("cluster.mbean.register.allready",
1281 clusterName));
1282 return;
1283 }
1284 setObjectName(clusterName);
1285 mserver.registerMBean(getManagedBean(this), getObjectName());
1286 } catch (Exception ex) {
1287 log.error(ex.getMessage(), ex);
1288 }
1289 }
1290
1291 protected void unregisterMBeans() {
1292 if (mserver != null) {
1293 try {
1294 mserver.unregisterMBean(getObjectName());
1295 } catch (Exception e) {
1296 log.error(e);
1297 }
1298 }
1299 }
1300
1301 /**
1302 * Get current Catalina MBean Server and load mbean registry
1303 *
1304 * @return The server
1305 * @throws Exception
1306 */
1307 public MBeanServer getMBeanServer() throws Exception {
1308 if (mserver == null) {
1309 if (MBeanServerFactory.findMBeanServer(null).size() > 0) {
1310 mserver = (MBeanServer) MBeanServerFactory
1311 .findMBeanServer(null).get(0);
1312 } else {
1313 mserver = MBeanServerFactory.createMBeanServer();
1314 }
1315 registry = Registry.getRegistry(null, null);
1316 registry.loadMetadata(this.getClass().getResourceAsStream(
1317 "mbeans-descriptors.xml"));
1318 }
1319 return (mserver);
1320 }
1321
1322 /**
1323 * Returns the ModelMBean
1324 *
1325 * @param object
1326 * The Object to get the ModelMBean for
1327 * @return The ModelMBean
1328 * @throws Exception
1329 * If an error occurs this constructors throws this exception
1330 */
1331 public ModelMBean getManagedBean(Object object) throws Exception {
1332 ModelMBean mbean = null;
1333 if (registry != null) {
1334 ManagedBean managedBean = registry.findManagedBean(object
1335 .getClass().getName());
1336 mbean = managedBean.createMBean(object);
1337 }
1338 return mbean;
1339 }
1340
1341 public void setObjectName(ObjectName name) {
1342 objectName = name;
1343 }
1344
1345 public ObjectName getObjectName() {
1346 return objectName;
1347 }
1348
1349 // --------------------------------------------- Inner Class
1350
1351 private class MemberComparator implements java.util.Comparator {
1352
1353 public int compare(Object o1, Object o2) {
1354 try {
1355 return compare((Member) o1, (Member) o2);
1356 } catch (ClassCastException x) {
1357 return 0;
1358 }
1359 }
1360
1361 public int compare(Member m1, Member m2) {
1362 //longer alive time, means sort first
1363 long result = m2.getMemberAliveTime() - m1.getMemberAliveTime();
1364 if (result < 0)
1365 return -1;
1366 else if (result == 0)
1367 return 0;
1368 else
1369 return 1;
1370 }
1371 }
1372
1373
1374 // ------------------------------------------------------------- deprecated
1375
1376 /**
1377 *
1378 * @see org.apache.catalina.Cluster#setProtocol(java.lang.String)
1379 */
1380 public void setProtocol(String protocol) {
1381 }
1382
1383 /**
1384 * @see org.apache.catalina.Cluster#getProtocol()
1385 */
1386 public String getProtocol() {
1387 return null;
1388 }
1389
1390 /**
1391 * @see org.apache.catalina.Cluster#startContext(java.lang.String)
1392 */
1393 public void startContext(String contextPath) throws IOException {
1394
1395 }
1396
1397 /**
1398 * @see org.apache.catalina.Cluster#installContext(java.lang.String, java.net.URL)
1399 */
1400 public void installContext(String contextPath, URL war) {
1401
1402 }
1403
1404 /**
1405 * @see org.apache.catalina.Cluster#stop(java.lang.String)
1406 */
1407 public void stop(String contextPath) throws IOException {
1408
1409 }
1410 }