1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */
22 package org.jboss.ha.framework.server;
23
24 import java.io.ByteArrayInputStream;
25 import java.io.ByteArrayOutputStream;
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.io.OutputStream;
29 import java.io.Serializable;
30 import java.lang.ref.WeakReference;
31 import java.net.InetAddress;
32 import java.text.SimpleDateFormat;
33 import java.util.ArrayList;
34 import java.util.Date;
35 import java.util.HashMap;
36 import java.util.Map;
37 import java.util.Vector;
38 import java.util.concurrent.ConcurrentHashMap;
39 import java.util.concurrent.CountDownLatch;
40
41 import javax.naming.Context;
42 import javax.naming.InitialContext;
43 import javax.naming.Name;
44 import javax.naming.NameNotFoundException;
45 import javax.naming.Reference;
46 import javax.naming.StringRefAddr;
47
48 import org.jboss.cache.Cache;
49 import org.jboss.cache.CacheManager;
50 import org.jboss.ha.framework.interfaces.ClusterNode;
51 import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
52 import org.jboss.ha.framework.interfaces.DistributedState;
53 import org.jboss.ha.framework.interfaces.HAPartition;
54 import org.jboss.invocation.MarshalledValueInputStream;
55 import org.jboss.invocation.MarshalledValueOutputStream;
56 import org.jboss.logging.Logger;
57 import org.jboss.naming.NonSerializableFactory;
58 import org.jboss.system.ServiceMBeanSupport;
59 import org.jboss.system.server.ServerConfigUtil;
60 import org.jboss.util.threadpool.ThreadPool;
61 import org.jgroups.Address;
62 import org.jgroups.Channel;
63 import org.jgroups.ChannelFactory;
64 import org.jgroups.ExtendedMembershipListener;
65 import org.jgroups.ExtendedMessageListener;
66 import org.jgroups.MembershipListener;
67 import org.jgroups.MergeView;
68 import org.jgroups.Message;
69 import org.jgroups.MessageListener;
70 import org.jgroups.Version;
71 import org.jgroups.View;
72 import org.jgroups.blocks.GroupRequest;
73 import org.jgroups.blocks.MethodCall;
74 import org.jgroups.blocks.RpcDispatcher;
75 import org.jgroups.stack.IpAddress;
76 import org.jgroups.util.Rsp;
77 import org.jgroups.util.RspList;
78
79 /**
80 * {@link HAPartition} implementation based on a
81 * <a href="http://www.jgroups.com/">JGroups</a> <code>RpcDispatcher</code>
82 * and a multiplexed <code>JChannel</code>.
83 *
84 * @author <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
85 * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>.
86 * @author Scott.Stark@jboss.org
87 * @author brian.stansberry@jboss.com
88 * @version $Revision: 74643 $
89 */
90 public class ClusterPartition
91 extends ServiceMBeanSupport
92 implements ExtendedMembershipListener, HAPartition,
93 AsynchEventHandler.AsynchEventProcessor,
94 ClusterPartitionMBean
95 {
96 public static final String DEFAULT_CACHE_CONFIG = "ha-partition";
97
98 private static final byte EOF_VALUE = -1;
99 private static final byte NULL_VALUE = 0;
100 private static final byte SERIALIZABLE_VALUE = 1;
101 // TODO add Streamable support
102 // private static final byte STREAMABLE_VALUE = 2;
103
104 /**
105 * Returned when an RPC call arrives for a service that isn't registered.
106 */
107 private static class NoHandlerForRPC implements Serializable
108 {
109 static final long serialVersionUID = -1263095408483622838L;
110 }
111
112 private static class StateStreamEnd implements Serializable
113 {
114 /** The serialVersionUID */
115 private static final long serialVersionUID = -3705345735451504946L;
116 }
117
118 /**
119 * Used internally when an RPC call requires a custom classloader for unmarshalling
120 */
121 private static class HAServiceResponse implements Serializable
122 {
123 private static final long serialVersionUID = -6485594652749906437L;
124 private final String serviceName;
125 private final byte[] payload;
126
127 public HAServiceResponse(String serviceName, byte[] payload)
128 {
129 this.serviceName = serviceName;
130 this.payload = payload;
131 }
132
133 public String getServiceName()
134 {
135 return this.serviceName;
136 }
137
138 public byte[] getPayload()
139 {
140 return this.payload;
141 }
142 }
143
144 private class ChannelConnectTask implements Runnable
145 {
146 private final CountDownLatch latch;
147
148 private ChannelConnectTask(CountDownLatch latch)
149 {
150 this.latch = latch;
151 }
152
153 public void run()
154 {
155 try
156 {
157 ClusterPartition.this.channel.connect(ClusterPartition.this.getPartitionName());
158 }
159 catch (Exception e)
160 {
161 synchronized (ClusterPartition.this.channelLock)
162 {
163 ClusterPartition.this.connectException = e;
164 }
165 }
166 finally
167 {
168 this.latch.countDown();
169 }
170 }
171 }
172
173 // Constants -----------------------------------------------------
174
175 // final MethodLookup method_lookup_clos = new MethodLookupClos();
176
177 // Attributes ----------------------------------------------------
178
179 private CacheManager cacheManager;
180 private String cacheConfigName = DEFAULT_CACHE_CONFIG;
181 private Cache cache;
182 private ChannelFactory channelFactory;
183 private String stackName;
184 private String partitionName = ServerConfigUtil.getDefaultPartitionName();
185 private boolean deadlock_detection = false;
186 private InetAddress nodeAddress = null;
187 private long state_transfer_timeout=60000;
188 private long method_call_timeout=60000;
189
190 /** Thread pool used to asynchronously start our channel */
191 private ThreadPool threadPool;
192
193 protected Map<String, Object> rpcHandlers = new ConcurrentHashMap<String, Object>();
194 protected Map<String, HAPartitionStateTransfer> stateHandlers = new HashMap<String, HAPartitionStateTransfer>();
195 /** Do we send any membership change notifications synchronously? */
196 protected boolean allowSyncListeners = false;
197 /** The HAMembershipListener and HAMembershipExtendedListeners */
198 protected ArrayList<HAMembershipListener> synchListeners = new ArrayList<HAMembershipListener>();
199 /** The asynch HAMembershipListener and HAMembershipExtendedListeners */
200 protected ArrayList<HAMembershipListener> asynchListeners = new ArrayList<HAMembershipListener>();
201 /** The handler used to send membership change notifications asynchronously */
202 protected AsynchEventHandler asynchHandler;
203 /** The current cluster partition members */
204 protected Vector<ClusterNode> members = null;
205 protected Vector<Address> jgmembers = null;
206 protected Map<String, WeakReference<ClassLoader>> clmap =
207 new ConcurrentHashMap<String, WeakReference<ClassLoader>>();
208
209 public Vector<String> history = new Vector<String>();
210
211 /** The partition members other than this node */
212 protected Vector<ClusterNode> otherMembers = null;
213 protected Vector<Address> jgotherMembers = null;
214 /** the local JG IP Address */
215 protected Address localJGAddress = null;
216 /** The cluster transport protocol address string */
217 protected String nodeName;
218 /** me as a ClusterNode */
219 protected ClusterNode me = null;
220 /** The JGroups partition channel */
221 protected Channel channel;
222 /** The cluster replicant manager */
223 protected DistributedReplicantManagerImpl replicantManager;
224 /** The DistributedState service we manage */
225 protected DistributedStateImpl distributedState;
226 /** The cluster instance log category */
227 protected Logger log;
228 protected Logger clusterLifeCycleLog;
229 /** The current cluster view id */
230 protected long currentViewId = -1;
231 /** Whether to bind the partition into JNDI */
232 protected boolean bindIntoJndi = true;
233
234 private final ThreadGate flushBlockGate = new ThreadGate();
235
236 private RpcDispatcher dispatcher = null;
237
238 /**
239 * True if serviceState was initialized during start-up.
240 */
241 protected boolean isStateSet = false;
242
243 /**
244 * An exception occuring upon fetch serviceState.
245 */
246 protected Exception setStateException;
247 /**
248 * An exception occuring during channel connect
249 */
250 protected Exception connectException;
251 private final Object channelLock = new Object();
252 private final MessageListenerAdapter messageListener = new MessageListenerAdapter();
253
254 // Static --------------------------------------------------------
255
256 private Channel createChannel()
257 {
258 ChannelFactory factory = this.getChannelFactory();
259 if (factory == null)
260 {
261 throw new IllegalStateException("HAPartitionConfig has no JChannelFactory");
262 }
263 String stack = this.getChannelStackName();
264 if (stack == null)
265 {
266 throw new IllegalStateException("HAPartitionConfig has no multiplexer stack");
267 }
268 try
269 {
270 return factory.createMultiplexerChannel(stack, this.getPartitionName());
271 }
272 catch (RuntimeException e)
273 {
274 throw e;
275 }
276 catch (Exception e)
277 {
278 throw new RuntimeException("Failure creating multiplexed Channel", e);
279 }
280 }
281
282 // Constructors --------------------------------------------------
283
284 public ClusterPartition()
285 {
286 this.logHistory("Partition object created");
287 }
288
289 // ------------------------------------------------------------ ServiceMBean
290
291 // ----------------------------------------------------------------- Service
292
293 protected void createService() throws Exception
294 {
295 if (this.replicantManager == null)
296 {
297 throw new IllegalStateException("DistributedReplicantManager property must be set before creating ClusterPartition service");
298 }
299
300 this.setupLoggers(this.getPartitionName());
301
302 this.replicantManager.createService();
303
304 if (this.distributedState != null)
305 {
306 this.distributedState.createService();
307 }
308
309 // Create the asynchronous handler for view changes
310 this.asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
311
312 this.log.debug("done initializing partition");
313 }
314
315 protected void startService() throws Exception
316 {
317 this.logHistory ("Starting partition");
318
319 this.cache = this.cacheManager.getCache(this.cacheConfigName, true);
320 this.channelFactory = this.cache.getConfiguration().getRuntimeConfig().getMuxChannelFactory();
321 this.stackName = this.cache.getConfiguration().getMultiplexerStack();
322
323 if (this.channel == null || !this.channel.isOpen())
324 {
325 this.log.debug("Creating Channel for partition " + this.getPartitionName() +
326 " using stack " + this.getChannelStackName());
327
328 this.channel = this.createChannel();
329
330 this.channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
331 this.channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
332 }
333
334 this.log.info("Initializing partition " + this.getPartitionName());
335 this.logHistory ("Initializing partition " + this.getPartitionName());
336
337 this.dispatcher = new RpcHandler(this.channel, null, null, new Object(), this.getDeadlockDetection());
338
339 // Subscribe to events generated by the channel
340 this.log.debug("setMembershipListener");
341 this.dispatcher.setMembershipListener(this);
342 this.log.debug("setMessageListener");
343 this.dispatcher.setMessageListener(this.messageListener);
344 this.dispatcher.setRequestMarshaller(new RequestMarshallerImpl());
345 this.dispatcher.setResponseMarshaller(new ResponseMarshallerImpl());
346
347 // Clear any old connectException
348 this.connectException = null;
349 CountDownLatch connectLatch = new CountDownLatch(1);
350
351 if (this.threadPool == null)
352 {
353 this.channel.connect(this.getPartitionName());
354 connectLatch.countDown();
355 }
356 else
357 {
358 // Do the channel connect in another thread while this
359 // thread starts the cache and does that channel connect
360 ChannelConnectTask task = new ChannelConnectTask(connectLatch);
361 this.threadPool.run(task);
362 }
363
364 this.cache.start();
365
366 try
367 {
368 // This will block waiting for any async channel connect above
369 connectLatch.await();
370
371 if (this.connectException != null)
372 {
373 throw this.connectException;
374 }
375
376 this.log.debug("Get current members");
377 this.waitForView();
378
379 // get current JG group properties
380 this.log.debug("get nodeName");
381 this.localJGAddress = this.channel.getLocalAddress();
382 this.me = new ClusterNodeImpl((IpAddress) this.localJGAddress);
383 this.nodeName = this.me.getName();
384
385 this.verifyNodeIsUnique();
386
387 this.fetchState();
388
389 this.replicantManager.startService();
390
391 if (this.distributedState != null)
392 {
393 this.distributedState.setClusteredCache(this.getClusteredCache());
394 this.distributedState.startService();
395 }
396
397 // Start the asynch listener handler thread
398 this.asynchHandler.start();
399
400 // Register with the service locator
401 HAPartitionLocator.getHAPartitionLocator().registerHAPartition(this);
402
403 // Bind ourself in the public JNDI space if configured to do so
404 if (this.bindIntoJndi)
405 {
406 Context ctx = new InitialContext();
407 this.bind(HAPartitionLocator.getStandardJndiBinding(this.getPartitionName()),
408 this, ClusterPartition.class, ctx);
409 this.log.debug("Bound in JNDI under /HAPartition/" + this.getPartitionName());
410 }
411 }
412 catch (Throwable t)
413 {
414 this.log.debug("Caught exception after channel connected; closing channel -- " + t.getLocalizedMessage());
415 this.channel.close();
416 this.channel = null;
417 throw (t instanceof Exception) ? (Exception) t : new RuntimeException(t);
418 }
419
420 }
421
422 protected void stopService() throws Exception
423 {
424 this.logHistory ("Stopping partition");
425 this.log.info("Stopping partition " + this.getPartitionName());
426
427 try
428 {
429 this.asynchHandler.stop();
430 }
431 catch( Exception e)
432 {
433 this.log.warn("Failed to stop asynchHandler", e);
434 }
435
436 if (this.distributedState != null)
437 {
438 this.distributedState.stopService();
439 }
440
441 this.replicantManager.stopService();
442
443 try
444 {
445 this.cacheManager.releaseCache(this.cacheConfigName);
446 }
447 catch (Exception e)
448 {
449 this.log.error("cache release failed", e);
450 }
451
452 // NR 200505 : [JBCLUSTER-38] replace channel.close() by a disconnect and
453 // add the destroyPartition() step
454 try
455 {
456 if (this.channel != null && this.channel.isConnected())
457 {
458 this.channel.disconnect();
459 }
460 }
461 catch (Exception e)
462 {
463 this.log.error("channel disconnection failed", e);
464 }
465
466 if (this.bindIntoJndi)
467 {
468 String boundName = HAPartitionLocator.getStandardJndiBinding(this.getPartitionName());
469 InitialContext ctx = null;
470 try
471 {
472 // the following statement fails when the server is being shut down (07/19/2007)
473 ctx = new InitialContext();
474 ctx.unbind(boundName);
475 }
476 catch (Exception e) {
477 this.log.error("partition unbind operation failed", e);
478 }
479 finally
480 {
481 if (ctx != null)
482 {
483 ctx.close();
484 }
485 }
486 NonSerializableFactory.unbind(boundName);
487 }
488
489 HAPartitionLocator.getHAPartitionLocator().deregisterHAPartition(this);
490
491 this.log.info("Partition " + this.getPartitionName() + " stopped.");
492 }
493
494 protected void destroyService() throws Exception
495 {
496 this.log.debug("Destroying HAPartition: " + this.getPartitionName());
497
498 if (this.distributedState != null)
499 {
500 this.distributedState.destroyService();
501 }
502
503 this.replicantManager.destroyService();
504
505 try
506 {
507 if (this.channel != null && this.channel.isOpen())
508 {
509 this.channel.close();
510 }
511 }
512 catch (Exception e)
513 {
514 this.log.error("Closing channel failed", e);
515 }
516
517 this.log.info("Partition " + this.getPartitionName() + " destroyed.");
518 }
519
520 // ---------------------------------------------------------- State Transfer
521
522
523 protected void fetchState() throws Exception
524 {
525 this.log.info("Fetching serviceState (will wait for " + this.getStateTransferTimeout() +
526 " milliseconds):");
527 long start, stop;
528 this.isStateSet = false;
529 start = System.currentTimeMillis();
530 boolean rc = this.channel.getState(null, this.getStateTransferTimeout());
531 if (rc)
532 {
533 synchronized (this.channelLock)
534 {
535 while (!this.isStateSet)
536 {
537 if (this.setStateException != null)
538 {
539 throw this.setStateException;
540 }
541
542 try
543 {
544 this.channelLock.wait();
545 }
546 catch (InterruptedException iex)
547 {
548 }
549 }
550 }
551 stop = System.currentTimeMillis();
552 this.log.info("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)");
553 }
554 else
555 {
556 // No one provided us with serviceState.
557 // We need to find out if we are the coordinator, so we must
558 // block until viewAccepted() is called at least once
559
560 synchronized (this.members)
561 {
562 while (this.members.size() == 0)
563 {
564 this.log.debug("waiting on viewAccepted()");
565 try
566 {
567 this.members.wait();
568 }
569 catch (InterruptedException iex)
570 {
571 }
572 }
573 }
574
575 if (this.isCurrentNodeCoordinator())
576 {
577 this.log.info("State could not be retrieved (we are the first member in group)");
578 }
579 else
580 {
581 throw new IllegalStateException("Initial serviceState transfer failed: " +
582 "Channel.getState() returned false");
583 }
584 }
585 }
586
587 private void getStateInternal(OutputStream stream) throws IOException
588 {
589 MarshalledValueOutputStream mvos = null; // don't create until we know we need it
590
591 for (Map.Entry<String, HAPartitionStateTransfer> entry: this.stateHandlers.entrySet())
592 {
593 HAPartitionStateTransfer subscriber = entry.getValue();
594 this.log.debug("getState for " + entry.getKey());
595 Object state = subscriber.getCurrentState();
596 if (state != null)
597 {
598 if (mvos == null)
599 {
600 // This is our first write, so need to write the header first
601 stream.write(SERIALIZABLE_VALUE);
602
603 mvos = new MarshalledValueOutputStream(stream);
604 }
605
606 mvos.writeObject(entry.getKey());
607 mvos.writeObject(state);
608 }
609 }
610
611 if (mvos == null)
612 {
613 // We never wrote any serviceState, so write the NULL header
614 stream.write(NULL_VALUE);
615 }
616 else
617 {
618 mvos.writeObject(new StateStreamEnd());
619 mvos.flush();
620 mvos.close();
621 }
622
623 }
624
625 private void setStateInternal(InputStream stream) throws IOException, ClassNotFoundException
626 {
627 byte type = (byte) stream.read();
628
629 if (type == EOF_VALUE)
630 {
631 this.log.debug("serviceState stream is empty");
632 return;
633 }
634 else if (type == NULL_VALUE)
635 {
636 this.log.debug("serviceState is null");
637 return;
638 }
639
640 long used_mem_before, used_mem_after;
641 Runtime rt=Runtime.getRuntime();
642 used_mem_before=rt.totalMemory() - rt.freeMemory();
643
644 MarshalledValueInputStream mvis = new MarshalledValueInputStream(stream);
645
646 while (true)
647 {
648 Object obj = mvis.readObject();
649 if (obj instanceof StateStreamEnd)
650 {
651 break;
652 }
653
654 String key = (String) obj;
655 this.log.debug("setState for " + key);
656 Object someState = mvis.readObject();
657 HAPartitionStateTransfer subscriber = this.stateHandlers.get(key);
658 if (subscriber != null)
659 {
660 try
661 {
662 subscriber.setCurrentState((Serializable)someState);
663 }
664 catch (Exception e)
665 {
666 // Don't let issues with one subscriber affect others
667 // unless it is DRM, which is really an internal function
668 // of the HAPartition
669 // FIXME remove this once DRM is JBC-based
670 if (DistributedReplicantManagerImpl.SERVICE_NAME.equals(key))
671 {
672 if (e instanceof RuntimeException)
673 {
674 throw (RuntimeException) e;
675 }
676
677 throw new RuntimeException(e);
678 }
679
680 this.log.error("Caught exception setting serviceState to " + subscriber, e);
681 }
682 }
683 else
684 {
685 this.log.debug("There is no stateHandler for: " + key);
686 }
687 }
688
689 try
690 {
691 stream.close();
692 }
693 catch(Exception e)
694 {
695 this.log.error("Caught exception closing serviceState stream", e);
696 }
697
698 used_mem_after=rt.totalMemory() - rt.freeMemory();
699 this.log.debug("received serviceState; expanded memory by " +
700 (used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
701 ", used memory after: " + used_mem_after + ")");
702 }
703
704 private void recordSetStateFailure(Throwable t)
705 {
706 this.log.error("failed setting serviceState", t);
707 if (t instanceof Exception)
708 {
709 this.setStateException = (Exception) t;
710 }
711 else
712 {
713 this.setStateException = new Exception(t);
714 }
715 }
716
717 private void notifyChannelLock()
718 {
719 synchronized (this.channelLock)
720 {
721 this.channelLock.notifyAll();
722 }
723 }
724
725 // org.jgroups.MembershipListener implementation ----------------------------------------------
726
727 public void suspect(org.jgroups.Address suspected_mbr)
728 {
729 this.logHistory ("Node suspected: " + (suspected_mbr==null?"null":suspected_mbr.toString()));
730 if (this.isCurrentNodeCoordinator ())
731 {
732 this.clusterLifeCycleLog.info ("Suspected member: " + suspected_mbr);
733 }
734 else
735 {
736 this.log.info("Suspected member: " + suspected_mbr);
737 }
738 }
739
740 public void block()
741 {
742 this.flushBlockGate.close();
743 this.log.debug("Block processed at " + this.me);
744 }
745
746 public void unblock()
747 {
748 this.flushBlockGate.open();
749 this.log.debug("Unblock processed at " + this.me);
750 }
751
752 /** Notification of a cluster view change. This is done from the JG protocol
753 * handlder thread and we must be careful to not unduly block this thread.
754 * Because of this there are two types of listeners, synchronous and
755 * asynchronous. The synchronous listeners are messaged with the view change
756 * event using the calling thread while the asynchronous listeners are
757 * messaged using a seperate thread.
758 *
759 * @param newView
760 */
761 public void viewAccepted(View newView)
762 {
763 try
764 {
765 // we update the view id
766 this.currentViewId = newView.getVid().getId();
767
768 // Keep a list of other members only for "exclude-self" RPC calls
769 this.jgotherMembers = (Vector)newView.getMembers().clone();
770 this.jgotherMembers.remove (this.channel.getLocalAddress());
771 this.otherMembers = this.translateAddresses (this.jgotherMembers); // TRANSLATE!
772 Vector<ClusterNode> translatedNewView = this.translateAddresses ((Vector)newView.getMembers().clone());
773 this.logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
774 " (old view: " + this.members + " )");
775
776
777 // Save the previous view and make a copy of the new view
778 Vector<ClusterNode> oldMembers = this.members;
779
780 Vector<Address> newjgMembers = (Vector)newView.getMembers().clone();
781 Vector<ClusterNode> newMembers = this.translateAddresses(newjgMembers); // TRANSLATE
782 this.members = newMembers;
783 this.jgmembers = newjgMembers;
784
785 if (oldMembers == null)
786 {
787 // Initial viewAccepted
788 this.log.debug("ViewAccepted: initial members set for partition " + this.getPartitionName() + ": " +
789 this.currentViewId + " (" + this.members + ")");
790
791 this.log.info("Number of cluster members: " + this.members.size());
792 for(int m = 0; m > this.members.size(); m ++)
793 {
794 Object node = this.members.get(m);
795 this.log.debug(node);
796 }
797 this.log.info ("Other members: " + this.otherMembers.size ());
798
799 // Wake up the deployer thread blocking in waitForView
800 this.notifyChannelLock();
801 return;
802 }
803
804 int difference = newMembers.size() - oldMembers.size();
805
806 if (this.isCurrentNodeCoordinator ())
807 {
808 this.clusterLifeCycleLog.info ("New cluster view for partition " + this.getPartitionName() + " (id: " +
809 this.currentViewId + ", delta: " + difference + ") : " + this.members);
810 }
811 else
812 {
813 this.log.info("New cluster view for partition " + this.getPartitionName() + ": " +
814 this.currentViewId + " (" + this.members + " delta: " + difference + ")");
815 }
816
817 // Build a ViewChangeEvent for the asynch listeners
818 ViewChangeEvent event = new ViewChangeEvent();
819 event.viewId = this.currentViewId;
820 event.allMembers = translatedNewView;
821 event.deadMembers = this.getDeadMembers(oldMembers, event.allMembers);
822 event.newMembers = this.getNewMembers(oldMembers, event.allMembers);
823 event.originatingGroups = null;
824 // if the new view occurs because of a merge, we first inform listeners of the merge
825 if(newView instanceof MergeView)
826 {
827 MergeView mergeView = (MergeView) newView;
828 event.originatingGroups = mergeView.getSubgroups();
829 }
830
831 this.log.debug("membership changed from " + oldMembers.size() + " to " + event.allMembers.size());
832 // Put the view change to the asynch queue
833 this.asynchHandler.queueEvent(event);
834
835 // Broadcast the new view to the synchronous view change listeners
836 if (this.allowSyncListeners)
837 {
838 this.notifyListeners(this.synchListeners, event.viewId, event.allMembers,
839 event.deadMembers, event.newMembers, event.originatingGroups);
840 }
841 }
842 catch (Exception ex)
843 {
844 this.log.error("ViewAccepted failed", ex);
845 }
846 }
847
848 private void waitForView() throws Exception
849 {
850 synchronized (this.channelLock)
851 {
852 if (this.members == null)
853 {
854 if (this.connectException != null)
855 {
856 throw this.connectException;
857 }
858
859 try
860 {
861 this.channelLock.wait(this.getMethodCallTimeout());
862 }
863 catch (InterruptedException iex)
864 {
865 }
866
867 if (this.connectException != null)
868 {
869 throw this.connectException;
870 }
871
872 if (this.members == null)
873 {
874 throw new IllegalStateException("No view received from Channel");
875 }
876 }
877 }
878 }
879
880 // HAPartition implementation ----------------------------------------------
881
882 public String getNodeName()
883 {
884 return this.nodeName;
885 }
886
887 public String getPartitionName()
888 {
889 return this.partitionName;
890 }
891
892 public void setPartitionName(String newName)
893 {
894 this.partitionName = newName;
895 }
896
897 public DistributedReplicantManager getDistributedReplicantManager()
898 {
899 return this.replicantManager;
900 }
901
902 public DistributedState getDistributedStateService()
903 {
904 return this.distributedState;
905 }
906
907 public long getCurrentViewId()
908 {
909 return this.currentViewId;
910 }
911
912 public Vector<String> getCurrentView()
913 {
914 Vector<String> result = new Vector<String>(this.members.size());
915 for (ClusterNode member: this.members)
916 {
917 result.add(member.getName());
918 }
919 return result;
920 }
921
922 public ClusterNode[] getClusterNodes ()
923 {
924 synchronized (this.members)
925 {
926 return this.members.toArray(new ClusterNode[this.members.size()]);
927 }
928 }
929
930 public ClusterNode getClusterNode ()
931 {
932 return this.me;
933 }
934
935 public boolean isCurrentNodeCoordinator ()
936 {
937 if(this.members == null || this.members.size() == 0 || this.me == null)
938 {
939 return false;
940 }
941 return this.members.elementAt (0).equals (this.me);
942 }
943
944 // ***************************
945 // ***************************
946 // RPC multicast communication
947 // ***************************
948 // ***************************
949 //
950 public void registerRPCHandler(String objName, Object subscriber)
951 {
952 this.rpcHandlers.put(objName, subscriber);
953 }
954
955 public void registerRPCHandler(String objName, Object subscriber, ClassLoader classloader)
956 {
957 this.registerRPCHandler(objName, subscriber);
958 this.clmap.put(objName, new WeakReference<ClassLoader>(classloader));
959 }
960
961 public void unregisterRPCHandler(String objName, Object subscriber)
962 {
963 this.rpcHandlers.remove(objName);
964 this.clmap.remove(objName);
965 }
966
967 /**
968 * This function is an abstraction of RpcDispatcher.
969 */
970 public ArrayList callMethodOnCluster(String objName, String methodName,
971 Object[] args, Class[] types, boolean excludeSelf) throws Exception
972 {
973 return this.callMethodOnCluster(objName, methodName, args, types, excludeSelf, this.getMethodCallTimeout());
974 }
975
976
977 public ArrayList callMethodOnCluster(String objName, String methodName,
978 Object[] args, Class[] types, boolean excludeSelf, long methodTimeout) throws Exception
979 {
980 RspList rsp = null;
981 boolean trace = this.log.isTraceEnabled();
982
983 MethodCall m = new MethodCall(objName + "." + methodName, args, types);
984
985 if(this.channel.flushSupported())
986 {
987 this.flushBlockGate.await(this.getStateTransferTimeout());
988 }
989 if (excludeSelf)
990 {
991 if( trace )
992 {
993 this.log.trace("callMethodOnCluster(true), objName="+objName
994 +", methodName="+methodName+", members="+this.jgotherMembers);
995 }
996 rsp = this.dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_ALL, methodTimeout);
997 }
998 else
999 {
1000 if( trace )
1001 {
1002 this.log.trace("callMethodOnCluster(false), objName="+objName
1003 +", methodName="+methodName+", members="+this.members);
1004 }
1005 rsp = this.dispatcher.callRemoteMethods(null, m, GroupRequest.GET_ALL, methodTimeout);
1006 }
1007
1008 return this.processResponseList(rsp, trace);
1009 }
1010
1011 /**
1012 * Calls method on Cluster coordinator node only. The cluster coordinator node is the first node to join the
1013 * cluster.
1014 * and is replaced
1015 * @param objName
1016 * @param methodName
1017 * @param args
1018 * @param types
1019 * @param excludeSelf
1020 * @return an array of responses from remote nodes
1021 * @throws Exception
1022 */
1023 public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
1024 Object[] args, Class[] types,boolean excludeSelf) throws Exception
1025 {
1026 return this.callMethodOnCoordinatorNode(objName,methodName,args,types,excludeSelf, this.getMethodCallTimeout());
1027 }
1028
1029 /**
1030 * Calls method on Cluster coordinator node only. The cluster coordinator node is the first node to join the
1031 * cluster.
1032 * and is replaced
1033 * @param objName
1034 * @param methodName
1035 * @param args
1036 * @param types
1037 * @param excludeSelf
1038 * @param methodTimeout
1039 * @return an array of responses from remote nodes
1040 * @throws Exception
1041 */
1042 public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
1043 Object[] args, Class[] types,boolean excludeSelf, long methodTimeout) throws Exception
1044 {
1045 boolean trace = this.log.isTraceEnabled();
1046
1047 MethodCall m = new MethodCall(objName + "." + methodName, args, types);
1048
1049 if( trace )
1050 {
1051 this.log.trace("callMethodOnCoordinatorNode(false), objName="+objName
1052 +", methodName="+methodName);
1053 }
1054
1055 // the first cluster view member is the coordinator
1056 Vector<Address> coordinatorOnly = new Vector<Address>();
1057 // If we are the coordinator, only call ourself if 'excludeSelf' is false
1058 if (false == this.isCurrentNodeCoordinator () ||
1059 false == excludeSelf)
1060 {
1061 coordinatorOnly.addElement(this.jgmembers.elementAt(0));
1062 }
1063
1064 RspList rsp = this.dispatcher.callRemoteMethods(coordinatorOnly, m, GroupRequest.GET_ALL, methodTimeout);
1065
1066 return this.processResponseList(rsp, trace);
1067 }
1068
1069 /**
1070 * Calls method synchrounously on target node only.
1071 * @param serviceName Name of the target service name on which calls are de-multiplexed
1072 * @param methodName name of the Java method to be called on remote services
1073 * @param args array of Java Object representing the set of parameters to be
1074 * given to the remote method
1075 * @param types The types of the parameters
1076 * node of the partition or only on remote nodes
1077 * @param targetNode is the target of the call
1078 * @return the value returned by the target method
1079 * @throws Exception Throws if a communication exception occurs
1080 */
1081 public Object callMethodOnNode(String serviceName, String methodName,
1082 Object[] args, Class[] types, long methodTimeout, ClusterNode targetNode) throws Throwable
1083 {
1084 if (!(targetNode instanceof ClusterNodeImpl))
1085 {
1086 throw new IllegalArgumentException("targetNode " + targetNode + " is not an instance of " +
1087 ClusterNodeImpl.class + " -- only targetNodes provided by this HAPartition should be used");
1088 }
1089 boolean trace = this.log.isTraceEnabled();
1090
1091 MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
1092
1093 if( trace )
1094 {
1095 this.log.trace("callMethodOnNode( objName="+serviceName
1096 +", methodName="+methodName);
1097 }
1098 Object rc = this.dispatcher.callRemoteMethod(((ClusterNodeImpl)targetNode).getOriginalJGAddress(), m, GroupRequest.GET_FIRST, methodTimeout);
1099 if (rc != null)
1100 {
1101 Object item = rc;
1102 if (item instanceof Rsp)
1103 {
1104 Rsp response = (Rsp) item;
1105 // Only include received responses
1106 boolean wasReceived = response.wasReceived();
1107 if( wasReceived == true )
1108 {
1109 item = response.getValue();
1110 if (!(item instanceof NoHandlerForRPC))
1111 {
1112 rc = item;
1113 }
1114 }
1115 else if( trace )
1116 {
1117 this.log.trace("Ignoring non-received response: "+response);
1118 }
1119 }
1120 else
1121 {
1122 if (!(item instanceof NoHandlerForRPC))
1123 {
1124 rc = item;
1125 }
1126 else if( trace )
1127 {
1128 this.log.trace("Ignoring NoHandlerForRPC");
1129 }
1130 }
1131 }
1132 return rc;
1133 }
1134
1135
1136 /**
1137 * Calls method on target node only.
1138 * @param serviceName Name of the target service name on which calls are de-multiplexed
1139 * @param methodName name of the Java method to be called on remote services
1140 * @param args array of Java Object representing the set of parameters to be
1141 * given to the remote method
1142 * @param types The types of the parameters
1143 * node of the partition or only on remote nodes
1144 * @param targetNode is the target of the call
1145 * @return none
1146 * @throws Exception Throws if a communication exception occurs
1147 */
1148 public void callAsyncMethodOnNode(String serviceName, String methodName,
1149 Object[] args, Class[] types, long methodTimeout, ClusterNode targetNode) throws Throwable
1150 {
1151 if (!(targetNode instanceof ClusterNodeImpl))
1152 {
1153 throw new IllegalArgumentException("targetNode " + targetNode + " is not an instance of " +
1154 ClusterNodeImpl.class + " -- only targetNodes provided by this HAPartition should be used");
1155 }
1156 boolean trace = this.log.isTraceEnabled();
1157
1158 MethodCall m = new MethodCall(serviceName + "." + methodName, args, types);
1159
1160 if( trace )
1161 {
1162 this.log.trace("callAsyncMethodOnNode( objName="+serviceName
1163 +", methodName="+methodName);
1164 }
1165 this.dispatcher.callRemoteMethod(((ClusterNodeImpl)targetNode).getOriginalJGAddress(), m, GroupRequest.GET_NONE, methodTimeout);
1166 }
1167
1168 private ArrayList processResponseList(RspList rsp, boolean trace)
1169 {
1170 ArrayList rtn = new ArrayList();
1171 if (rsp != null)
1172 {
1173 for (Object item : rsp.values())
1174 {
1175 if (item instanceof Rsp)
1176 {
1177 Rsp response = (Rsp) item;
1178 // Only include received responses
1179 boolean wasReceived = response.wasReceived();
1180 if( wasReceived == true )
1181 {
1182 item = response.getValue();
1183 if (!(item instanceof NoHandlerForRPC))
1184 {
1185 rtn.add(item);
1186 }
1187 }
1188 else if( trace )
1189 {
1190 this.log.trace("Ignoring non-received response: "+response);
1191 }
1192 }
1193 else
1194 {
1195 if (!(item instanceof NoHandlerForRPC))
1196 {
1197 rtn.add(item);
1198 }
1199 else if( trace )
1200 {
1201 this.log.trace("Ignoring NoHandlerForRPC");
1202 }
1203 }
1204 }
1205
1206 }
1207 return rtn;
1208 }
1209
1210 /**
1211 * This function is an abstraction of RpcDispatcher for asynchronous messages
1212 */
1213 public void callAsynchMethodOnCluster(String objName, String methodName,
1214 Object[] args, Class[] types, boolean excludeSelf) throws Exception
1215 {
1216 boolean trace = this.log.isTraceEnabled();
1217
1218 MethodCall m = new MethodCall(objName + "." + methodName, args, types);
1219
1220 if(this.channel.flushSupported())
1221 {
1222 this.flushBlockGate.await(this.getStateTransferTimeout());
1223 }
1224 if (excludeSelf)
1225 {
1226 if( trace )
1227 {
1228 this.log.trace("callAsynchMethodOnCluster(true), objName="+objName
1229 +", methodName="+methodName+", members="+this.jgotherMembers);
1230 }
1231 this.dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_NONE, this.getMethodCallTimeout());
1232 }
1233 else
1234 {
1235 if( trace )
1236 {
1237 this.log.trace("callAsynchMethodOnCluster(false), objName="+objName
1238 +", methodName="+methodName+", members="+this.members);
1239 }
1240 this.dispatcher.callRemoteMethods(null, m, GroupRequest.GET_NONE, this.getMethodCallTimeout());
1241 }
1242 }
1243
1244 // *************************
1245 // *************************
1246 // State transfer management
1247 // *************************
1248 // *************************
1249 //
1250 public void subscribeToStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
1251 {
1252 this.stateHandlers.put(objectName, subscriber);
1253 }
1254
1255 public void unsubscribeFromStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
1256 {
1257 this.stateHandlers.remove(objectName);
1258 }
1259
1260 // *************************
1261 // *************************
1262 // Group Membership listeners
1263 // *************************
1264 // *************************
1265 //
1266 public void registerMembershipListener(HAMembershipListener listener)
1267 {
1268 boolean isAsynch = (this.allowSyncListeners == false)
1269 || (listener instanceof AsynchHAMembershipListener)
1270 || (listener instanceof AsynchHAMembershipExtendedListener);
1271 if( isAsynch ) {
1272 synchronized(this.asynchListeners) {
1273 this.asynchListeners.add(listener);
1274 }
1275 }
1276 else {
1277 synchronized(this.synchListeners) {
1278 this.synchListeners.add(listener);
1279 }
1280 }
1281 }
1282
1283 public void unregisterMembershipListener(HAMembershipListener listener)
1284 {
1285 boolean isAsynch = (this.allowSyncListeners == false)
1286 || (listener instanceof AsynchHAMembershipListener)
1287 || (listener instanceof AsynchHAMembershipExtendedListener);
1288 if( isAsynch ) {
1289 synchronized(this.asynchListeners) {
1290 this.asynchListeners.remove(listener);
1291 }
1292 }
1293 else {
1294 synchronized(this.synchListeners) {
1295 this.synchListeners.remove(listener);
1296 }
1297 }
1298 }
1299
1300 public boolean getAllowSynchronousMembershipNotifications()
1301 {
1302 return this.allowSyncListeners;
1303 }
1304
1305 public void setAllowSynchronousMembershipNotifications(boolean allowSync)
1306 {
1307 this.allowSyncListeners = allowSync;
1308 }
1309
1310 // AsynchEventHandler.AsynchEventProcessor -----------------------
1311
1312 public void processEvent(Object event)
1313 {
1314 ViewChangeEvent vce = (ViewChangeEvent) event;
1315 this.notifyListeners(this.asynchListeners, vce.viewId, vce.allMembers,
1316 vce.deadMembers, vce.newMembers, vce.originatingGroups);
1317
1318 }
1319
1320
1321 // Public ------------------------------------------------------------------
1322
1323 public void setDistributedStateImpl(DistributedStateImpl distributedState)
1324 {
1325 this.distributedState = distributedState;
1326 }
1327
1328 public void setDistributedReplicantManagerImpl(DistributedReplicantManagerImpl drm)
1329 {
1330 if (this.replicantManager != null && !(this.replicantManager == drm))
1331 {
1332 throw new IllegalStateException("DistributedReplicantManager already set");
1333 }
1334
1335 this.replicantManager = drm;
1336 if (this.replicantManager != null)
1337 {
1338 this.replicantManager.setHAPartition(this);
1339 }
1340 }
1341
1342
1343 // Protected -----------------------------------------------------
1344
1345 protected void verifyNodeIsUnique () throws IllegalStateException
1346 {
1347 ClusterNodeImpl matched = null;
1348 for (ClusterNode member : this.getClusterNodes())
1349 {
1350 if (member.equals(this.me))
1351 {
1352 if (matched == null)
1353 {
1354 // We of course are in the view, so we expect one match
1355 // Just track that we've had one
1356 matched = (ClusterNodeImpl) member;
1357 }
1358 else
1359 {
1360 // Two nodes in view match us; try to figure out which one isn't us
1361 ClusterNodeImpl other = matched;
1362 if (other.getOriginalJGAddress().equals(((ClusterNodeImpl)this.me).getOriginalJGAddress()))
1363 {
1364 other = (ClusterNodeImpl) member;
1365 }
1366 throw new IllegalStateException("Found member " + other +
1367 " in current view that duplicates us (" + this.me + "). This" +
1368 " node cannot join partition until duplicate member has" +
1369 " been removed");
1370 }
1371 }
1372 }
1373 }
1374
1375 /**
1376 * Helper method that binds the partition in the JNDI tree.
1377 * @param jndiName Name under which the object must be bound
1378 * @param who Object to bind in JNDI
1379 * @param classType Class type under which should appear the bound object
1380 * @param ctx Naming context under which we bind the object
1381 * @throws Exception Thrown if a naming exception occurs during binding
1382 */
1383 protected void bind(String jndiName, Object who, Class classType, Context ctx) throws Exception
1384 {
1385 // Ah ! This service isn't serializable, so we use a helper class
1386 //
1387 NonSerializableFactory.bind(jndiName, who);
1388 Name n = ctx.getNameParser("").parse(jndiName);
1389 while (n.size () > 1)
1390 {
1391 String ctxName = n.get (0);
1392 try
1393 {
1394 ctx = (Context)ctx.lookup (ctxName);
1395 }
1396 catch (NameNotFoundException e)
1397 {
1398 this.log.debug ("creating Subcontext " + ctxName);
1399 ctx = ctx.createSubcontext (ctxName);
1400 }
1401 n = n.getSuffix (1);
1402 }
1403
1404 // The helper class NonSerializableFactory uses address type nns, we go on to
1405 // use the helper class to bind the service object in JNDI
1406 //
1407 StringRefAddr addr = new StringRefAddr("nns", jndiName);
1408 Reference ref = new Reference(classType.getName (), addr, NonSerializableFactory.class.getName (), null);
1409 ctx.rebind (n.get (0), ref);
1410 }
1411
1412 /**
1413 * Helper method that returns a vector of dead members from two input vectors: new and old vectors of two views.
1414 * Dead members are old - new members.
1415 * @param oldMembers Vector of old members
1416 * @param newMembers Vector of new members
1417 * @return Vector of members that have died between the two views, can be empty.
1418 */
1419 protected Vector<ClusterNode> getDeadMembers(Vector<ClusterNode> oldMembers, Vector<ClusterNode> newMembers)
1420 {
1421 if(oldMembers == null)
1422 {
1423 oldMembers=new Vector<ClusterNode>();
1424 }
1425 if(newMembers == null)
1426 {
1427 newMembers=new Vector<ClusterNode>();
1428 }
1429 Vector<ClusterNode> dead=(Vector)oldMembers.clone();
1430 dead.removeAll(newMembers);
1431 this.log.debug("dead members: " + dead);
1432 return dead;
1433 }
1434
1435 /**
1436 * Helper method that returns a vector of new members from two input vectors: new and old vectors of two views.
1437 * @param oldMembers Vector of old members
1438 * @param allMembers Vector of new members
1439 * @return Vector of members that have joined the partition between the two views
1440 */
1441 protected Vector<ClusterNode> getNewMembers(Vector<ClusterNode> oldMembers, Vector<ClusterNode> allMembers)
1442 {
1443 if(oldMembers == null)
1444 {
1445 oldMembers=new Vector<ClusterNode>();
1446 }
1447 if(allMembers == null)
1448 {
1449 allMembers=new Vector<ClusterNode>();
1450 }
1451 Vector<ClusterNode> newMembers=(Vector)allMembers.clone();
1452 newMembers.removeAll(oldMembers);
1453 return newMembers;
1454 }
1455
1456 protected void notifyListeners(ArrayList<HAMembershipListener> theListeners, long viewID,
1457 Vector<ClusterNode> allMembers, Vector<ClusterNode> deadMembers, Vector<ClusterNode> newMembers,
1458 Vector<View> originatingGroups)
1459 {
1460 this.log.debug("Begin notifyListeners, viewID: "+viewID);
1461 synchronized(theListeners)
1462 {
1463 // JBAS-3619 -- don't hold synch lock while notifying
1464 theListeners = (ArrayList) theListeners.clone();
1465 }
1466
1467 for (int i = 0; i < theListeners.size(); i++)
1468 {
1469 HAMembershipListener aListener = null;
1470 try
1471 {
1472 aListener = theListeners.get(i);
1473 if(originatingGroups != null && (aListener instanceof HAMembershipExtendedListener))
1474 {
1475 HAMembershipExtendedListener exListener = (HAMembershipExtendedListener) aListener;
1476 exListener.membershipChangedDuringMerge (deadMembers, newMembers,
1477 allMembers, originatingGroups);
1478 }
1479 else
1480 {
1481 aListener.membershipChanged(deadMembers, newMembers, allMembers);
1482 }
1483 }
1484 catch (Throwable e)
1485 {
1486 // a problem in a listener should not prevent other members to receive the new view
1487 this.log.warn("HAMembershipListener callback failure: "+aListener, e);
1488 }
1489 }
1490
1491 this.log.debug("End notifyListeners, viewID: "+viewID);
1492 }
1493
1494 /*
1495 * Allows caller to specify whether the partition instance should be bound into JNDI. Default value is true.
1496 * This method must be called before the partition is started as the binding occurs during startup.
1497 *
1498 * @param bind Whether to bind the partition into JNDI.
1499 */
1500 public void setBindIntoJndi(boolean bind)
1501 {
1502 this.bindIntoJndi = bind;
1503 }
1504
1505 /*
1506 * Allows caller to determine whether the partition instance has been bound into JNDI.
1507 *
1508 * @return true if the partition has been bound into JNDI.
1509 */
1510 public boolean getBindIntoJndi()
1511 {
1512 return this.bindIntoJndi;
1513 }
1514
1515 public ThreadPool getThreadPool()
1516 {
1517 return this.threadPool;
1518 }
1519
1520 public void setThreadPool(ThreadPool threadPool)
1521 {
1522 this.threadPool = threadPool;
1523 }
1524
1525 protected Vector<ClusterNode> translateAddresses(Vector<Address> addresses)
1526 {
1527 if (addresses == null)
1528 {
1529 return null;
1530 }
1531
1532 Vector<ClusterNode> result = new Vector<ClusterNode>(addresses.size());
1533 for (Address address: addresses)
1534 {
1535 result.add(new ClusterNodeImpl((IpAddress) address));
1536 }
1537
1538 return result;
1539 }
1540
1541 public void logHistory (String message)
1542 {
1543 try
1544 {
1545 this.history.add(new SimpleDateFormat().format (new Date()) + " : " + message);
1546 }
1547 catch (Exception ignored){}
1548 }
1549
1550 // --------------------------------------------------- ClusterPartitionMBean
1551
1552 public String showHistory ()
1553 {
1554 StringBuffer buff = new StringBuffer();
1555 Vector<String> data = new Vector<String>(this.history);
1556 for (java.util.Iterator<String> row = data.iterator(); row.hasNext();)
1557 {
1558 String info = row.next();
1559 buff.append(info).append("\n");
1560 }
1561 return buff.toString();
1562 }
1563
1564 public String showHistoryAsXML ()
1565 {
1566 StringBuffer buff = new StringBuffer();
1567 buff.append("<events>\n");
1568 Vector<String> data = new Vector<String>(this.history);
1569 for (java.util.Iterator<String> row = data.iterator(); row.hasNext();)
1570 {
1571 buff.append(" <event>\n ");
1572 String info = row.next();
1573 buff.append(info);
1574 buff.append("\n </event>\n");
1575 }
1576 buff.append("</events>\n");
1577 return buff.toString();
1578 }
1579
1580 public Cache getClusteredCache()
1581 {
1582 return this.cache;
1583 }
1584
1585 public boolean getDeadlockDetection()
1586 {
1587 return this.deadlock_detection;
1588 }
1589
1590 public void setDeadlockDetection(boolean doit)
1591 {
1592 this.deadlock_detection = doit;
1593 }
1594
1595 public HAPartition getHAPartition()
1596 {
1597 return this;
1598 }
1599
1600 public String getJGroupsVersion()
1601 {
1602 return Version.description + "( " + Version.cvs + ")";
1603 }
1604
1605 public ChannelFactory getChannelFactory()
1606 {
1607 return this.channelFactory;
1608 }
1609
1610 public CacheManager getCacheManager()
1611 {
1612 return this.cacheManager;
1613 }
1614
1615 public void setCacheManager(CacheManager cacheManager)
1616 {
1617 this.cacheManager = cacheManager;
1618 }
1619
1620 public String getCacheConfigName()
1621 {
1622 return this.cacheConfigName;
1623 }
1624
1625 public void setCacheConfigName(String cacheConfigName)
1626 {
1627 this.cacheConfigName = cacheConfigName;
1628 }
1629
1630 public String getChannelStackName()
1631 {
1632 return this.stackName;
1633 }
1634
1635 public InetAddress getNodeAddress()
1636 {
1637 return this.nodeAddress;
1638 }
1639
1640 public void setNodeAddress(InetAddress address)
1641 {
1642 this.nodeAddress = address;
1643 }
1644
1645 public long getStateTransferTimeout() {
1646 return this.state_transfer_timeout;
1647 }
1648
1649 public void setStateTransferTimeout(long timeout)
1650 {
1651 this.state_transfer_timeout = timeout;
1652 }
1653
1654 public long getMethodCallTimeout() {
1655 return this.method_call_timeout;
1656 }
1657
1658 public void setMethodCallTimeout(long timeout)
1659 {
1660 this.method_call_timeout = timeout;
1661 }
1662
1663 // Protected --------------------------------------------------------------
1664
1665 /**
1666 * Creates an object from a byte buffer
1667 */
1668 protected Object objectFromByteBufferInternal (byte[] buffer) throws Exception
1669 {
1670 if(buffer == null)
1671 {
1672 return null;
1673 }
1674
1675 ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
1676 MarshalledValueInputStream mvis = new MarshalledValueInputStream(bais);
1677 return mvis.readObject();
1678 }
1679
1680 /**
1681 * Serializes an object into a byte buffer.
1682 * The object has to implement interface Serializable or Externalizable
1683 */
1684 protected byte[] objectToByteBufferInternal (Object obj) throws Exception
1685 {
1686 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1687 MarshalledValueOutputStream mvos = new MarshalledValueOutputStream(baos);
1688 mvos.writeObject(obj);
1689 mvos.flush();
1690 return baos.toByteArray();
1691 }
1692
1693 /**
1694 * Creates a response object from a byte buffer - optimized for response marshalling
1695 */
1696 protected Object objectFromByteBufferResponseInternal (byte[] buffer) throws Exception
1697 {
1698 if(buffer == null)
1699 {
1700 return null;
1701 }
1702
1703 if (buffer[0] == NULL_VALUE)
1704 {
1705 return null;
1706 }
1707
1708 ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
1709 // read past the null/serializable byte
1710 bais.read();
1711 MarshalledValueInputStream mvis = new MarshalledValueInputStream(bais);
1712 return mvis.readObject();
1713 }
1714
1715 /**
1716 * Serializes a response object into a byte buffer, optimized for response marshalling.
1717 * The object has to implement interface Serializable or Externalizable
1718 */
1719 protected byte[] objectToByteBufferResponseInternal (Object obj) throws Exception
1720 {
1721 if (obj == null)
1722 {
1723 return new byte[]{NULL_VALUE};
1724 }
1725
1726 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1727 // write a marker to stream to distinguish from null value stream
1728 baos.write(SERIALIZABLE_VALUE);
1729 MarshalledValueOutputStream mvos = new MarshalledValueOutputStream(baos);
1730 mvos.writeObject(obj);
1731 mvos.flush();
1732 return baos.toByteArray();
1733 }
1734
1735 // Private -------------------------------------------------------
1736
1737 // Inner classes -------------------------------------------------
1738
1739 private class MessageListenerAdapter
1740 implements ExtendedMessageListener
1741 {
1742
1743 public void getState(OutputStream stream)
1744 {
1745 ClusterPartition.this.logHistory ("getState called on partition");
1746
1747 ClusterPartition.this.log.debug("getState called.");
1748 try
1749 {
1750 ClusterPartition.this.getStateInternal(stream);
1751 }
1752 catch (Exception ex)
1753 {
1754 ClusterPartition.this.log.error("getState failed", ex);
1755 }
1756
1757 }
1758
1759 public void getState(String state_id, OutputStream ostream)
1760 {
1761 throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
1762 }
1763
1764 public byte[] getState(String state_id)
1765 {
1766 throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
1767 }
1768
1769 public void setState(InputStream stream)
1770 {
1771 ClusterPartition.this.logHistory ("setState called on partition");
1772 try
1773 {
1774 if (stream == null)
1775 {
1776 ClusterPartition.this.log.debug("transferred serviceState is null (may be first member in cluster)");
1777 }
1778 else
1779 {
1780 ClusterPartition.this.setStateInternal(stream);
1781 }
1782
1783 ClusterPartition.this.isStateSet = true;
1784 }
1785 catch (Throwable t)
1786 {
1787 ClusterPartition.this.recordSetStateFailure(t);
1788 }
1789 finally
1790 {
1791 // Notify waiting thread that serviceState has been set.
1792 ClusterPartition.this.notifyChannelLock();
1793 }
1794 }
1795
1796 public byte[] getState()
1797 {
1798 ClusterPartition.this.logHistory ("getState called on partition");
1799
1800 ClusterPartition.this.log.debug("getState called.");
1801 try
1802 {
1803 ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
1804 ClusterPartition.this.getStateInternal(baos);
1805 return baos.toByteArray();
1806 }
1807 catch (Exception ex)
1808 {
1809 ClusterPartition.this.log.error("getState failed", ex);
1810 }
1811 return null; // This will cause the receiver to get a "false" on the channel.getState() call
1812 }
1813
1814 public void setState(String state_id, byte[] state)
1815 {
1816 throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
1817 }
1818
1819 public void setState(String state_id, InputStream istream)
1820 {
1821 throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
1822 }
1823
1824 public void receive(org.jgroups.Message msg)
1825 { /* complete */}
1826
1827 public void setState(byte[] obj)
1828 {
1829 ClusterPartition.this.logHistory ("setState called on partition");
1830 try
1831 {
1832 if (obj == null)
1833 {
1834 ClusterPartition.this.log.debug("transferred serviceState is null (may be first member in cluster)");
1835 }
1836 else
1837 {
1838 ByteArrayInputStream bais = new ByteArrayInputStream(obj);
1839 ClusterPartition.this.setStateInternal(bais);
1840 bais.close();
1841 }
1842
1843 ClusterPartition.this.isStateSet = true;
1844 }
1845 catch (Throwable t)
1846 {
1847 ClusterPartition.this.recordSetStateFailure(t);
1848 }
1849 finally
1850 {
1851 // Notify waiting thread that serviceState has been set.
1852 ClusterPartition.this.notifyChannelLock();
1853 }
1854 }
1855
1856 }
1857
1858 /**
1859 * A simple data class containing the view change event needed to
1860 * notify the HAMembershipListeners
1861 */
1862 private static class ViewChangeEvent
1863 {
1864 long viewId;
1865 Vector<ClusterNode> deadMembers;
1866 Vector<ClusterNode> newMembers;
1867 Vector<ClusterNode> allMembers;
1868 Vector<View> originatingGroups;
1869 }
1870
1871 private class RequestMarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
1872 {
1873
1874 public Object objectFromByteBuffer(byte[] buf) throws Exception
1875 {
1876 return ClusterPartition.this.objectFromByteBufferInternal(buf);
1877 }
1878
1879 public byte[] objectToByteBuffer(Object obj) throws Exception
1880 {
1881 // wrap MethodCall in Object[service_name, byte[]] so that service name is available during demarshalling
1882 if (obj instanceof MethodCall)
1883 {
1884 String name = ((MethodCall)obj).getName();
1885 int idx = name.lastIndexOf('.');
1886 String serviceName = name.substring(0, idx);
1887 return ClusterPartition.this.objectToByteBufferInternal(new Object[]{serviceName, ClusterPartition.this.objectToByteBufferInternal(obj)});
1888 }
1889
1890 return ClusterPartition.this.objectToByteBufferInternal(obj);
1891 }
1892 }
1893
1894 private class ResponseMarshallerImpl implements org.jgroups.blocks.RpcDispatcher.Marshaller
1895 {
1896
1897 public Object objectFromByteBuffer(byte[] buf) throws Exception
1898 {
1899 boolean trace = ClusterPartition.this.log.isTraceEnabled();
1900 Object retval = ClusterPartition.this.objectFromByteBufferResponseInternal(buf);
1901 // HAServiceResponse is only received when a scoped classloader is required for unmarshalling
1902 if (!(retval instanceof HAServiceResponse))
1903 {
1904 return retval;
1905 }
1906
1907 String serviceName = ((HAServiceResponse)retval).getServiceName();
1908 byte[] payload = ((HAServiceResponse)retval).getPayload();
1909
1910 ClassLoader previousCL = null;
1911 boolean overrideCL = false;
1912 try
1913 {
1914 WeakReference<ClassLoader> weak = ClusterPartition.this.clmap.get(serviceName);
1915 if (weak != null) // this should always be true since we only use HAServiceResponse when classloader is specified
1916 {
1917 previousCL = Thread.currentThread().getContextClassLoader();
1918 ClassLoader loader = weak.get();
1919 if( trace )
1920 {
1921 ClusterPartition.this.log.trace("overriding response Thread ContextClassLoader for service " + serviceName);
1922 }
1923 overrideCL = true;
1924 Thread.currentThread().setContextClassLoader(loader);
1925 }
1926 retval = ClusterPartition.this.objectFromByteBufferResponseInternal(payload);
1927
1928 return retval;
1929 }
1930 finally
1931 {
1932 if (overrideCL == true)
1933 {
1934 ClusterPartition.this.log.trace("resetting response classloader");
1935 Thread.currentThread().setContextClassLoader(previousCL);
1936 }
1937 }
1938 }
1939
1940 public byte[] objectToByteBuffer(Object obj) throws Exception
1941 {
1942 return ClusterPartition.this.objectToByteBufferResponseInternal(obj);
1943 }
1944 }
1945
1946 /**
1947 * Overrides RpcDispatcher.Handle so that we can dispatch to many
1948 * different objects.
1949 */
1950 private class RpcHandler extends RpcDispatcher
1951 {
1952 private RpcHandler(Channel channel, MessageListener l, MembershipListener l2, Object server_obj,
1953 boolean deadlock_detection)
1954 {
1955 super(channel, l, l2, server_obj, deadlock_detection);
1956 }
1957
1958 /**
1959 * Analyze the MethodCall contained in <code>req</code> to find the
1960 * registered service object to invoke against, and then execute it
1961 * against *that* object and return result.
1962 *
1963 * This overrides RpcDispatcher.Handle so that we can dispatch to many different objects.
1964 * @param req The org.jgroups. representation of the method invocation
1965 * @return The serializable return value from the invocation
1966 */
1967 public Object handle(Message req)
1968 {
1969 Object body = null;
1970 Object retval = null;
1971 Object handler = null;
1972 boolean trace = this.log.isTraceEnabled();
1973 boolean overrideCL = false;
1974 ClassLoader previousCL = null;
1975 String service = null;
1976 byte[] request_bytes = null;
1977
1978 if( trace )
1979 {
1980 this.log.trace("Partition " + ClusterPartition.this.getPartitionName() + " received msg");
1981 }
1982 if(req == null || req.getBuffer() == null)
1983 {
1984 this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " message or message buffer is null!");
1985 return null;
1986 }
1987
1988 try
1989 {
1990 Object wrapper = ClusterPartition.this.objectFromByteBufferInternal(req.getBuffer());
1991 if(wrapper == null || !(wrapper instanceof Object[]))
1992 {
1993 this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " message wrapper does not contain Object[] object!");
1994 return null;
1995 }
1996
1997 // wrapper should be Object[]{service_name, byte[]}
1998 Object[] temp = (Object[])wrapper;
1999 service = (String)temp[0];
2000 request_bytes = (byte[])temp[1];
2001
2002 // see if this node has registered to handle this service
2003 handler = ClusterPartition.this.rpcHandlers.get(service);
2004 if (handler == null)
2005 {
2006 if( trace )
2007 {
2008 this.log.trace("Partition " + ClusterPartition.this.getPartitionName() + " no rpc handler registered under service " + service);
2009 }
2010 return new NoHandlerForRPC();
2011 }
2012 }
2013 catch(Exception e)
2014 {
2015 this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " failed unserializing message buffer (msg=" + req + ")", e);
2016 return null;
2017 }
2018
2019 try
2020 {
2021 // If client registered the service with a classloader, override the thread classloader here
2022 WeakReference<ClassLoader> weak = ClusterPartition.this.clmap.get(service);
2023 if (weak != null)
2024 {
2025 if( trace )
2026 {
2027 this.log.trace("overriding Thread ContextClassLoader for RPC service " + service);
2028 }
2029 previousCL = Thread.currentThread().getContextClassLoader();
2030 ClassLoader loader = weak.get();
2031 overrideCL = true;
2032 Thread.currentThread().setContextClassLoader(loader);
2033 }
2034 body = ClusterPartition.this.objectFromByteBufferInternal(request_bytes);
2035 }
2036 catch (Exception e)
2037 {
2038 this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " failed extracting message body from request bytes", e);
2039 return null;
2040 }
2041 finally
2042 {
2043 if (overrideCL)
2044 {
2045 this.log.trace("resetting Thread ContextClassLoader");
2046 Thread.currentThread().setContextClassLoader(previousCL);
2047 }
2048 }
2049
2050 if(body == null || !(body instanceof MethodCall))
2051 {
2052 this.log.warn("Partition " + ClusterPartition.this.getPartitionName() + " message does not contain a MethodCall object!");
2053 return null;
2054 }
2055
2056 // get method call information
2057 MethodCall method_call = (MethodCall)body;
2058 String methodName = method_call.getName();
2059
2060 if( trace )
2061 {
2062 this.log.trace("full methodName: " + methodName);
2063 }
2064
2065 int idx = methodName.lastIndexOf('.');
2066 String handlerName = methodName.substring(0, idx);
2067 String newMethodName = methodName.substring(idx + 1);
2068 if( trace )
2069 {
2070 this.log.trace("handlerName: " + handlerName + " methodName: " + newMethodName);
2071 this.log.trace("Handle: " + methodName);
2072 }
2073
2074 // prepare method call
2075 method_call.setName(newMethodName);
2076
2077 /* Invoke it and just return any exception with trace level logging of
2078 the exception. The exception semantics of a group rpc call are weak as
2079 the return value may be a normal return value or the exception thrown.
2080 */
2081 try
2082 {
2083 retval = method_call.invoke(handler);
2084 if (overrideCL)
2085 {
2086 // wrap the response so that the service name can be accessed during unmarshalling of the response
2087 byte[] retbytes = ClusterPartition.this.objectToByteBufferResponseInternal(retval);
2088 retval = new HAServiceResponse(handlerName, retbytes);
2089 }
2090 if( trace )
2091 {
2092 this.log.trace("rpc call return value: " + retval);
2093 }
2094 }
2095 catch (Throwable t)
2096 {
2097 if( trace )
2098 {
2099 this.log.trace("Partition " + ClusterPartition.this.getPartitionName() + " rpc call threw exception", t);
2100 }
2101 retval = t;
2102 }
2103
2104 return retval;
2105 }
2106
2107 }
2108 /**
2109 * Copyright (c) 2005 Brian Goetz and Tim Peierls
2110 * Released under the Creative Commons Attribution License
2111 * (http://creativecommons.org/licenses/by/2.5)
2112 * Official home: http://www.jcip.net
2113 *
2114 * ThreadGate <p/> Recloseable gate using wait and notifyAll
2115 *
2116 * @author Brian Goetz and Tim Peierls
2117 */
2118
2119 private static class ThreadGate {
2120 // CONDITION-PREDICATE: opened-since(n) (isOpen || generation>n)
2121 private boolean isOpen;
2122
2123 private int generation;
2124
2125 public synchronized void close()
2126 {
2127 this.isOpen = false;
2128 }
2129
2130 public synchronized void open()
2131 {
2132 ++this.generation;
2133 this.isOpen = true;
2134 this.notifyAll();
2135 }
2136
2137 // BLOCKS-UNTIL: opened-since(generation on entry)
2138 public synchronized void await() throws InterruptedException
2139 {
2140 int arrivalGeneration = this.generation;
2141 while(!this.isOpen && arrivalGeneration == this.generation)
2142 {
2143 this.wait();
2144 }
2145 }
2146
2147 // BLOCKS-UNTIL: opened-since(generation on entry)
2148 public synchronized void await(long timeout) throws InterruptedException
2149 {
2150 int arrivalGeneration = this.generation;
2151 while(!this.isOpen && arrivalGeneration == this.generation)
2152 {
2153 this.wait(timeout);
2154 }
2155 }
2156 }
2157
2158 private void setupLoggers(String partitionName)
2159 {
2160 if (partitionName == null)
2161 {
2162 this.log = Logger.getLogger(HAPartition.class.getName());
2163 this.clusterLifeCycleLog = Logger.getLogger(HAPartition.class.getName() + ".lifecycle");
2164 }
2165 else
2166 {
2167 this.log = Logger.getLogger(HAPartition.class.getName() + "." + partitionName);
2168 this.clusterLifeCycleLog = Logger.getLogger(HAPartition.class.getName() + ".lifecycle." + partitionName);
2169 }
2170 }
2171
2172 }