1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.catalina.tribes.group;
18
19
20 import java.io.Serializable;
21 import java.util.ArrayList;
22 import java.util.Iterator;
23
24 import org.apache.catalina.tribes.ByteMessage;
25 import org.apache.catalina.tribes.Channel;
26 import org.apache.catalina.tribes.ChannelException;
27 import org.apache.catalina.tribes.ChannelInterceptor;
28 import org.apache.catalina.tribes.ChannelListener;
29 import org.apache.catalina.tribes.ChannelMessage;
30 import org.apache.catalina.tribes.ChannelReceiver;
31 import org.apache.catalina.tribes.ChannelSender;
32 import org.apache.catalina.tribes.ErrorHandler;
33 import org.apache.catalina.tribes.ManagedChannel;
34 import org.apache.catalina.tribes.Member;
35 import org.apache.catalina.tribes.MembershipListener;
36 import org.apache.catalina.tribes.MembershipService;
37 import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
38 import org.apache.catalina.tribes.io.ChannelData;
39 import org.apache.catalina.tribes.io.XByteBuffer;
40 import org.apache.catalina.tribes.UniqueId;
41 import org.apache.catalina.tribes.Heartbeat;
42 import org.apache.catalina.tribes.io.BufferPool;
43 import org.apache.catalina.tribes.RemoteProcessException;
44 import org.apache.catalina.tribes.util.Logs;
45 import org.apache.catalina.tribes.util.Arrays;
46
47 /**
48 * The default implementation of a Channel.<br>
49 * The GroupChannel manages the replication channel. It coordinates
50 * message being sent and received with membership announcements.
51 * The channel has an chain of interceptors that can modify the message or perform other logic.<br>
52 * It manages a complete group, both membership and replication.
53 * @author Filip Hanik
54 * @version $Revision: 568742 $, $Date: 2007-08-22 22:19:54 +0200 (mer., 22 août 2007) $
55 */
56 public class GroupChannel extends ChannelInterceptorBase implements ManagedChannel {
57 /**
58 * Flag to determine if the channel manages its own heartbeat
59 * If set to true, the channel will start a local thread for the heart beat.
60 */
61 protected boolean heartbeat = true;
62 /**
63 * If <code>heartbeat == true</code> then how often do we want this
64 * heartbeat to run. default is one minute
65 */
66 protected long heartbeatSleeptime = 5*1000;//every 5 seconds
67
68 /**
69 * Internal heartbeat thread
70 */
71 protected HeartbeatThread hbthread = null;
72
73 /**
74 * The <code>ChannelCoordinator</code> coordinates the bottom layer components:<br>
75 * - MembershipService<br>
76 * - ChannelSender <br>
77 * - ChannelReceiver<br>
78 */
79 protected ChannelCoordinator coordinator = new ChannelCoordinator();
80
81 /**
82 * The first interceptor in the inteceptor stack.
83 * The interceptors are chained in a linked list, so we only need a reference to the
84 * first one
85 */
86 protected ChannelInterceptor interceptors = null;
87
88 /**
89 * A list of membership listeners that subscribe to membership announcements
90 */
91 protected ArrayList membershipListeners = new ArrayList();
92
93 /**
94 * A list of channel listeners that subscribe to incoming messages
95 */
96 protected ArrayList channelListeners = new ArrayList();
97
98 /**
99 * If set to true, the GroupChannel will check to make sure that
100 */
101 protected boolean optionCheck = false;
102
103 /**
104 * Creates a GroupChannel. This constructor will also
105 * add the first interceptor in the GroupChannel.<br>
106 * The first interceptor is always the channel itself.
107 */
108 public GroupChannel() {
109 addInterceptor(this);
110 }
111
112
113 /**
114 * Adds an interceptor to the stack for message processing<br>
115 * Interceptors are ordered in the way they are added.<br>
116 * <code>channel.addInterceptor(A);</code><br>
117 * <code>channel.addInterceptor(C);</code><br>
118 * <code>channel.addInterceptor(B);</code><br>
119 * Will result in a interceptor stack like this:<br>
120 * <code>A -> C -> B</code><br>
121 * The complete stack will look like this:<br>
122 * <code>Channel -> A -> C -> B -> ChannelCoordinator</code><br>
123 * @param interceptor ChannelInterceptorBase
124 */
125 public void addInterceptor(ChannelInterceptor interceptor) {
126 if ( interceptors == null ) {
127 interceptors = interceptor;
128 interceptors.setNext(coordinator);
129 interceptors.setPrevious(null);
130 coordinator.setPrevious(interceptors);
131 } else {
132 ChannelInterceptor last = interceptors;
133 while ( last.getNext() != coordinator ) {
134 last = last.getNext();
135 }
136 last.setNext(interceptor);
137 interceptor.setNext(coordinator);
138 interceptor.setPrevious(last);
139 coordinator.setPrevious(interceptor);
140 }
141 }
142
143 /**
144 * Sends a heartbeat through the interceptor stack.<br>
145 * Invoke this method from the application on a periodic basis if
146 * you have turned off internal heartbeats <code>channel.setHeartbeat(false)</code>
147 */
148 public void heartbeat() {
149 super.heartbeat();
150 Iterator i = membershipListeners.iterator();
151 while ( i.hasNext() ) {
152 Object o = i.next();
153 if ( o instanceof Heartbeat ) ((Heartbeat)o).heartbeat();
154 }
155 i = channelListeners.iterator();
156 while ( i.hasNext() ) {
157 Object o = i.next();
158 if ( o instanceof Heartbeat ) ((Heartbeat)o).heartbeat();
159 }
160
161 }
162
163
164 /**
165 * Send a message to the destinations specified
166 * @param destination Member[] - destination.length > 1
167 * @param msg Serializable - the message to send
168 * @param options int - sender options, options can trigger guarantee levels and different interceptors to
169 * react to the message see class documentation for the <code>Channel</code> object.<br>
170 * @return UniqueId - the unique Id that was assigned to this message
171 * @throws ChannelException - if an error occurs processing the message
172 * @see org.apache.catalina.tribes.Channel
173 */
174 public UniqueId send(Member[] destination, Serializable msg, int options) throws ChannelException {
175 return send(destination,msg,options,null);
176 }
177
178 /**
179 *
180 * @param destination Member[] - destination.length > 1
181 * @param msg Serializable - the message to send
182 * @param options int - sender options, options can trigger guarantee levels and different interceptors to
183 * react to the message see class documentation for the <code>Channel</code> object.<br>
184 * @param handler - callback object for error handling and completion notification, used when a message is
185 * sent asynchronously using the <code>Channel.SEND_OPTIONS_ASYNCHRONOUS</code> flag enabled.
186 * @return UniqueId - the unique Id that was assigned to this message
187 * @throws ChannelException - if an error occurs processing the message
188 * @see org.apache.catalina.tribes.Channel
189 */
190 public UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler) throws ChannelException {
191 if ( msg == null ) throw new ChannelException("Cant send a NULL message");
192 XByteBuffer buffer = null;
193 try {
194 if ( destination == null || destination.length == 0) throw new ChannelException("No destination given");
195 ChannelData data = new ChannelData(true);//generates a unique Id
196 data.setAddress(getLocalMember(false));
197 data.setTimestamp(System.currentTimeMillis());
198 byte[] b = null;
199 if ( msg instanceof ByteMessage ){
200 b = ((ByteMessage)msg).getMessage();
201 options = options | SEND_OPTIONS_BYTE_MESSAGE;
202 } else {
203 b = XByteBuffer.serialize(msg);
204 options = options & (~SEND_OPTIONS_BYTE_MESSAGE);
205 }
206 data.setOptions(options);
207 //XByteBuffer buffer = new XByteBuffer(b.length+128,false);
208 buffer = BufferPool.getBufferPool().getBuffer(b.length+128, false);
209 buffer.append(b,0,b.length);
210 data.setMessage(buffer);
211 InterceptorPayload payload = null;
212 if ( handler != null ) {
213 payload = new InterceptorPayload();
214 payload.setErrorHandler(handler);
215 }
216 getFirstInterceptor().sendMessage(destination, data, payload);
217 if ( Logs.MESSAGES.isTraceEnabled() ) {
218 Logs.MESSAGES.trace("GroupChannel - Sent msg:" + new UniqueId(data.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+Arrays.toNameString(destination));
219 Logs.MESSAGES.trace("GroupChannel - Send Message:" + new UniqueId(data.getUniqueId()) + " is " +msg);
220 }
221
222 return new UniqueId(data.getUniqueId());
223 }catch ( Exception x ) {
224 if ( x instanceof ChannelException ) throw (ChannelException)x;
225 throw new ChannelException(x);
226 } finally {
227 if ( buffer != null ) BufferPool.getBufferPool().returnBuffer(buffer);
228 }
229 }
230
231
232 /**
233 * Callback from the interceptor stack. <br>
234 * When a message is received from a remote node, this method will be invoked by
235 * the previous interceptor.<br>
236 * This method can also be used to send a message to other components within the same application,
237 * but its an extreme case, and you're probably better off doing that logic between the applications itself.
238 * @param msg ChannelMessage
239 */
240 public void messageReceived(ChannelMessage msg) {
241 if ( msg == null ) return;
242 try {
243 if ( Logs.MESSAGES.isTraceEnabled() ) {
244 Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " from "+msg.getAddress().getName());
245 }
246
247 Serializable fwd = null;
248 if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) {
249 fwd = new ByteMessage(msg.getMessage().getBytes());
250 } else {
251 try {
252 fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0, msg.getMessage().getLength());
253 }catch (Exception sx) {
254 log.error("Unable to deserialize message:"+msg,sx);
255 return;
256 }
257 }
258 if ( Logs.MESSAGES.isTraceEnabled() ) {
259 Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " +fwd);
260 }
261
262 //get the actual member with the correct alive time
263 Member source = msg.getAddress();
264 boolean rx = false;
265 boolean delivered = false;
266 for ( int i=0; i<channelListeners.size(); i++ ) {
267 ChannelListener channelListener = (ChannelListener)channelListeners.get(i);
268 if (channelListener != null && channelListener.accept(fwd, source)) {
269 channelListener.messageReceived(fwd, source);
270 delivered = true;
271 //if the message was accepted by an RPC channel, that channel
272 //is responsible for returning the reply, otherwise we send an absence reply
273 if ( channelListener instanceof RpcChannel ) rx = true;
274 }
275 }//for
276 if ((!rx) && (fwd instanceof RpcMessage)) {
277 //if we have a message that requires a response,
278 //but none was given, send back an immediate one
279 sendNoRpcChannelReply((RpcMessage)fwd,source);
280 }
281 if ( Logs.MESSAGES.isTraceEnabled() ) {
282 Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId()));
283 }
284
285 } catch ( Exception x ) {
286 //this could be the channel listener throwing an exception, we should log it
287 //as a warning.
288 if ( log.isWarnEnabled() ) log.warn("Error receiving message:",x);
289 throw new RemoteProcessException("Exception:"+x.getMessage(),x);
290 }
291 }
292
293 /**
294 * Sends a <code>NoRpcChannelReply</code> message to a member<br>
295 * This method gets invoked by the channel if a RPC message comes in
296 * and no channel listener accepts the message. This avoids timeout
297 * @param msg RpcMessage
298 * @param destination Member - the destination for the reply
299 */
300 protected void sendNoRpcChannelReply(RpcMessage msg, Member destination) {
301 try {
302 //avoid circular loop
303 if ( msg instanceof RpcMessage.NoRpcChannelReply) return;
304 RpcMessage.NoRpcChannelReply reply = new RpcMessage.NoRpcChannelReply(msg.rpcId,msg.uuid);
305 send(new Member[]{destination},reply,Channel.SEND_OPTIONS_ASYNCHRONOUS);
306 } catch ( Exception x ) {
307 log.error("Unable to find rpc channel, failed to send NoRpcChannelReply.",x);
308 }
309 }
310
311 /**
312 * memberAdded gets invoked by the interceptor below the channel
313 * and the channel will broadcast it to the membership listeners
314 * @param member Member - the new member
315 */
316 public void memberAdded(Member member) {
317 //notify upwards
318 for (int i=0; i<membershipListeners.size(); i++ ) {
319 MembershipListener membershipListener = (MembershipListener)membershipListeners.get(i);
320 if (membershipListener != null) membershipListener.memberAdded(member);
321 }
322 }
323
324 /**
325 * memberDisappeared gets invoked by the interceptor below the channel
326 * and the channel will broadcast it to the membership listeners
327 * @param member Member - the member that left or crashed
328 */
329 public void memberDisappeared(Member member) {
330 //notify upwards
331 for (int i=0; i<membershipListeners.size(); i++ ) {
332 MembershipListener membershipListener = (MembershipListener)membershipListeners.get(i);
333 if (membershipListener != null) membershipListener.memberDisappeared(member);
334 }
335 }
336
337 /**
338 * Sets up the default implementation interceptor stack
339 * if no interceptors have been added
340 * @throws ChannelException
341 */
342 protected synchronized void setupDefaultStack() throws ChannelException {
343
344 if ( getFirstInterceptor() != null &&
345 ((getFirstInterceptor().getNext() instanceof ChannelCoordinator))) {
346 ChannelInterceptor interceptor = null;
347 Class clazz = null;
348 try {
349 clazz = Class.forName("org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor",
350 true,GroupChannel.class.getClassLoader());
351 clazz.newInstance();
352 } catch ( Throwable x ) {
353 clazz = MessageDispatchInterceptor.class;
354 }//catch
355 try {
356 interceptor = (ChannelInterceptor) clazz.newInstance();
357 } catch (Exception x) {
358 throw new ChannelException("Unable to add MessageDispatchInterceptor to interceptor chain.",x);
359 }
360 this.addInterceptor(interceptor);
361 }
362 }
363
364 /**
365 * Validates the option flags that each interceptor is using and reports
366 * an error if two interceptor share the same flag.
367 * @throws ChannelException
368 */
369 protected void checkOptionFlags() throws ChannelException {
370 StringBuffer conflicts = new StringBuffer();
371 ChannelInterceptor first = interceptors;
372 while ( first != null ) {
373 int flag = first.getOptionFlag();
374 if ( flag != 0 ) {
375 ChannelInterceptor next = first.getNext();
376 while ( next != null ) {
377 int nflag = next.getOptionFlag();
378 if (nflag!=0 && (((flag & nflag) == flag ) || ((flag & nflag) == nflag)) ) {
379 conflicts.append("[");
380 conflicts.append(first.getClass().getName());
381 conflicts.append(":");
382 conflicts.append(flag);
383 conflicts.append(" == ");
384 conflicts.append(next.getClass().getName());
385 conflicts.append(":");
386 conflicts.append(nflag);
387 conflicts.append("] ");
388 }//end if
389 next = next.getNext();
390 }//while
391 }//end if
392 first = first.getNext();
393 }//while
394 if ( conflicts.length() > 0 ) throw new ChannelException("Interceptor option flag conflict: "+conflicts.toString());
395
396 }
397
398 /**
399 * Starts the channel
400 * @param svc int - what service to start
401 * @throws ChannelException
402 * @see org.apache.catalina.tribes.Channel#start(int)
403 */
404 public synchronized void start(int svc) throws ChannelException {
405 setupDefaultStack();
406 if (optionCheck) checkOptionFlags();
407 super.start(svc);
408 if ( hbthread == null && heartbeat ) {
409 hbthread = new HeartbeatThread(this,heartbeatSleeptime);
410 hbthread.start();
411 }
412 }
413
414 /**
415 * Stops the channel
416 * @param svc int
417 * @throws ChannelException
418 * @see org.apache.catalina.tribes.Channel#stop(int)
419 */
420 public synchronized void stop(int svc) throws ChannelException {
421 if (hbthread != null) {
422 hbthread.stopHeartbeat();
423 hbthread = null;
424 }
425 super.stop(svc);
426 }
427
428 /**
429 * Returns the first interceptor of the stack. Useful for traversal.
430 * @return ChannelInterceptor
431 */
432 public ChannelInterceptor getFirstInterceptor() {
433 if (interceptors != null) return interceptors;
434 else return coordinator;
435 }
436
437 /**
438 * Returns the channel receiver component
439 * @return ChannelReceiver
440 */
441 public ChannelReceiver getChannelReceiver() {
442 return coordinator.getClusterReceiver();
443 }
444
445 /**
446 * Returns the channel sender component
447 * @return ChannelSender
448 */
449 public ChannelSender getChannelSender() {
450 return coordinator.getClusterSender();
451 }
452
453 /**
454 * Returns the membership service component
455 * @return MembershipService
456 */
457 public MembershipService getMembershipService() {
458 return coordinator.getMembershipService();
459 }
460
461 /**
462 * Sets the channel receiver component
463 * @param clusterReceiver ChannelReceiver
464 */
465 public void setChannelReceiver(ChannelReceiver clusterReceiver) {
466 coordinator.setClusterReceiver(clusterReceiver);
467 }
468
469 /**
470 * Sets the channel sender component
471 * @param clusterSender ChannelSender
472 */
473 public void setChannelSender(ChannelSender clusterSender) {
474 coordinator.setClusterSender(clusterSender);
475 }
476
477 /**
478 * Sets the membership component
479 * @param membershipService MembershipService
480 */
481 public void setMembershipService(MembershipService membershipService) {
482 coordinator.setMembershipService(membershipService);
483 }
484
485 /**
486 * Adds a membership listener to the channel.<br>
487 * Membership listeners are uniquely identified using the equals(Object) method
488 * @param membershipListener MembershipListener
489 */
490 public void addMembershipListener(MembershipListener membershipListener) {
491 if (!this.membershipListeners.contains(membershipListener) )
492 this.membershipListeners.add(membershipListener);
493 }
494
495 /**
496 * Removes a membership listener from the channel.<br>
497 * Membership listeners are uniquely identified using the equals(Object) method
498 * @param membershipListener MembershipListener
499 */
500
501 public void removeMembershipListener(MembershipListener membershipListener) {
502 membershipListeners.remove(membershipListener);
503 }
504
505 /**
506 * Adds a channel listener to the channel.<br>
507 * Channel listeners are uniquely identified using the equals(Object) method
508 * @param channelListener ChannelListener
509 */
510 public void addChannelListener(ChannelListener channelListener) {
511 if (!this.channelListeners.contains(channelListener) ) {
512 this.channelListeners.add(channelListener);
513 } else {
514 throw new IllegalArgumentException("Listener already exists:"+channelListener+"["+channelListener.getClass().getName()+"]");
515 }
516 }
517
518 /**
519 *
520 * Removes a channel listener from the channel.<br>
521 * Channel listeners are uniquely identified using the equals(Object) method
522 * @param channelListener ChannelListener
523 */
524 public void removeChannelListener(ChannelListener channelListener) {
525 channelListeners.remove(channelListener);
526 }
527
528 /**
529 * Returns an iterator of all the interceptors in this stack
530 * @return Iterator
531 */
532 public Iterator getInterceptors() {
533 return new InterceptorIterator(this.getNext(),this.coordinator);
534 }
535
536 /**
537 * Enables/disables the option check<br>
538 * Setting this to true, will make the GroupChannel perform a conflict check
539 * on the interceptors. If two interceptors are using the same option flag
540 * and throw an error upon start.
541 * @param optionCheck boolean
542 */
543 public void setOptionCheck(boolean optionCheck) {
544 this.optionCheck = optionCheck;
545 }
546
547 /**
548 * Configure local heartbeat sleep time<br>
549 * Only used when <code>getHeartbeat()==true</code>
550 * @param heartbeatSleeptime long - time in milliseconds to sleep between heartbeats
551 */
552 public void setHeartbeatSleeptime(long heartbeatSleeptime) {
553 this.heartbeatSleeptime = heartbeatSleeptime;
554 }
555
556 /**
557 * Enables or disables local heartbeat.
558 * if <code>setHeartbeat(true)</code> is invoked then the channel will start an internal
559 * thread to invoke <code>Channel.heartbeat()</code> every <code>getHeartbeatSleeptime</code> milliseconds
560 * @param heartbeat boolean
561 */
562 public void setHeartbeat(boolean heartbeat) {
563 this.heartbeat = heartbeat;
564 }
565
566 /**
567 * @see #setOptionCheck(boolean)
568 * @return boolean
569 */
570 public boolean getOptionCheck() {
571 return optionCheck;
572 }
573
574 /**
575 * @see #setHeartbeat(boolean)
576 * @return boolean
577 */
578 public boolean getHeartbeat() {
579 return heartbeat;
580 }
581
582 /**
583 * Returns the sleep time in milliseconds that the internal heartbeat will
584 * sleep in between invokations of <code>Channel.heartbeat()</code>
585 * @return long
586 */
587 public long getHeartbeatSleeptime() {
588 return heartbeatSleeptime;
589 }
590
591 /**
592 *
593 * <p>Title: Interceptor Iterator</p>
594 *
595 * <p>Description: An iterator to loop through the interceptors in a channel</p>
596 *
597 * @version 1.0
598 */
599 public static class InterceptorIterator implements Iterator {
600 private ChannelInterceptor end;
601 private ChannelInterceptor start;
602 public InterceptorIterator(ChannelInterceptor start, ChannelInterceptor end) {
603 this.end = end;
604 this.start = start;
605 }
606
607 public boolean hasNext() {
608 return start!=null && start != end;
609 }
610
611 public Object next() {
612 Object result = null;
613 if ( hasNext() ) {
614 result = start;
615 start = start.getNext();
616 }
617 return result;
618 }
619
620 public void remove() {
621 //empty operation
622 }
623 }
624
625 /**
626 *
627 * <p>Title: Internal heartbeat thread</p>
628 *
629 * <p>Description: if <code>Channel.getHeartbeat()==true</code> then a thread of this class
630 * is created</p>
631 *
632 * @version 1.0
633 */
634 public static class HeartbeatThread extends Thread {
635 protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(HeartbeatThread.class);
636 protected static int counter = 1;
637 protected static synchronized int inc() {
638 return counter++;
639 }
640
641 protected boolean doRun = true;
642 protected GroupChannel channel;
643 protected long sleepTime;
644 public HeartbeatThread(GroupChannel channel, long sleepTime) {
645 super();
646 this.setPriority(MIN_PRIORITY);
647 setName("GroupChannel-Heartbeat-"+inc());
648 setDaemon(true);
649 this.channel = channel;
650 this.sleepTime = sleepTime;
651 }
652 public void stopHeartbeat() {
653 doRun = false;
654 interrupt();
655 }
656
657 public void run() {
658 while (doRun) {
659 try {
660 Thread.sleep(sleepTime);
661 channel.heartbeat();
662 } catch ( InterruptedException x ) {
663 interrupted();
664 } catch ( Exception x ) {
665 log.error("Unable to send heartbeat through Tribes interceptor stack. Will try to sleep again.",x);
666 }//catch
667 }//while
668 }//run
669 }//HeartbeatThread
670
671
672
673 }