Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/jgroups/stack/ProtocolStack.java


1   // $Id: ProtocolStack.java,v 1.24 2005/09/29 12:10:03 belaban Exp $
2   
3   package org.jgroups.stack;
4   
5   import org.jgroups.*;
6   import org.jgroups.conf.ClassConfigurator;
7   import org.jgroups.util.Promise;
8   import org.jgroups.util.TimeScheduler;
9   
10  import java.util.*;
11  
12  
13  
14  
15  /**
16   * A ProtocolStack manages a number of protocols layered above each other. It creates all
17   * protocol classes, initializes them and, when ready, starts all of them, beginning with the
18   * bottom most protocol. It also dispatches messages received from the stack to registered
19   * objects (e.g. channel, GMP) and sends messages sent by those objects down the stack.<p>
20   * The ProtocolStack makes use of the Configurator to setup and initialize stacks, and to
21   * destroy them again when not needed anymore
22   * @author Bela Ban
23   */
24  public class ProtocolStack extends Protocol implements Transport {
25      private Protocol                top_prot=null;
26      private Protocol                bottom_prot=null;
27      private final Configurator      conf=new Configurator();
28      private String                  setup_string;
29      private JChannel                channel=null;
30      private boolean                 stopped=true;
31      public final  TimeScheduler     timer=new TimeScheduler(60000);
32      // final Promise                   ack_promise=new Promise();
33  
34      /** Used to sync on START/START_OK events for start()*/
35      Promise                         start_promise=null;
36  
37      /** used to sync on STOP/STOP_OK events for stop() */
38      Promise                         stop_promise=null;
39  
40      public static final int         ABOVE=1; // used by insertProtocol()
41      public static final int         BELOW=2; // used by insertProtocol()
42  
43  
44  
45      public ProtocolStack(JChannel channel, String setup_string) throws ChannelException {
46          this.setup_string=setup_string;
47          this.channel=channel;
48          ClassConfigurator.getInstance(true); // will create the singleton
49      }
50  
51      /** Only used by Simulator; don't use */
52      public ProtocolStack() {
53  
54      }
55  
56  
57      public Channel getChannel() {
58          return channel;
59      }
60  
61      /** Returns all protocols in a list, from top to bottom. <em>These are not copies of protocols,
62       so modifications will affect the actual instances !</em> */
63      public Vector getProtocols() {
64          Protocol p;
65          Vector   v=new Vector();
66  
67          p=top_prot;
68          while(p != null) {
69              v.addElement(p);
70              p=p.getDownProtocol();
71          }
72          return v;
73      }
74  
75      /**
76       *
77       * @return Map<String,Map<key,val>>
78       */
79      public Map dumpStats() {
80          Protocol p;
81          Map retval=new HashMap(), tmp;
82          String prot_name;
83  
84          p=top_prot;
85          while(p != null) {
86              prot_name=p.getName();
87              tmp=p.dumpStats();
88              if(prot_name != null && tmp != null)
89                  retval.put(prot_name, tmp);
90              p=p.getDownProtocol();
91          }
92          return retval;
93      }
94  
95      public String dumpTimerQueue() {
96          return timer.dumpTaskQueue();
97      }
98  
99      /**
100      * Prints the names of the protocols, from the bottom to top. If include_properties is true,
101      * the properties for each protocol will also be printed.
102      */
103     public String printProtocolSpec(boolean include_properties) {
104         StringBuffer sb=new StringBuffer();
105         Protocol     prot=top_prot;
106         Properties   tmpProps;
107         String       name;
108         Map.Entry    entry;
109 
110         while(prot != null) {
111             name=prot.getName();
112             if(name != null) {
113                 if("ProtocolStack".equals(name))
114                     break;
115                 sb.append(name);
116                 if(include_properties) {
117                     tmpProps=prot.getProperties();
118                     if(tmpProps != null) {
119                         sb.append('\n');
120                         for(Iterator it=tmpProps.entrySet().iterator(); it.hasNext();) {
121                             entry=(Map.Entry)it.next();
122                             sb.append(entry + "\n");
123                         }
124                     }
125                 }
126                 sb.append('\n');
127 
128                 prot=prot.getDownProtocol();
129             }
130         }
131 
132         return sb.toString();
133     }
134 
135     public String printProtocolSpecAsXML() {
136         StringBuffer sb=new StringBuffer();
137         Protocol     prot=bottom_prot;
138         Properties   tmpProps;
139         String       name;
140         Map.Entry    entry;
141         int len, max_len=30;
142 
143         sb.append("<config>\n");
144         while(prot != null) {
145             name=prot.getName();
146             if(name != null) {
147                 if("ProtocolStack".equals(name))
148                     break;
149                 sb.append("  <").append(name).append(" ");
150                 tmpProps=prot.getProperties();
151                 if(tmpProps != null) {
152                     len=name.length();
153                     String s;
154                     for(Iterator it=tmpProps.entrySet().iterator(); it.hasNext();) {
155                         entry=(Map.Entry)it.next();
156                         s=entry.getKey() + "=\"" + entry.getValue() + "\" ";
157                         if(len + s.length() > max_len) {
158                             sb.append("\n       ");
159                             len=8;
160                         }
161                         sb.append(s);
162                         len+=s.length();
163                     }
164                 }
165                 sb.append("/>\n");
166                 prot=prot.getUpProtocol();
167             }
168         }
169         sb.append("</config>");
170 
171         return sb.toString();
172     }
173 
174 
175     public void setup() throws Exception {
176         if(top_prot == null) {
177             top_prot=conf.setupProtocolStack(setup_string, this); // calls init() on each protocol
178             if(top_prot == null)
179                 throw new Exception("ProtocolStack.setup(): couldn't create protocol stack");
180             top_prot.setUpProtocol(this);
181             bottom_prot=conf.getBottommostProtocol(top_prot);
182             conf.startProtocolStack(bottom_prot);        // sets up queues and threads
183         }
184     }
185 
186 
187 
188 
189     /**
190      * Creates a new protocol given the protocol specification.
191      * @param prot_spec The specification of the protocol. Same convention as for specifying a protocol stack.
192      *                  An exception will be thrown if the class cannot be created. Example:
193      *                  <pre>"VERIFY_SUSPECT(timeout=1500)"</pre> Note that no colons (:) have to be
194      *                  specified
195      * @return Protocol The newly created protocol
196      * @exception Exception Will be thrown when the new protocol cannot be created
197      */
198     public Protocol createProtocol(String prot_spec) throws Exception {
199         return conf.createProtocol(prot_spec, this);
200     }
201 
202 
203 
204 
205 
206 
207     /**
208      * Inserts an already created (and initialized) protocol into the protocol list. Sets the links
209      * to the protocols above and below correctly and adjusts the linked list of protocols accordingly.
210      * Note that this method may change the value of top_prot or bottom_prot.
211      * @param prot The protocol to be inserted. Before insertion, a sanity check will ensure that none
212      *             of the existing protocols have the same name as the new protocol.
213      * @param position Where to place the protocol with respect to the neighbor_prot (ABOVE, BELOW)
214      * @param neighbor_prot The name of the neighbor protocol. An exception will be thrown if this name
215      *                      is not found
216      * @exception Exception Will be thrown when the new protocol cannot be created, or inserted.
217      */
218     public void insertProtocol(Protocol prot, int position, String neighbor_prot) throws Exception {
219         conf.insertProtocol(prot, position, neighbor_prot, this);
220     }
221 
222 
223 
224 
225 
226     /**
227      * Removes a protocol from the stack. Stops the protocol and readjusts the linked lists of
228      * protocols.
229      * @param prot_name The name of the protocol. Since all protocol names in a stack have to be unique
230      *                  (otherwise the stack won't be created), the name refers to just 1 protocol.
231      * @exception Exception Thrown if the protocol cannot be stopped correctly.
232      */
233     public void removeProtocol(String prot_name) throws Exception {
234         conf.removeProtocol(prot_name);
235     }
236 
237 
238     /** Returns a given protocol or null if not found */
239     public Protocol findProtocol(String name) {
240         Protocol tmp=top_prot;
241         String   prot_name;
242         while(tmp != null) {
243             prot_name=tmp.getName();
244             if(prot_name != null && prot_name.equals(name))
245                 return tmp;
246             tmp=tmp.getDownProtocol();
247         }
248         return null;
249     }
250 
251 
252     public void destroy() {
253         if(top_prot != null) {
254             conf.stopProtocolStack(top_prot);           // destroys msg queues and threads
255             top_prot=null;
256         }
257     }
258 
259 
260 
261     /**
262      * Start all layers. The {@link Protocol#start()} method is called in each protocol,
263      * <em>from top to bottom</em>.
264      * Each layer can perform some initialization, e.g. create a multicast socket
265      */
266     public void startStack() throws Exception {
267         Object start_result=null;
268         if(stopped == false) return;
269 
270         timer.start();
271 
272         if(start_promise == null)
273             start_promise=new Promise();
274         else
275             start_promise.reset();
276 
277         down(new Event(Event.START));
278         start_result=start_promise.getResult(0);
279         if(start_result != null && start_result instanceof Throwable) {
280             if(start_result instanceof Exception)
281                 throw (Exception)start_result;
282             else
283                 throw new Exception("ProtocolStack.start(): exception is " + start_result);
284         }
285 
286         stopped=false;
287     }
288 
289 
290 
291     public void startUpHandler() {
292         // DON'T REMOVE !!!!  Avoids a superfluous thread
293     }
294 
295     public void startDownHandler() {
296         // DON'T REMOVE !!!!  Avoids a superfluous thread
297     }
298 
299 
300     /**
301      * Iterates through all the protocols <em>from top to bottom</em> and does the following:
302      * <ol>
303      * <li>Waits until all messages in the down queue have been flushed (ie., size is 0)
304      * <li>Calls stop() on the protocol
305      * </ol>
306      */
307     public void stopStack() {
308         if(timer != null) {
309             try {
310                 timer.stop();
311             }
312             catch(Exception ex) {
313             }
314         }
315 
316         if(stopped) return;
317 
318         if(stop_promise == null)
319             stop_promise=new Promise();
320         else
321             stop_promise.reset();
322 
323         down(new Event(Event.STOP));
324         stop_promise.getResult(5000);
325         stopped=true;
326     }
327 
328     /**
329      * Not needed anymore, just left in here for backwards compatibility with JBoss AS
330      * @deprecated
331      */
332     public void flushEvents() {
333 
334     }
335 
336     public void stopInternal() {
337         // do nothing, DON'T REMOVE !!!!
338     }
339 
340 
341 
342     /*--------------------------- Transport interface ------------------------------*/
343 
344     public void send(Message msg) throws Exception {
345         down(new Event(Event.MSG, msg));
346     }
347 
348     public Object receive(long timeout) throws Exception {
349         throw new Exception("ProtocolStack.receive(): not implemented !");
350     }
351     /*------------------------- End of  Transport interface ---------------------------*/
352 
353 
354 
355 
356 
357     /*--------------------------- Protocol functionality ------------------------------*/
358     public String getName()  {return "ProtocolStack";}
359 
360 
361 
362 
363     public void up(Event evt) {
364         switch(evt.getType()) {
365             case Event.START_OK:
366                 if(start_promise != null)
367                     start_promise.setResult(evt.getArg());
368                 return;
369             case Event.STOP_OK:
370                 if(stop_promise != null)
371                     stop_promise.setResult(evt.getArg());
372                 return;
373         }
374 
375         if(channel != null)
376             channel.up(evt);
377     }
378 
379 
380 
381 
382     public void down(Event evt) {
383         if(top_prot != null)
384             top_prot.receiveDownEvent(evt);
385         else
386             log.error("no down protocol available !");
387     }
388 
389 
390 
391     protected void receiveUpEvent(Event evt) {
392         up(evt);
393     }
394 
395 
396 
397     /** Override with null functionality: we don't need any threads to be started ! */
398     public void startWork() {}
399 
400     /** Override with null functionality: we don't need any threads to be started ! */
401     public void stopWork()  {}
402 
403 
404     /*----------------------- End of Protocol functionality ---------------------------*/
405 
406 
407 
408 
409 }