Source code: com/jabberwookie/Stream.java
1 /*
2 * Stream.java
3 *
4 * Created on April 21, 2003, 11:00 PM
5 * Copyright (c) 2003, Sean M. Meiners, sean@jabberwookie.com
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are met:
10 *
11 * * Redistributions of source code must retain the above copyright notice,
12 * this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright notice,
14 * this list of conditions and the following disclaimer in the documentation
15 * and/or other materials provided with the distribution.
16 * * Neither the name of JabberWookie nor the names of its contributors may be used
17 * to endorse or promote products derived from this software without specific
18 * prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
21 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
22 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
24 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
25 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
26 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
27 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 */
31
32 package com.jabberwookie;
33
34 import java.io.BufferedWriter;
35 import java.io.IOException;
36 import java.io.EOFException;
37 import java.io.InputStream;
38 import java.io.OutputStream;
39 import java.io.DataInputStream;
40 import java.io.OutputStreamWriter;
41 import java.io.Writer;
42
43 //import java.util.Random;
44 import java.util.Stack;
45 import java.util.Vector;
46 import java.util.Hashtable;
47
48 import com.jabberwookie.ns.jabber.Const;
49 import com.jabberwookie.ns.jabber.Chunk;
50 import com.jabberwookie.ns.jabber.IQ;
51 import com.jabberwookie.ns.jabber.Message;
52 import com.jabberwookie.ns.jabber.Presence;
53
54 import com.ssttr.xml.SAXParser;
55 import com.ssttr.xml.SAXInterface;
56 import com.ssttr.xml.XMLElement;
57 import com.ssttr.xml.ParserException;
58
59 import com.ssttr.util.processor.Processor;
60
61 /**
62 * This implements the most basic functions required to establish and
63 * maintain a Jabber stream. It must be extended to be truely useful.
64 * Currently this is done by the {@link Client2Server} and
65 * {@link Component2Server} classes.
66 * @author smeiners
67 */
68 public abstract class Stream implements SAXInterface
69 {
70 // protected static final boolean DEBUG = true;
71 protected static final boolean DEBUG = false;
72
73 /** How long to wait before giving up on trying to process
74 * a chunk. (but only if you're using a Processor) */
75 protected static final int PROCESSOR_WAIT_TIME = 10000;
76
77 /* these are only used in makeRandomID, which isn't used.
78 protected static final Random random = new Random();
79 protected static final byte _a_ = (byte)'a';
80 protected static final byte _A_ = (byte)'A';
81 protected static final byte _0_ = (byte)'0';
82 */
83
84 protected DataInputStream in = null;
85 protected OutputStream out = null;
86 protected Writer outWriter = null;
87
88 protected ParserThread parserT = null;
89
90 protected Processor processor = null;
91 protected Vector chunkQueue = null;
92 protected ChunkProcessor chunkProcessor = null;
93
94 protected MessageListener mListener = null;
95 protected PresenceListener pListener = null;
96 protected IQListener iqListener = null;
97 protected UnrecognizedChunkListener unListener = null;
98
99 protected boolean connected = false;
100 protected String connectionId = null;
101
102 protected Namespaces ns = null;
103
104 protected Hashtable waitingIds = new Hashtable(11);
105
106 private XMLElement el = null;
107 private Chunk chunk = null;
108 private Stack elStack = new Stack();
109
110 /** Creates a new instance of Stream. If you use this constuctor
111 * it will block on any calls to your *Listeners. */
112 public Stream (InputStream in, OutputStream out)
113 {
114 this(in,out,null);
115 }
116
117 /** Creates a new instance of Stream. If you use this constructor
118 * and pass in a valid Processor it will call your *Listeners as
119 * fast as the Processor allows (when there is data available, of
120 * course). */
121 public Stream (InputStream in, OutputStream out, Processor processor)
122 {
123 if( in instanceof DataInputStream )
124 this.in = (DataInputStream)in;
125 else
126 this.in = new DataInputStream(in);
127
128 this.out = out;
129 outWriter = new BufferedWriter(new OutputStreamWriter(out));
130
131 parserT = new ParserThread();
132
133 setProcessor(processor);
134 }
135
136 /**
137 * Sets the internal Processor used when calling your *Listeners.
138 * If you're going to take a while to process chunks it's highly
139 * recomended that you not only use a Processor, but that you
140 * use one with either a large queue, a lot of threads, or both.
141 * This is because if your processor refuses to accept a chunk
142 * for more than 10 seconds it will give up and drop it on the
143 * floor. No, this isn't the best behavior, but it is easily
144 * avoidable, just be careful.
145 * @param processor
146 */
147 public void setProcessor(Processor processor)
148 {
149 this.processor = processor;
150
151 if( processor == null )
152 {
153 chunkQueue = null;
154 chunkProcessor = null;
155 }
156 else
157 {
158 chunkQueue = new Vector(processor.getMaxTasks());
159 chunkProcessor = new ChunkProcessor();
160 }
161 }
162
163 /**
164 * Sets the namespaces used when interpreting incoming data.
165 * If you don't know what the namespaces are I would suggest
166 * reading some of the Jabber protocol documentation.
167 * @param namespaces
168 */
169 public void setNamespaces(Namespaces namespaces)
170 { ns = namespaces; }
171
172 /**
173 * Opens the stream, needs to be impelmented by the subclass.
174 * @param serverName
175 * @param timeoutSecs
176 * @return
177 * @throws IOException
178 */
179 public abstract boolean open(String serverName, int timeoutSecs)
180 throws IOException;
181
182 /**
183 * Closes the stream, needs to be impelmented by the subclass.
184 */
185 public abstract void close();
186
187 /**
188 * Sends the chunk to the other end and returns immediatly.
189 * @param chunk
190 * @throws IOException
191 */
192 public void send(Chunk chunk)
193 throws IOException
194 { send(chunk, -1); }
195
196 /**
197 * Sends the chunk to the other end and waits for a reply.
198 *
199 * @param chunk
200 * @param timeoutMillis
201 * @return
202 * @throws IOException
203 */
204 public Chunk send(Chunk chunk, int timeoutMillis)
205 throws IOException
206 {
207 if( ! connected )
208 throw new IOException("Not connected");
209
210 if( chunk == null )
211 return null;
212
213 if( DEBUG )
214 System.out.println("SEND: " + chunk);
215
216 String id = null;
217 if( timeoutMillis > 0 )
218 {
219 id = chunk.getId();
220 /*
221 if( id == null )
222 id = makeRandomID();
223 */
224 waitingIds.put( id, "");
225 }
226
227 chunk.writeTo(outWriter);
228 outWriter.flush();
229
230 if( timeoutMillis > 0 )
231 return waitForId(id, timeoutMillis);
232 else
233 return null;
234 }
235
236 /**
237 * Blocks until the given message id is received.
238 * @param id
239 * @param timeoutMillis
240 * @return
241 */
242 synchronized private Chunk waitForId(String id, int timeoutMillis)
243 {
244 long stopTime = System.currentTimeMillis() + timeoutMillis;
245 Object o;
246
247 for( ; ; )
248 {
249 if( waitingIds.containsKey(id) )
250 {
251 o = waitingIds.get(id);
252 if( o instanceof Chunk )
253 {
254 waitingIds.remove(id);
255 return (Chunk)o;
256 }
257 }
258
259 if( System.currentTimeMillis() > stopTime )
260 return null;
261
262 try
263 { wait (50); }
264 catch( InterruptedException x )
265 { }
266 }
267 }
268
269 public boolean isConnected()
270 { return connected; }
271
272 public String getConnectionId()
273 { return connectionId; }
274
275 public void setMessageListener(MessageListener listener)
276 { mListener = listener; }
277
278 public void setPresenceListener(PresenceListener listener)
279 { pListener = listener; }
280
281 public void setIQListener(IQListener listener)
282 { iqListener = listener; }
283
284 public void setUnrecogizedChunkListener(UnrecognizedChunkListener listener)
285 { unListener = listener; }
286
287 /** Does NOT set the UnrecogizedChunkListener.
288 * This will only work it the passed listener implements
289 * MessageListener, PresenceListener, and IQListener. If not
290 * it will throw a ClassCastException.
291 */
292 public void setAllListeners(Object listener)
293 {
294 mListener = (MessageListener)listener;
295 pListener = (PresenceListener)listener;
296 iqListener = (IQListener)listener;
297 }
298
299 /**
300 * Signals waitForID that a new message came in and it should
301 * check it.
302 * @param id
303 * @param chunk
304 */
305 synchronized private void gotId(String id, Chunk chunk)
306 {
307 waitingIds.put(id,chunk);
308 notifyAll();
309 }
310
311 /**
312 * Receives data from the XML parser and maps it to the
313 * appropriate namespace when possible.
314 * @author smeiners
315 */
316 private class ParserThread
317 extends Thread
318 {
319 private SAXParser parser;
320
321 ParserThread()
322 {
323 parser = new SAXParser(Stream.this);
324 start();
325 }
326
327 public void run()
328 {
329 try
330 { parser.parse (in); }
331
332 catch( ParserException x )
333 { connected = false; x.printStackTrace(); }
334
335 catch( EOFException x )
336 { connected = false; }
337
338 catch( IOException x )
339 { connected = false; }
340 }
341 }
342
343 public void cData (String data)
344 {
345 if( DEBUG )
346 System.out.println("cData: '"+data+"'");
347
348 el = (XMLElement)elStack.peek();
349 el.appendValue(data);
350 }
351
352 public void chunkStart (String tag, Hashtable attrs)
353 {
354 if( DEBUG )
355 System.out.println("chunkStart "+tag);
356
357 if( tag.equals(Const.MESSAGE) )
358 {
359 el = new Message();
360 el.setAttributes(attrs);
361 }
362 else if( tag.equals(Const.PRESENCE) )
363 {
364 el = new Presence();
365 el.setAttributes(attrs);
366 }
367 else if( tag.equals(Const.IQ) )
368 {
369 el = new IQ();
370 el.setAttributes(attrs);
371 }
372 else
373 {
374 el = new Chunk(tag);
375 el.setAttributes(attrs);
376 }
377
378 elStack.push(el);
379 }
380
381 public void docStart (String tag, Hashtable attrs)
382 {
383 if( DEBUG )
384 System.out.println("docStart " + tag + " = " + attrs);
385
386 if( tag.equals("stream:stream") )
387 {
388 connected = true;
389 connectionId = (String)attrs.get(Const.ID);
390 }
391 }
392
393 public void elementStart (String tag, Hashtable attrs)
394 {
395 if( DEBUG )
396 System.out.println("elementStart: '"+tag+"' "+attrs);
397
398 el = ns.getElement( (String)attrs.get("xmlns"), tag );
399 if( el == null )
400 el = ((XMLElement)elStack.peek()).addChild(tag,attrs);
401 else
402 {
403 el.setAttributes(attrs);
404 ((XMLElement)elStack.peek()).addChild(el);
405 }
406 elStack.push(el);
407 }
408
409 public void elementStop (String tag)
410 {
411 if( DEBUG )
412 System.out.println("elementStop: '"+tag+"'");
413
414 elStack.pop();
415 }
416
417 public void chunkStop (String tag)
418 {
419 chunk = (Chunk)elStack.pop();
420
421 if( DEBUG )
422 System.out.println("RECV: "+chunk);
423
424 if( waitingIds.size() > 0 )
425 {
426 String id = chunk.getId();
427 if( id != null && waitingIds.containsKey(id) )
428 {
429 gotId(id, chunk);
430 return;
431 }
432 }
433
434 if( processor == null )
435 processChunk(chunk);
436 else
437 {
438 // keep attempting to process the chunk until it succeeds
439 // or we run out of time.
440 chunkQueue.addElement(chunk);
441 if( ! processor.process(chunkProcessor) )
442 {
443 long stopTime = System.currentTimeMillis() + PROCESSOR_WAIT_TIME;
444 boolean processed = false;
445
446 while( ! processed && System.currentTimeMillis() < stopTime )
447 {
448 try
449 { Thread.sleep(5); }
450 catch( InterruptedException x )
451 { }
452 processed = processor.process(chunkProcessor);
453 }
454
455 if( ! processed )
456 {
457 System.err.println("!!SEVERE!! "+this.getClass().getName()+" was not able to process a chunk within "+PROCESSOR_WAIT_TIME+" milliseconds, the chunk will be dropped.\n" +
458 " Chunk: '"+chunk+"'");
459 }
460 }
461 }
462 }
463
464 public void processChunk(Chunk chunk)
465 {
466 switch (chunk.getTypeInt())
467 {
468 case Chunk.Type.MESSAGE:
469 if( mListener != null )
470 mListener.incomingMessage((Message)chunk);
471 break;
472 case Chunk.Type.PRESENCE:
473 if( pListener != null )
474 pListener.incomingPresence((Presence)chunk);
475 break;
476 case Chunk.Type.IQ:
477 if( iqListener != null )
478 iqListener.incomingIQ((IQ)chunk);
479 break;
480 default:
481 {
482 if( chunk.getName().equals("stream:error") )
483 {
484 close();
485 break;
486 }
487 if( unListener != null )
488 unListener.incomingChunk(chunk);
489 else
490 System.err.println(this.getClass().getName()+": Unrecognized chunk received: '"+chunk+"'");
491 }
492 }
493 }
494
495 public void docStop (String tag)
496 {
497 if( DEBUG )
498 System.out.println("docStop "+tag);
499
500 connected = false;
501 }
502
503 public void processingInstruction (String element)
504 {
505 if( DEBUG )
506 System.out.println("pI: '"+element+"'");
507 }
508
509 public void dtdData (String dtd)
510 {
511 if( DEBUG )
512 System.out.println("dtdData: '"+dtd+"'");
513 }
514
515 private class ChunkProcessor
516 implements Runnable
517 {
518 public synchronized void run()
519 {
520 Chunk chunk = null;
521
522 chunk = (Chunk)chunkQueue.elementAt(0);
523 chunkQueue.removeElementAt(0);
524 processChunk(chunk);
525 }
526 }
527 /*
528 private String makeRandomID()
529 {
530 StringBuffer id = new StringBuffer();
531 int c = 0;
532 int r = 0;
533
534 for( int i = 20; i > 0; i -- )
535 {
536 c = random.nextInt();
537 c %= 26 * 2 + 10;
538 r = c / 26;
539 c %= 26;
540
541 switch( r )
542 {
543 case 0:
544 id.append( (byte)(c + _a_) );
545 break;
546 case 1:
547 id.append( (byte)(c + _A_) );
548 break;
549 default:
550 id.append( (byte)(c + _0_) );
551 break;
552 }
553 }
554
555 return id.toString();
556 }
557 */
558 }