Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: com/strangeberry/rendezvous/Rendezvous.java


1   // Copyright (C) 2002  Strangeberry Inc.
2   // %Z%%M%, %I%, %G%
3   //
4   // This library is free software; you can redistribute it and/or
5   // modify it under the terms of the GNU Lesser General Public
6   // License as published by the Free Software Foundation; either
7   // version 2.1 of the License, or (at your option) any later version.
8   // 
9   // This library is distributed in the hope that it will be useful,
10  // but WITHOUT ANY WARRANTY; without even the implied warranty of
11  // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  // Lesser General Public License for more details.
13  // 
14  // You should have received a copy of the GNU Lesser General Public
15  // License along with this library; if not, write to the Free Software
16  // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17  
18  package com.strangeberry.rendezvous;
19  
20  import java.io.*;
21  import java.net.*;
22  import java.util.*;
23  
24  // REMIND: multiple IP addresses
25  
26  /**
27   * Rendezvous implementation in Java.
28   *
29   * @author  Arthur van Hoff
30   * @version   %I%, %G%
31   */
32  public class Rendezvous extends DNSConstants
33  {
34      static int debug = Integer.parseInt(System.getProperty("rendezvous.debug", "0"));
35  
36      InetAddress group;
37      MulticastSocket socket;
38      Vector listeners;
39      Vector browsers;
40      DNSCache cache;
41      Hashtable services;
42      Thread shutdown;
43      boolean done;
44  
45      /**
46       * Create an instance of Rendezvous.
47       */
48      public Rendezvous() throws IOException
49      {
50    try {
51        init(InetAddress.getLocalHost());
52    } catch (IOException e) {
53        init(null);
54    }
55      }
56  
57      /**
58       * Create an instance of Rendezvous and bind it to a
59       * specific network interface given its IP-address.
60       */
61      public Rendezvous(InetAddress addr) throws IOException
62      {
63    init(addr);
64      }
65  
66      /**
67       * Initialize everything.
68       */
69      void init(InetAddress intf) throws IOException
70      {
71    group = InetAddress.getByName(MDNS_GROUP);
72    socket = new MulticastSocket(MDNS_PORT);
73    if (intf != null) {
74        socket.setInterface(intf);
75    }
76    socket.setTimeToLive(255);
77    socket.joinGroup(group);
78  
79    cache = new DNSCache(100);
80    listeners = new Vector();
81    browsers = new Vector();
82    services = new Hashtable(20);
83  
84    new Thread(new SocketListener(), "Rendezvous.SocketListener").start();
85    new Thread(new RecordReaper(), "Rendezvous.RecordReaper").start();
86    shutdown = new Thread(new Shutdown(), "Rendezvous.Shutdown");
87    Runtime.getRuntime().addShutdownHook(shutdown);
88      }
89  
90      /**
91       * Get service information. If the information is not cached, the method
92       * will block until updated informatin is received.
93       *
94       * @param type full qualified service type, such as <code>_http._tcp.local.</code>.
95       * @param type full qualified service name, such as <code>foobar._http._tcp.local.</code>.
96       * @return null if the service information cannot be obtained
97       */
98      public ServiceInfo getServiceInfo(String type, String name)
99      {
100   return getServiceInfo(type, name, 3*1000);
101     }
102 
103     /**
104      * Get service information. If the information is not cached, the method
105      * will block for the given timeout until updated informatin is received.
106      *
107      * @param type full qualified service type, such as <code>_http._tcp.local.</code>.
108      * @param type full qualified service name, such as <code>foobar._http._tcp.local.</code>.
109      * @param timeout timeout in milliseconds
110      * @return null if the service information cannot be obtained
111      */
112     public ServiceInfo getServiceInfo(String type, String name, int timeout)
113     {
114   ServiceInfo info = new ServiceInfo(type, name);
115   return info.request(this, timeout) ? info : null;
116     }
117 
118     /**
119      * Listen for services of a given type. The type has to be a fully qualified
120      * type name such as <code>_http._tcp.local.</code>.
121      * @param type full qualified service type, such as <code>_http._tcp.local.</code>.
122      * @param listener listener for service updates
123      */
124     public synchronized void addServiceListener(String type, ServiceListener listener)
125     {
126   removeServiceListener(listener);
127   browsers.addElement(new ServiceBrowser(type, listener));
128     }
129 
130     /**
131      * Remove listener for services of a given type.
132      * @param listener listener for service updates
133      */
134     public synchronized void removeServiceListener(ServiceListener listener)
135     {
136   for (int i = browsers.size() ; i-- > 0 ;) {
137       ServiceBrowser browser = (ServiceBrowser)browsers.elementAt(i);
138       if (browser.listener == listener) {
139     browsers.removeElementAt(i);
140     browser.close();
141     return;
142       }
143   }
144     }
145 
146     /**
147      * Register a service. The service is registered for access by other rendezvous clients.
148      * The name of the service may be changed to make it unique.
149      */
150     public synchronized void registerService(ServiceInfo info) throws IOException
151     {
152   try {
153       // check for a unqiue name
154       checkService(info);
155 
156       // add the service
157       services.put(info.name.toLowerCase(), info);
158 
159       // announce the service
160       long now = System.currentTimeMillis();
161       long nextTime = now;
162       for (int i = 0 ; i < 3 ;) {
163     if (now < nextTime) {
164         wait(nextTime - now);
165         now = System.currentTimeMillis();
166         continue;
167     }
168     DNSOutgoing out = new DNSOutgoing(FLAGS_QR_RESPONSE | FLAGS_AA);
169     out.addAnswer(new DNSRecord.Pointer(info.type, TYPE_PTR, CLASS_IN, 60, info.name), 0);
170     out.addAnswer(new DNSRecord.Service(info.name, TYPE_SRV, CLASS_IN, 60, info.priority, info.weight, info.port, info.name), 0);
171     out.addAnswer(new DNSRecord.Text(info.name, TYPE_TXT, CLASS_IN, 60, info.text), 0);
172     out.addAnswer(new DNSRecord.Address(info.name, TYPE_A, CLASS_IN, 60, info.getIPAddress()), 0);
173     send(out);
174     i++;
175     nextTime += 225;
176       }
177   } catch (InterruptedException e) {
178       throw new IOException("interrupted I/O");
179   }
180     }
181 
182     /**
183      * Unregister a service. The service should have been registered.
184      */
185     public synchronized void unregisterService(ServiceInfo info)
186     {
187   try {
188       services.remove(info.name);
189 
190       // unregister the service
191       long now = System.currentTimeMillis();
192       long nextTime = now;
193       for (int i = 0 ; i < 3 ; ) {
194     if (now < nextTime) {
195         wait(nextTime - now);
196         now = System.currentTimeMillis();
197         continue;
198     }
199     DNSOutgoing out = new DNSOutgoing(FLAGS_QR_RESPONSE | FLAGS_AA);
200     out.addAnswer(new DNSRecord.Pointer(info.type, TYPE_PTR, CLASS_IN, 0, info.name), 0);
201     send(out);
202     i++;
203     nextTime += 125;
204       }
205   } catch (IOException e) {
206       // ignore
207   } catch (InterruptedException e) {
208       // ignore
209   }
210     }
211 
212     /**
213      * Unregister a service.
214      */
215     public synchronized void unregisterAllServices()
216     {
217   if (services.size() == 0) {
218       return;
219   }
220 
221   try {
222       // unregister all services
223       long now = System.currentTimeMillis();
224       long nextTime = now;
225       for (int i = 0 ; i < 3 ; ) {
226     if (now < nextTime) {
227         wait(nextTime - now);
228         now = System.currentTimeMillis();
229         continue;
230     }
231     DNSOutgoing out = new DNSOutgoing(FLAGS_QR_RESPONSE | FLAGS_AA);
232     for (Enumeration e = services.elements() ; e.hasMoreElements() ;) {
233         ServiceInfo info = (ServiceInfo)e.nextElement();
234         out.addAnswer(new DNSRecord.Pointer(info.type, TYPE_PTR, CLASS_IN, 0, info.name), 0);
235     }
236     send(out);
237     i++;
238     nextTime += 125;
239       }
240   } catch (IOException e) {
241       // ignore
242   } catch (InterruptedException e) {
243       // ignore
244   }
245     }
246 
247     /**
248      * Check that a service name is unique.
249      */
250     void checkService(ServiceInfo info) throws IOException, InterruptedException
251     {
252   long now = System.currentTimeMillis();
253   long nextTime = now;
254   for (int i = 0 ; i < 3 ;) {
255       for (Iterator j = cache.find(info.type) ; j.hasNext() ;) {
256     DNSRecord a = (DNSRecord)j.next();
257     if ((a.type == TYPE_PTR) && !a.isExpired(now) && info.name.equals(((DNSRecord.Pointer)a).alias)) {
258         // collision, uniquify using the IP address
259         if (info.getName().indexOf('.') < 0) {
260       info.name = info.getName() + ".[" + Integer.toHexString(info.getIPAddress()) + ":" + info.port + "]." + info.type;
261       checkService(info);
262       return;
263         }
264         throw new IOException("failed to pick unique service name");
265     }
266       }
267       if (now < nextTime) {
268     wait(nextTime - now);
269     now = System.currentTimeMillis();
270     continue;
271       }
272       DNSOutgoing out = new DNSOutgoing(FLAGS_QR_QUERY | FLAGS_AA);
273       out.addQuestion(new DNSQuestion(info.type, TYPE_PTR, CLASS_IN));
274       out.addAuthorativeAnswer(new DNSRecord.Pointer(info.type, TYPE_PTR, CLASS_IN, 60, info.name));
275       send(out);
276       i++;
277       nextTime += 175;
278   }
279     }
280 
281     /**
282      * Listener for record updates.
283      */
284     static abstract class Listener extends DNSConstants {
285   /**
286    * Update a DNS record.
287    */
288   abstract void updateRecord(Rendezvous rendezvous, long now, DNSRecord record);
289     }
290 
291     /**
292      * Add a listener for a question. The listener will receive updates to
293      * of answers to the question as they arrive, or from the cache if they
294      * are already available.
295      */
296     synchronized void addListener(Listener listener, DNSQuestion question)
297     {
298   long now = System.currentTimeMillis();
299 
300   // add the new listener
301   listeners.addElement(listener);
302 
303   // report existing matched records
304   if (question != null) {
305       for (Iterator i = cache.find(question.name) ; i.hasNext() ; ) {
306     DNSRecord c = (DNSRecord)i.next();
307     if (question.answeredBy(c) && !c.isExpired(now)) {
308         listener.updateRecord(this, now, c);
309     }
310       }
311   }
312   notifyAll();
313     }
314 
315     /**
316      * Remove a listener from all outstanding questions. The listener will no longer
317      * receive any updates.
318      */
319     synchronized void removeListener(Listener listener)
320     {
321   listeners.removeElement(listener);
322   notifyAll();
323     }
324 
325     /**
326      * Notify all listeners that a record was updated.
327      */
328     synchronized void updateRecord(long now, DNSRecord rec)
329     {
330   for (Enumeration e = listeners.elements() ; e.hasMoreElements() ;) {
331       Listener listener = (Listener)e.nextElement();
332       listener.updateRecord(this, now, rec);
333   }
334   notifyAll();
335     }
336 
337     /**
338      * Handle an incoming response. Cache answers, and pass them on to
339      * the appropriate questions.
340      */
341     synchronized void handleResponse(DNSIncoming msg) throws IOException
342     {
343   long now = System.currentTimeMillis();
344 
345   for (Enumeration e = msg.answers.elements() ; e.hasMoreElements() ;) {
346       DNSRecord rec = (DNSRecord)e.nextElement();
347       boolean expired = rec.isExpired(now);
348 
349       // update the cache
350       DNSRecord c = (DNSRecord)cache.get(rec);
351       if (c != null) {
352     if (expired) {
353         cache.remove(c);
354     } else {
355         c.resetTTL(rec);
356         rec = c;
357     }
358       } else if (!expired) {
359     cache.add(rec);
360       }
361 
362       // notify the listeners
363       updateRecord(now, rec);
364   }
365     }
366 
367     /**
368      * Handle an incoming query. See if we can answer any part of it
369      * given our registered records.
370      */
371     synchronized void handleQuery(DNSIncoming in, InetAddress addr, int port) throws IOException
372     {
373   DNSOutgoing out = null;
374 
375   // for unicast responses the question must be included
376   if (port != MDNS_PORT) {
377       out = new DNSOutgoing(FLAGS_QR_RESPONSE | FLAGS_AA, false);
378       for (Enumeration e = in.questions.elements() ; e.hasMoreElements() ;) {
379     out.addQuestion((DNSQuestion)e.nextElement());
380       }
381   }
382 
383   // answer relevant questions
384   for (Enumeration e = in.questions.elements() ; e.hasMoreElements() ;) {
385       DNSQuestion q = (DNSQuestion)e.nextElement();
386       switch (q.type) {
387         case TYPE_PTR:
388     // find matching services
389     for (Enumeration s = services.elements() ; s.hasMoreElements() ; ) {
390         ServiceInfo info = (ServiceInfo)s.nextElement();
391         if (q.name.equals(info.type)) {
392       if (out == null) {
393           out = new DNSOutgoing(FLAGS_QR_RESPONSE | FLAGS_AA);
394       }
395       out.addAnswer(in, new DNSRecord.Pointer(info.type, TYPE_PTR, CLASS_IN, 60, info.name));
396         }
397     }
398     break;
399         default:
400     // find service
401     ServiceInfo info = (ServiceInfo)services.get(q.name.toLowerCase());
402     if (info != null) {
403         if (out == null) {
404       out = new DNSOutgoing(FLAGS_QR_RESPONSE | FLAGS_AA);
405         }
406         if ((q.type == TYPE_SRV) || (q.type == TYPE_ANY)) {
407       out.addAnswer(in, new DNSRecord.Service(q.name, TYPE_SRV, CLASS_IN | CLASS_UNIQUE, 60, info.priority, info.weight, info.port, info.name));
408         }
409         if ((q.type == TYPE_TXT) || (q.type == TYPE_ANY)) {
410       out.addAnswer(in, new DNSRecord.Text(q.name, TYPE_TXT, CLASS_IN | CLASS_UNIQUE, 60, info.text));
411         }
412         if ((q.type == TYPE_A) || (q.type == TYPE_ANY)) {
413       out.addAnswer(in, new DNSRecord.Address(q.name, TYPE_A, CLASS_IN | CLASS_UNIQUE, 60, info.getIPAddress()));
414         }
415     }
416       } 
417   }
418   if ((out != null) && (out.numAnswers > 0)) {
419       out.id = in.id;
420       out.finish();
421       socket.send(new DatagramPacket(out.data, out.off, addr, port));
422   }
423     }
424 
425     /**
426      * Send a message an outgoing multicast DNS message.
427      */
428     synchronized void send(DNSOutgoing out) throws IOException
429     {
430   out.finish();
431   socket.send(new DatagramPacket(out.data, out.off, group, MDNS_PORT));
432     }
433 
434     /**
435      * Listen for multicast packets.
436      */
437     class SocketListener implements Runnable
438     {
439   public void run()
440   {
441       try {
442     byte buf[] = new byte[MAX_MSG_ABSOLUTE];
443     DatagramPacket packet = new DatagramPacket(buf, buf.length);
444     while (!done) {
445         packet.setLength(buf.length);
446         socket.receive(packet);
447         if (done) {
448       break;
449         }
450         try {
451       DNSIncoming msg = new DNSIncoming(packet);
452       if (debug > 0) {
453           msg.print(debug > 1);
454           System.out.println();
455       }
456       if (msg.isQuery()) {
457           handleQuery(msg, packet.getAddress(), packet.getPort());
458           if (packet.getPort() != MDNS_PORT) {
459         handleQuery(msg, packet.getAddress(), MDNS_PORT);
460           }
461       } else {
462           handleResponse(msg);
463       }
464         } catch (IOException e) {
465       e.printStackTrace();
466         }
467     }
468       } catch (IOException e) {
469     if (!done) {
470         e.printStackTrace();
471     }
472       }
473   }
474     }
475 
476     /**
477      * Schedule questions
478      */
479     class RecordReaper implements Runnable
480     {
481   public void run()
482   {
483 
484       try {
485     synchronized (Rendezvous.this) {
486         while (true) {
487       Rendezvous.this.wait(10 * 1000);
488       if (done) {
489           return;
490       }
491 
492       // remove expired answers from the cache
493       long now = System.currentTimeMillis();
494       for (Iterator i = cache.all() ; i.hasNext() ; ) {
495           DNSRecord c = (DNSRecord)i.next();
496           if (c.isExpired(now)) {
497         updateRecord(now, c);
498         i.remove();
499           }
500       }
501         }
502     }
503       } catch (InterruptedException e) {
504     e.printStackTrace();
505       }
506   }
507     }
508 
509     /**
510      * Browse for a service of a given type
511      */
512     class ServiceBrowser extends Rendezvous.Listener implements Runnable
513     {
514   String type;
515   ServiceListener listener;
516   Hashtable services;
517   long nextTime;
518   int delay;
519   boolean done;
520   LinkedList list;
521     
522   /**
523    * Create a browser for a service type.
524    */
525   ServiceBrowser(String type, ServiceListener listener)
526   {
527       this.type = type;
528       this.listener = listener;
529       this.services = new Hashtable();
530       this.nextTime = System.currentTimeMillis();
531       this.delay = 500;
532       this.list = new LinkedList();
533 
534       addListener(this, new DNSQuestion(type, TYPE_PTR, CLASS_IN));
535       new Thread(this, "Rendezvous.ServiceBrowser: " + type).start();
536   }
537 
538 
539   /**
540    * Event for notifying a service listener.
541    */
542   abstract class Event
543   {
544       String name;
545 
546       Event(String name)
547       {
548     this.name = name;
549       }
550       abstract void send();
551   }
552 
553   /**
554    * Update a record.
555    */
556   void updateRecord(Rendezvous rendezvous, long now, DNSRecord rec)
557   {
558       if ((rec.type == TYPE_PTR) && rec.name.equals(type)) {
559     boolean expired = rec.isExpired(now);
560     String name = ((DNSRecord.Pointer)rec).alias;
561     DNSRecord old = (DNSRecord)services.get(name.toLowerCase());
562     if ((old == null) && !expired) {
563         // new record
564         services.put(name.toLowerCase(), rec);
565         list.addLast(new Event(name) {
566           void send() {listener.addService(Rendezvous.this, type, this.name);}
567       });
568     } else if ((old != null) && !expired) {
569         // update record
570         old.resetTTL(rec);
571     } else if ((old != null) && expired) {
572         // expire record
573         services.remove(name);
574         list.addLast(new Event(name) {
575           void send() {listener.removeService(Rendezvous.this, type, this.name);}
576       });
577         return;
578     }
579 
580     // adjust next request time
581     long expires = rec.getExpirationTime(75);
582     if (expires < nextTime) {
583         nextTime = rec.getExpirationTime(75);
584     }
585       }
586   }
587 
588   /**
589    * Request.
590    */
591   public void run()
592   {
593       try {
594     while (true) {
595         Event evt = null;
596         
597         synchronized (Rendezvous.this) {
598       long now = System.currentTimeMillis();
599       if ((list.size() == 0) && (nextTime > now)) {
600           Rendezvous.this.wait(nextTime - now);
601       }
602       if (done) {
603           return;
604       }
605       now = System.currentTimeMillis();
606 
607       // send query
608       if (nextTime <= now) {
609           DNSOutgoing out = new DNSOutgoing(FLAGS_QR_QUERY);
610           out.addQuestion(new DNSQuestion(type, TYPE_PTR, CLASS_IN));
611           for (Enumeration e = services.elements() ; e.hasMoreElements() ;) {
612         DNSRecord rec = (DNSRecord)e.nextElement();
613         if (!rec.isExpired(now)) {
614             out.addAnswer(rec, now);
615         }
616           }
617           send(out);
618 
619           // schedule the next one
620           nextTime = now + delay;
621           delay = Math.min(20*1000, delay * 2);
622       }
623 
624       // get the next event
625       if (list.size() > 0) {
626           evt = (Event)list.removeFirst();
627       }
628         }
629         if (evt != null) {
630       evt.send();
631         }
632     }
633       } catch (IOException e) {
634     e.printStackTrace();
635       } catch (InterruptedException e) {
636     // oops
637       }
638   }
639 
640   void close()
641   {
642       synchronized (Rendezvous.this) {
643     if (!done) {
644         done = true;
645         removeListener(this);
646     }
647       }
648   }
649     }
650 
651     /**
652      * Shutdown operations.
653      */
654     class Shutdown implements Runnable
655     {
656   public void run()
657   {
658       shutdown = null;
659       close();
660   }
661     }
662 
663     /**
664      * Close down rendezvous. Release all resources and unregister all services.
665      */
666     public synchronized void close()
667     {
668   if (!done) {
669       done = true;
670       notifyAll();
671 
672       // remove the shutdown hook
673       if (shutdown != null) {
674     Runtime.getRuntime().removeShutdownHook(shutdown);
675       }
676 
677       // unregister services
678       unregisterAllServices();
679 
680       // close socket
681       try {
682     socket.leaveGroup(group);
683     socket.close();
684       } catch (IOException e) {
685     // ignore
686       }
687   }
688     }
689 
690     /**
691      * List cache entries, for debugging only.
692      */
693     void print()
694     {
695   if (cache.count > 0) {
696       System.out.println("---- cache ----");
697       cache.print();
698       System.out.println();
699   }
700     }
701 }