Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: com/jabberwookie/Stream.java


1   /*
2    * Stream.java
3    *
4    * Created on April 21, 2003, 11:00 PM
5    * Copyright (c) 2003, Sean M. Meiners, sean@jabberwookie.com
6    * All rights reserved.
7    * 
8    * Redistribution and use in source and binary forms, with or without
9    * modification, are permitted provided that the following conditions are met:
10   * 
11   *     * Redistributions of source code must retain the above copyright notice,
12   *       this list of conditions and the following disclaimer.
13   *     * Redistributions in binary form must reproduce the above copyright notice,
14   *       this list of conditions and the following disclaimer in the documentation
15   *       and/or other materials provided with the distribution.
16   *     * Neither the name of JabberWookie nor the names of its contributors may be used
17   *       to endorse or promote products derived from this software without specific
18   *       prior written permission.
19   * 
20   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
21   * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
22   * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23   * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
24   * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
25   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
26   * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
27   * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28   * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29   * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30   */
31  
32  package com.jabberwookie;
33  
34  import java.io.BufferedWriter;
35  import java.io.IOException;
36  import java.io.EOFException;
37  import java.io.InputStream;
38  import java.io.OutputStream;
39  import java.io.DataInputStream;
40  import java.io.OutputStreamWriter;
41  import java.io.Writer;
42  
43  //import java.util.Random;
44  import java.util.Stack;
45  import java.util.Vector;
46  import java.util.Hashtable;
47  
48  import com.jabberwookie.ns.jabber.Const;
49  import com.jabberwookie.ns.jabber.Chunk;
50  import com.jabberwookie.ns.jabber.IQ;
51  import com.jabberwookie.ns.jabber.Message;
52  import com.jabberwookie.ns.jabber.Presence;
53  
54  import com.ssttr.xml.SAXParser;
55  import com.ssttr.xml.SAXInterface;
56  import com.ssttr.xml.XMLElement;
57  import com.ssttr.xml.ParserException;
58  
59  import com.ssttr.util.processor.Processor;
60  
61  /**
62   * This implements the most basic functions required to establish and
63   * maintain a Jabber stream.  It must be extended to be truely useful.
64   * Currently this is done by the {@link Client2Server} and 
65   * {@link Component2Server} classes.
66   * @author  smeiners
67   */
68  public abstract class Stream implements SAXInterface
69  {
70  //    protected static final boolean    DEBUG           = true;
71      protected static final  boolean     DEBUG           = false;
72      
73      /** How long to wait before giving up on trying to process
74       * a chunk. (but only if you're using a Processor) */
75      protected static final  int         PROCESSOR_WAIT_TIME = 10000;
76      
77      /* these are only used in makeRandomID, which isn't used.
78      protected static final  Random      random          = new Random();
79      protected static final  byte        _a_             = (byte)'a';
80      protected static final  byte        _A_             = (byte)'A';
81      protected static final  byte        _0_             = (byte)'0';
82      */
83      
84      protected DataInputStream           in              = null;
85      protected OutputStream              out             = null;
86      protected Writer                    outWriter       = null;
87      
88      protected ParserThread              parserT         = null;
89      
90      protected Processor                 processor       = null;
91      protected Vector                    chunkQueue      = null;
92      protected ChunkProcessor            chunkProcessor  = null;
93      
94      protected MessageListener           mListener       = null;
95      protected PresenceListener          pListener       = null;
96      protected IQListener                iqListener      = null;
97      protected UnrecognizedChunkListener unListener      = null;
98      
99      protected boolean                   connected       = false;
100     protected String                    connectionId    = null;
101     
102     protected Namespaces                ns              = null;
103     
104     protected Hashtable                 waitingIds      = new Hashtable(11);
105     
106     private   XMLElement                el              = null;
107     private   Chunk                     chunk           = null;
108     private   Stack                     elStack         = new Stack();
109     
110     /** Creates a new instance of Stream.  If you use this constuctor
111      * it will block on any calls to your *Listeners. */
112     public Stream (InputStream in, OutputStream out)
113     {
114         this(in,out,null);
115     }
116     
117     /** Creates a new instance of Stream.  If you use this constructor
118      * and pass in a valid Processor it will call your *Listeners as
119      * fast as the Processor allows (when there is data available, of
120      * course). */
121     public Stream (InputStream in, OutputStream out, Processor processor)
122     {
123         if( in instanceof DataInputStream )
124             this.in = (DataInputStream)in;
125         else
126             this.in = new DataInputStream(in);
127         
128         this.out = out;
129         outWriter = new BufferedWriter(new OutputStreamWriter(out));
130         
131         parserT = new ParserThread();
132         
133         setProcessor(processor);
134     }
135     
136     /**
137      * Sets the internal Processor used when calling your *Listeners.
138      * If you're going to take a while to process chunks it's highly
139      * recomended that you not only use a Processor, but that you
140      * use one with either a large queue, a lot of threads, or both.
141      * This is because if your processor refuses to accept a chunk
142      * for more than 10 seconds it will give up and drop it on the
143      * floor.  No, this isn't the best behavior, but it is easily
144      * avoidable, just be careful.
145      * @param processor
146      */
147     public void setProcessor(Processor processor)
148     {
149         this.processor = processor;
150         
151         if( processor == null )
152         {
153             chunkQueue = null;
154             chunkProcessor = null;
155         }
156         else
157         {
158             chunkQueue = new Vector(processor.getMaxTasks());
159             chunkProcessor = new ChunkProcessor();
160         }
161     }
162 
163     /**
164      * Sets the namespaces used when interpreting incoming data.
165      * If you don't know what the namespaces are I would suggest
166      * reading some of the Jabber protocol documentation.
167      * @param namespaces
168      */    
169     public void setNamespaces(Namespaces namespaces)
170     { ns = namespaces; }
171     
172     /**
173      * Opens the stream, needs to be impelmented by the subclass.
174      * @param serverName
175      * @param timeoutSecs
176      * @return
177      * @throws IOException
178      */
179     public abstract boolean open(String serverName, int timeoutSecs)
180         throws IOException;
181     
182     /**
183      * Closes the stream, needs to be impelmented by the subclass.
184      */
185     public abstract void close();
186     
187     /**
188      * Sends the chunk to the other end and returns immediatly.
189      * @param chunk
190      * @throws IOException
191      */
192     public void send(Chunk chunk)
193     throws IOException
194     { send(chunk, -1); }
195     
196     /**
197      * Sends the chunk to the other end and waits for a reply.
198      * 
199      * @param chunk
200      * @param timeoutMillis
201      * @return
202      * @throws IOException
203      */
204     public Chunk send(Chunk chunk, int timeoutMillis)
205     throws IOException
206     {
207         if( ! connected )
208             throw new IOException("Not connected");
209             
210         if( chunk == null )
211             return null;
212         
213         if( DEBUG )
214             System.out.println("SEND: " + chunk);
215         
216         String id = null;
217         if( timeoutMillis > 0 )
218         {
219             id = chunk.getId();
220 /*
221             if( id == null )
222                 id = makeRandomID();
223 */
224             waitingIds.put( id, "");
225         }
226         
227         chunk.writeTo(outWriter);
228         outWriter.flush();
229         
230         if( timeoutMillis > 0 )
231             return waitForId(id, timeoutMillis);
232         else
233             return null;
234     }
235     
236     /**
237      * Blocks until the given message id is received. 
238      * @param id
239      * @param timeoutMillis
240      * @return
241      */
242     synchronized private Chunk waitForId(String id, int timeoutMillis)
243     {
244         long stopTime = System.currentTimeMillis() + timeoutMillis;
245         Object o;
246         
247         for( ; ; )
248         {
249             if( waitingIds.containsKey(id) )
250             {
251                 o = waitingIds.get(id);
252                 if( o instanceof Chunk )
253                 {
254                     waitingIds.remove(id);
255                     return (Chunk)o;
256                 }
257             }
258             
259             if( System.currentTimeMillis() > stopTime )
260                 return null;
261             
262             try
263             { wait (50); }
264             catch( InterruptedException x )
265             { }
266         }
267     }
268     
269     public boolean isConnected()
270     { return connected; }
271     
272     public String getConnectionId()
273     { return connectionId; }
274     
275     public void setMessageListener(MessageListener listener)
276     { mListener = listener; }
277     
278     public void setPresenceListener(PresenceListener listener)
279     { pListener = listener; }
280     
281     public void setIQListener(IQListener listener)
282     { iqListener = listener; }
283     
284     public void setUnrecogizedChunkListener(UnrecognizedChunkListener listener)
285     { unListener = listener; }
286     
287     /** Does NOT set the UnrecogizedChunkListener.
288      * This will only work it the passed listener implements
289      * MessageListener, PresenceListener, and IQListener.  If not
290      * it will throw a ClassCastException.
291      */
292     public void setAllListeners(Object listener)
293     {
294         mListener = (MessageListener)listener;
295         pListener = (PresenceListener)listener;
296         iqListener = (IQListener)listener;
297     }
298     
299     /**
300      * Signals waitForID that a new message came in and it should
301      * check it.
302      * @param id
303      * @param chunk
304      */
305     synchronized private void gotId(String id, Chunk chunk)
306     {
307         waitingIds.put(id,chunk);
308         notifyAll();
309     }
310     
311     /**
312      * Receives data from the XML parser and maps it to the
313      * appropriate namespace when possible.
314      * @author smeiners
315      */
316     private class ParserThread
317     extends Thread
318     {
319         private   SAXParser parser;
320             
321         ParserThread()
322         {
323             parser = new SAXParser(Stream.this);
324             start();
325         }
326         
327         public void run()
328         {
329             try
330             { parser.parse (in); }
331             
332             catch( ParserException x )
333             { connected = false; x.printStackTrace(); }
334             
335             catch( EOFException x )
336             { connected = false; }
337             
338             catch( IOException x )
339             { connected = false; }
340         }
341     }
342         
343     public void cData (String data)
344     {
345         if( DEBUG )
346             System.out.println("cData: '"+data+"'");
347 
348         el = (XMLElement)elStack.peek();
349       el.appendValue(data);
350     }
351 
352     public void chunkStart (String tag, Hashtable attrs)
353     {
354         if( DEBUG )
355             System.out.println("chunkStart "+tag);
356 
357         if( tag.equals(Const.MESSAGE) )
358         {
359             el = new Message();
360             el.setAttributes(attrs);
361         }
362         else if( tag.equals(Const.PRESENCE) )
363         {
364             el = new Presence();
365             el.setAttributes(attrs);
366         }
367         else if( tag.equals(Const.IQ) )
368         {
369             el = new IQ();
370             el.setAttributes(attrs);
371         }
372         else
373         {
374             el = new Chunk(tag);
375             el.setAttributes(attrs);
376         }
377 
378         elStack.push(el);
379     }
380 
381     public void docStart (String tag, Hashtable attrs)
382     {
383         if( DEBUG )
384             System.out.println("docStart " + tag + " = " + attrs);
385 
386         if( tag.equals("stream:stream") )
387         {
388             connected = true;
389             connectionId = (String)attrs.get(Const.ID);
390         }
391     }
392 
393     public void elementStart (String tag, Hashtable attrs)
394     {
395         if( DEBUG )
396             System.out.println("elementStart: '"+tag+"' "+attrs);
397 
398         el = ns.getElement( (String)attrs.get("xmlns"), tag );
399         if( el == null )
400             el = ((XMLElement)elStack.peek()).addChild(tag,attrs);
401         else
402         {
403             el.setAttributes(attrs);
404             ((XMLElement)elStack.peek()).addChild(el);
405         }
406         elStack.push(el);
407     }
408 
409     public void elementStop (String tag)
410     {
411         if( DEBUG )
412             System.out.println("elementStop: '"+tag+"'");
413 
414         elStack.pop();
415     }
416 
417     public void chunkStop (String tag)
418     {
419         chunk = (Chunk)elStack.pop();
420 
421         if( DEBUG )
422             System.out.println("RECV: "+chunk);
423 
424         if( waitingIds.size() > 0 )
425         {
426             String id = chunk.getId();
427             if( id != null && waitingIds.containsKey(id) )
428             {
429                 gotId(id, chunk);
430                 return;
431             }
432         }
433 
434         if( processor == null )
435             processChunk(chunk);
436         else
437         {
438             // keep attempting to process the chunk until it succeeds
439             // or we run out of time.
440             chunkQueue.addElement(chunk);
441             if( ! processor.process(chunkProcessor) )
442             {
443                 long stopTime = System.currentTimeMillis() + PROCESSOR_WAIT_TIME;
444                 boolean processed = false;
445                 
446                 while( ! processed && System.currentTimeMillis() < stopTime )
447                 {
448                     try
449                     { Thread.sleep(5); }
450                     catch( InterruptedException x )
451                     { }
452                     processed = processor.process(chunkProcessor);
453                 }
454                 
455                 if( ! processed )
456                 {
457                     System.err.println("!!SEVERE!! "+this.getClass().getName()+" was not able to process a chunk within "+PROCESSOR_WAIT_TIME+" milliseconds, the chunk will be dropped.\n" +
458                                        "  Chunk: '"+chunk+"'");
459                 }
460             }
461         }
462     }
463     
464     public void processChunk(Chunk chunk)
465     {
466         switch (chunk.getTypeInt())
467         {
468             case Chunk.Type.MESSAGE:
469                 if( mListener != null )
470                     mListener.incomingMessage((Message)chunk);
471                 break;
472             case Chunk.Type.PRESENCE:
473                 if( pListener != null )
474                     pListener.incomingPresence((Presence)chunk);
475                 break;
476             case Chunk.Type.IQ:
477                 if( iqListener != null )
478                     iqListener.incomingIQ((IQ)chunk);
479                 break;
480             default:
481             {
482                 if( chunk.getName().equals("stream:error") )
483                 {
484                     close();
485                     break;
486                 }
487                 if( unListener != null )
488                     unListener.incomingChunk(chunk);
489                 else
490                     System.err.println(this.getClass().getName()+": Unrecognized chunk received: '"+chunk+"'");
491             }
492         }
493     }
494 
495     public void docStop (String tag)
496     {
497         if( DEBUG )
498             System.out.println("docStop "+tag);
499 
500         connected = false;
501     }
502 
503     public void processingInstruction (String element)
504     {
505         if( DEBUG )
506             System.out.println("pI: '"+element+"'");
507     }
508 
509     public void dtdData (String dtd)
510     {
511         if( DEBUG )
512             System.out.println("dtdData: '"+dtd+"'");
513     }
514 
515     private class ChunkProcessor
516     implements Runnable
517     {
518         public synchronized void run()
519         {
520             Chunk chunk = null;
521             
522             chunk = (Chunk)chunkQueue.elementAt(0);
523             chunkQueue.removeElementAt(0);
524             processChunk(chunk);
525         }
526     }
527 /*    
528     private String makeRandomID()
529     {
530         StringBuffer id = new StringBuffer();
531         int c = 0;
532         int r = 0;
533         
534         for( int i = 20; i > 0; i -- )
535         {
536             c = random.nextInt();
537             c %= 26 * 2 + 10;
538             r = c / 26;
539             c %= 26;
540             
541             switch( r )
542             {
543                 case 0:
544                     id.append( (byte)(c + _a_) );
545                     break;
546                 case 1:
547                     id.append( (byte)(c + _A_) );
548                     break;
549                 default:
550                     id.append( (byte)(c + _0_) );
551                     break;
552             }
553         }
554         
555         return id.toString();
556     }
557 */
558 }