Source code: org/jgroups/stack/ProtocolStack.java
1 // $Id: ProtocolStack.java,v 1.24 2005/09/29 12:10:03 belaban Exp $
2
3 package org.jgroups.stack;
4
5 import org.jgroups.*;
6 import org.jgroups.conf.ClassConfigurator;
7 import org.jgroups.util.Promise;
8 import org.jgroups.util.TimeScheduler;
9
10 import java.util.*;
11
12
13
14
15 /**
16 * A ProtocolStack manages a number of protocols layered above each other. It creates all
17 * protocol classes, initializes them and, when ready, starts all of them, beginning with the
18 * bottom most protocol. It also dispatches messages received from the stack to registered
19 * objects (e.g. channel, GMP) and sends messages sent by those objects down the stack.<p>
20 * The ProtocolStack makes use of the Configurator to setup and initialize stacks, and to
21 * destroy them again when not needed anymore
22 * @author Bela Ban
23 */
24 public class ProtocolStack extends Protocol implements Transport {
25 private Protocol top_prot=null;
26 private Protocol bottom_prot=null;
27 private final Configurator conf=new Configurator();
28 private String setup_string;
29 private JChannel channel=null;
30 private boolean stopped=true;
31 public final TimeScheduler timer=new TimeScheduler(60000);
32 // final Promise ack_promise=new Promise();
33
34 /** Used to sync on START/START_OK events for start()*/
35 Promise start_promise=null;
36
37 /** used to sync on STOP/STOP_OK events for stop() */
38 Promise stop_promise=null;
39
40 public static final int ABOVE=1; // used by insertProtocol()
41 public static final int BELOW=2; // used by insertProtocol()
42
43
44
45 public ProtocolStack(JChannel channel, String setup_string) throws ChannelException {
46 this.setup_string=setup_string;
47 this.channel=channel;
48 ClassConfigurator.getInstance(true); // will create the singleton
49 }
50
51 /** Only used by Simulator; don't use */
52 public ProtocolStack() {
53
54 }
55
56
57 public Channel getChannel() {
58 return channel;
59 }
60
61 /** Returns all protocols in a list, from top to bottom. <em>These are not copies of protocols,
62 so modifications will affect the actual instances !</em> */
63 public Vector getProtocols() {
64 Protocol p;
65 Vector v=new Vector();
66
67 p=top_prot;
68 while(p != null) {
69 v.addElement(p);
70 p=p.getDownProtocol();
71 }
72 return v;
73 }
74
75 /**
76 *
77 * @return Map<String,Map<key,val>>
78 */
79 public Map dumpStats() {
80 Protocol p;
81 Map retval=new HashMap(), tmp;
82 String prot_name;
83
84 p=top_prot;
85 while(p != null) {
86 prot_name=p.getName();
87 tmp=p.dumpStats();
88 if(prot_name != null && tmp != null)
89 retval.put(prot_name, tmp);
90 p=p.getDownProtocol();
91 }
92 return retval;
93 }
94
95 public String dumpTimerQueue() {
96 return timer.dumpTaskQueue();
97 }
98
99 /**
100 * Prints the names of the protocols, from the bottom to top. If include_properties is true,
101 * the properties for each protocol will also be printed.
102 */
103 public String printProtocolSpec(boolean include_properties) {
104 StringBuffer sb=new StringBuffer();
105 Protocol prot=top_prot;
106 Properties tmpProps;
107 String name;
108 Map.Entry entry;
109
110 while(prot != null) {
111 name=prot.getName();
112 if(name != null) {
113 if("ProtocolStack".equals(name))
114 break;
115 sb.append(name);
116 if(include_properties) {
117 tmpProps=prot.getProperties();
118 if(tmpProps != null) {
119 sb.append('\n');
120 for(Iterator it=tmpProps.entrySet().iterator(); it.hasNext();) {
121 entry=(Map.Entry)it.next();
122 sb.append(entry + "\n");
123 }
124 }
125 }
126 sb.append('\n');
127
128 prot=prot.getDownProtocol();
129 }
130 }
131
132 return sb.toString();
133 }
134
135 public String printProtocolSpecAsXML() {
136 StringBuffer sb=new StringBuffer();
137 Protocol prot=bottom_prot;
138 Properties tmpProps;
139 String name;
140 Map.Entry entry;
141 int len, max_len=30;
142
143 sb.append("<config>\n");
144 while(prot != null) {
145 name=prot.getName();
146 if(name != null) {
147 if("ProtocolStack".equals(name))
148 break;
149 sb.append(" <").append(name).append(" ");
150 tmpProps=prot.getProperties();
151 if(tmpProps != null) {
152 len=name.length();
153 String s;
154 for(Iterator it=tmpProps.entrySet().iterator(); it.hasNext();) {
155 entry=(Map.Entry)it.next();
156 s=entry.getKey() + "=\"" + entry.getValue() + "\" ";
157 if(len + s.length() > max_len) {
158 sb.append("\n ");
159 len=8;
160 }
161 sb.append(s);
162 len+=s.length();
163 }
164 }
165 sb.append("/>\n");
166 prot=prot.getUpProtocol();
167 }
168 }
169 sb.append("</config>");
170
171 return sb.toString();
172 }
173
174
175 public void setup() throws Exception {
176 if(top_prot == null) {
177 top_prot=conf.setupProtocolStack(setup_string, this); // calls init() on each protocol
178 if(top_prot == null)
179 throw new Exception("ProtocolStack.setup(): couldn't create protocol stack");
180 top_prot.setUpProtocol(this);
181 bottom_prot=conf.getBottommostProtocol(top_prot);
182 conf.startProtocolStack(bottom_prot); // sets up queues and threads
183 }
184 }
185
186
187
188
189 /**
190 * Creates a new protocol given the protocol specification.
191 * @param prot_spec The specification of the protocol. Same convention as for specifying a protocol stack.
192 * An exception will be thrown if the class cannot be created. Example:
193 * <pre>"VERIFY_SUSPECT(timeout=1500)"</pre> Note that no colons (:) have to be
194 * specified
195 * @return Protocol The newly created protocol
196 * @exception Exception Will be thrown when the new protocol cannot be created
197 */
198 public Protocol createProtocol(String prot_spec) throws Exception {
199 return conf.createProtocol(prot_spec, this);
200 }
201
202
203
204
205
206
207 /**
208 * Inserts an already created (and initialized) protocol into the protocol list. Sets the links
209 * to the protocols above and below correctly and adjusts the linked list of protocols accordingly.
210 * Note that this method may change the value of top_prot or bottom_prot.
211 * @param prot The protocol to be inserted. Before insertion, a sanity check will ensure that none
212 * of the existing protocols have the same name as the new protocol.
213 * @param position Where to place the protocol with respect to the neighbor_prot (ABOVE, BELOW)
214 * @param neighbor_prot The name of the neighbor protocol. An exception will be thrown if this name
215 * is not found
216 * @exception Exception Will be thrown when the new protocol cannot be created, or inserted.
217 */
218 public void insertProtocol(Protocol prot, int position, String neighbor_prot) throws Exception {
219 conf.insertProtocol(prot, position, neighbor_prot, this);
220 }
221
222
223
224
225
226 /**
227 * Removes a protocol from the stack. Stops the protocol and readjusts the linked lists of
228 * protocols.
229 * @param prot_name The name of the protocol. Since all protocol names in a stack have to be unique
230 * (otherwise the stack won't be created), the name refers to just 1 protocol.
231 * @exception Exception Thrown if the protocol cannot be stopped correctly.
232 */
233 public void removeProtocol(String prot_name) throws Exception {
234 conf.removeProtocol(prot_name);
235 }
236
237
238 /** Returns a given protocol or null if not found */
239 public Protocol findProtocol(String name) {
240 Protocol tmp=top_prot;
241 String prot_name;
242 while(tmp != null) {
243 prot_name=tmp.getName();
244 if(prot_name != null && prot_name.equals(name))
245 return tmp;
246 tmp=tmp.getDownProtocol();
247 }
248 return null;
249 }
250
251
252 public void destroy() {
253 if(top_prot != null) {
254 conf.stopProtocolStack(top_prot); // destroys msg queues and threads
255 top_prot=null;
256 }
257 }
258
259
260
261 /**
262 * Start all layers. The {@link Protocol#start()} method is called in each protocol,
263 * <em>from top to bottom</em>.
264 * Each layer can perform some initialization, e.g. create a multicast socket
265 */
266 public void startStack() throws Exception {
267 Object start_result=null;
268 if(stopped == false) return;
269
270 timer.start();
271
272 if(start_promise == null)
273 start_promise=new Promise();
274 else
275 start_promise.reset();
276
277 down(new Event(Event.START));
278 start_result=start_promise.getResult(0);
279 if(start_result != null && start_result instanceof Throwable) {
280 if(start_result instanceof Exception)
281 throw (Exception)start_result;
282 else
283 throw new Exception("ProtocolStack.start(): exception is " + start_result);
284 }
285
286 stopped=false;
287 }
288
289
290
291 public void startUpHandler() {
292 // DON'T REMOVE !!!! Avoids a superfluous thread
293 }
294
295 public void startDownHandler() {
296 // DON'T REMOVE !!!! Avoids a superfluous thread
297 }
298
299
300 /**
301 * Iterates through all the protocols <em>from top to bottom</em> and does the following:
302 * <ol>
303 * <li>Waits until all messages in the down queue have been flushed (ie., size is 0)
304 * <li>Calls stop() on the protocol
305 * </ol>
306 */
307 public void stopStack() {
308 if(timer != null) {
309 try {
310 timer.stop();
311 }
312 catch(Exception ex) {
313 }
314 }
315
316 if(stopped) return;
317
318 if(stop_promise == null)
319 stop_promise=new Promise();
320 else
321 stop_promise.reset();
322
323 down(new Event(Event.STOP));
324 stop_promise.getResult(5000);
325 stopped=true;
326 }
327
328 /**
329 * Not needed anymore, just left in here for backwards compatibility with JBoss AS
330 * @deprecated
331 */
332 public void flushEvents() {
333
334 }
335
336 public void stopInternal() {
337 // do nothing, DON'T REMOVE !!!!
338 }
339
340
341
342 /*--------------------------- Transport interface ------------------------------*/
343
344 public void send(Message msg) throws Exception {
345 down(new Event(Event.MSG, msg));
346 }
347
348 public Object receive(long timeout) throws Exception {
349 throw new Exception("ProtocolStack.receive(): not implemented !");
350 }
351 /*------------------------- End of Transport interface ---------------------------*/
352
353
354
355
356
357 /*--------------------------- Protocol functionality ------------------------------*/
358 public String getName() {return "ProtocolStack";}
359
360
361
362
363 public void up(Event evt) {
364 switch(evt.getType()) {
365 case Event.START_OK:
366 if(start_promise != null)
367 start_promise.setResult(evt.getArg());
368 return;
369 case Event.STOP_OK:
370 if(stop_promise != null)
371 stop_promise.setResult(evt.getArg());
372 return;
373 }
374
375 if(channel != null)
376 channel.up(evt);
377 }
378
379
380
381
382 public void down(Event evt) {
383 if(top_prot != null)
384 top_prot.receiveDownEvent(evt);
385 else
386 log.error("no down protocol available !");
387 }
388
389
390
391 protected void receiveUpEvent(Event evt) {
392 up(evt);
393 }
394
395
396
397 /** Override with null functionality: we don't need any threads to be started ! */
398 public void startWork() {}
399
400 /** Override with null functionality: we don't need any threads to be started ! */
401 public void stopWork() {}
402
403
404 /*----------------------- End of Protocol functionality ---------------------------*/
405
406
407
408
409 }