Source code: com/strangeberry/rendezvous/Rendezvous.java
1 // Copyright (C) 2002 Strangeberry Inc.
2 // %Z%%M%, %I%, %G%
3 //
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License as published by the Free Software Foundation; either
7 // version 2.1 of the License, or (at your option) any later version.
8 //
9 // This library is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 // Lesser General Public License for more details.
13 //
14 // You should have received a copy of the GNU Lesser General Public
15 // License along with this library; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
18 package com.strangeberry.rendezvous;
19
20 import java.io.*;
21 import java.net.*;
22 import java.util.*;
23
24 // REMIND: multiple IP addresses
25
26 /**
27 * Rendezvous implementation in Java.
28 *
29 * @author Arthur van Hoff
30 * @version %I%, %G%
31 */
32 public class Rendezvous extends DNSConstants
33 {
34 static int debug = Integer.parseInt(System.getProperty("rendezvous.debug", "0"));
35
36 InetAddress group;
37 MulticastSocket socket;
38 Vector listeners;
39 Vector browsers;
40 DNSCache cache;
41 Hashtable services;
42 Thread shutdown;
43 boolean done;
44
45 /**
46 * Create an instance of Rendezvous.
47 */
48 public Rendezvous() throws IOException
49 {
50 try {
51 init(InetAddress.getLocalHost());
52 } catch (IOException e) {
53 init(null);
54 }
55 }
56
57 /**
58 * Create an instance of Rendezvous and bind it to a
59 * specific network interface given its IP-address.
60 */
61 public Rendezvous(InetAddress addr) throws IOException
62 {
63 init(addr);
64 }
65
66 /**
67 * Initialize everything.
68 */
69 void init(InetAddress intf) throws IOException
70 {
71 group = InetAddress.getByName(MDNS_GROUP);
72 socket = new MulticastSocket(MDNS_PORT);
73 if (intf != null) {
74 socket.setInterface(intf);
75 }
76 socket.setTimeToLive(255);
77 socket.joinGroup(group);
78
79 cache = new DNSCache(100);
80 listeners = new Vector();
81 browsers = new Vector();
82 services = new Hashtable(20);
83
84 new Thread(new SocketListener(), "Rendezvous.SocketListener").start();
85 new Thread(new RecordReaper(), "Rendezvous.RecordReaper").start();
86 shutdown = new Thread(new Shutdown(), "Rendezvous.Shutdown");
87 Runtime.getRuntime().addShutdownHook(shutdown);
88 }
89
90 /**
91 * Get service information. If the information is not cached, the method
92 * will block until updated informatin is received.
93 *
94 * @param type full qualified service type, such as <code>_http._tcp.local.</code>.
95 * @param type full qualified service name, such as <code>foobar._http._tcp.local.</code>.
96 * @return null if the service information cannot be obtained
97 */
98 public ServiceInfo getServiceInfo(String type, String name)
99 {
100 return getServiceInfo(type, name, 3*1000);
101 }
102
103 /**
104 * Get service information. If the information is not cached, the method
105 * will block for the given timeout until updated informatin is received.
106 *
107 * @param type full qualified service type, such as <code>_http._tcp.local.</code>.
108 * @param type full qualified service name, such as <code>foobar._http._tcp.local.</code>.
109 * @param timeout timeout in milliseconds
110 * @return null if the service information cannot be obtained
111 */
112 public ServiceInfo getServiceInfo(String type, String name, int timeout)
113 {
114 ServiceInfo info = new ServiceInfo(type, name);
115 return info.request(this, timeout) ? info : null;
116 }
117
118 /**
119 * Listen for services of a given type. The type has to be a fully qualified
120 * type name such as <code>_http._tcp.local.</code>.
121 * @param type full qualified service type, such as <code>_http._tcp.local.</code>.
122 * @param listener listener for service updates
123 */
124 public synchronized void addServiceListener(String type, ServiceListener listener)
125 {
126 removeServiceListener(listener);
127 browsers.addElement(new ServiceBrowser(type, listener));
128 }
129
130 /**
131 * Remove listener for services of a given type.
132 * @param listener listener for service updates
133 */
134 public synchronized void removeServiceListener(ServiceListener listener)
135 {
136 for (int i = browsers.size() ; i-- > 0 ;) {
137 ServiceBrowser browser = (ServiceBrowser)browsers.elementAt(i);
138 if (browser.listener == listener) {
139 browsers.removeElementAt(i);
140 browser.close();
141 return;
142 }
143 }
144 }
145
146 /**
147 * Register a service. The service is registered for access by other rendezvous clients.
148 * The name of the service may be changed to make it unique.
149 */
150 public synchronized void registerService(ServiceInfo info) throws IOException
151 {
152 try {
153 // check for a unqiue name
154 checkService(info);
155
156 // add the service
157 services.put(info.name.toLowerCase(), info);
158
159 // announce the service
160 long now = System.currentTimeMillis();
161 long nextTime = now;
162 for (int i = 0 ; i < 3 ;) {
163 if (now < nextTime) {
164 wait(nextTime - now);
165 now = System.currentTimeMillis();
166 continue;
167 }
168 DNSOutgoing out = new DNSOutgoing(FLAGS_QR_RESPONSE | FLAGS_AA);
169 out.addAnswer(new DNSRecord.Pointer(info.type, TYPE_PTR, CLASS_IN, 60, info.name), 0);
170 out.addAnswer(new DNSRecord.Service(info.name, TYPE_SRV, CLASS_IN, 60, info.priority, info.weight, info.port, info.name), 0);
171 out.addAnswer(new DNSRecord.Text(info.name, TYPE_TXT, CLASS_IN, 60, info.text), 0);
172 out.addAnswer(new DNSRecord.Address(info.name, TYPE_A, CLASS_IN, 60, info.getIPAddress()), 0);
173 send(out);
174 i++;
175 nextTime += 225;
176 }
177 } catch (InterruptedException e) {
178 throw new IOException("interrupted I/O");
179 }
180 }
181
182 /**
183 * Unregister a service. The service should have been registered.
184 */
185 public synchronized void unregisterService(ServiceInfo info)
186 {
187 try {
188 services.remove(info.name);
189
190 // unregister the service
191 long now = System.currentTimeMillis();
192 long nextTime = now;
193 for (int i = 0 ; i < 3 ; ) {
194 if (now < nextTime) {
195 wait(nextTime - now);
196 now = System.currentTimeMillis();
197 continue;
198 }
199 DNSOutgoing out = new DNSOutgoing(FLAGS_QR_RESPONSE | FLAGS_AA);
200 out.addAnswer(new DNSRecord.Pointer(info.type, TYPE_PTR, CLASS_IN, 0, info.name), 0);
201 send(out);
202 i++;
203 nextTime += 125;
204 }
205 } catch (IOException e) {
206 // ignore
207 } catch (InterruptedException e) {
208 // ignore
209 }
210 }
211
212 /**
213 * Unregister a service.
214 */
215 public synchronized void unregisterAllServices()
216 {
217 if (services.size() == 0) {
218 return;
219 }
220
221 try {
222 // unregister all services
223 long now = System.currentTimeMillis();
224 long nextTime = now;
225 for (int i = 0 ; i < 3 ; ) {
226 if (now < nextTime) {
227 wait(nextTime - now);
228 now = System.currentTimeMillis();
229 continue;
230 }
231 DNSOutgoing out = new DNSOutgoing(FLAGS_QR_RESPONSE | FLAGS_AA);
232 for (Enumeration e = services.elements() ; e.hasMoreElements() ;) {
233 ServiceInfo info = (ServiceInfo)e.nextElement();
234 out.addAnswer(new DNSRecord.Pointer(info.type, TYPE_PTR, CLASS_IN, 0, info.name), 0);
235 }
236 send(out);
237 i++;
238 nextTime += 125;
239 }
240 } catch (IOException e) {
241 // ignore
242 } catch (InterruptedException e) {
243 // ignore
244 }
245 }
246
247 /**
248 * Check that a service name is unique.
249 */
250 void checkService(ServiceInfo info) throws IOException, InterruptedException
251 {
252 long now = System.currentTimeMillis();
253 long nextTime = now;
254 for (int i = 0 ; i < 3 ;) {
255 for (Iterator j = cache.find(info.type) ; j.hasNext() ;) {
256 DNSRecord a = (DNSRecord)j.next();
257 if ((a.type == TYPE_PTR) && !a.isExpired(now) && info.name.equals(((DNSRecord.Pointer)a).alias)) {
258 // collision, uniquify using the IP address
259 if (info.getName().indexOf('.') < 0) {
260 info.name = info.getName() + ".[" + Integer.toHexString(info.getIPAddress()) + ":" + info.port + "]." + info.type;
261 checkService(info);
262 return;
263 }
264 throw new IOException("failed to pick unique service name");
265 }
266 }
267 if (now < nextTime) {
268 wait(nextTime - now);
269 now = System.currentTimeMillis();
270 continue;
271 }
272 DNSOutgoing out = new DNSOutgoing(FLAGS_QR_QUERY | FLAGS_AA);
273 out.addQuestion(new DNSQuestion(info.type, TYPE_PTR, CLASS_IN));
274 out.addAuthorativeAnswer(new DNSRecord.Pointer(info.type, TYPE_PTR, CLASS_IN, 60, info.name));
275 send(out);
276 i++;
277 nextTime += 175;
278 }
279 }
280
281 /**
282 * Listener for record updates.
283 */
284 static abstract class Listener extends DNSConstants {
285 /**
286 * Update a DNS record.
287 */
288 abstract void updateRecord(Rendezvous rendezvous, long now, DNSRecord record);
289 }
290
291 /**
292 * Add a listener for a question. The listener will receive updates to
293 * of answers to the question as they arrive, or from the cache if they
294 * are already available.
295 */
296 synchronized void addListener(Listener listener, DNSQuestion question)
297 {
298 long now = System.currentTimeMillis();
299
300 // add the new listener
301 listeners.addElement(listener);
302
303 // report existing matched records
304 if (question != null) {
305 for (Iterator i = cache.find(question.name) ; i.hasNext() ; ) {
306 DNSRecord c = (DNSRecord)i.next();
307 if (question.answeredBy(c) && !c.isExpired(now)) {
308 listener.updateRecord(this, now, c);
309 }
310 }
311 }
312 notifyAll();
313 }
314
315 /**
316 * Remove a listener from all outstanding questions. The listener will no longer
317 * receive any updates.
318 */
319 synchronized void removeListener(Listener listener)
320 {
321 listeners.removeElement(listener);
322 notifyAll();
323 }
324
325 /**
326 * Notify all listeners that a record was updated.
327 */
328 synchronized void updateRecord(long now, DNSRecord rec)
329 {
330 for (Enumeration e = listeners.elements() ; e.hasMoreElements() ;) {
331 Listener listener = (Listener)e.nextElement();
332 listener.updateRecord(this, now, rec);
333 }
334 notifyAll();
335 }
336
337 /**
338 * Handle an incoming response. Cache answers, and pass them on to
339 * the appropriate questions.
340 */
341 synchronized void handleResponse(DNSIncoming msg) throws IOException
342 {
343 long now = System.currentTimeMillis();
344
345 for (Enumeration e = msg.answers.elements() ; e.hasMoreElements() ;) {
346 DNSRecord rec = (DNSRecord)e.nextElement();
347 boolean expired = rec.isExpired(now);
348
349 // update the cache
350 DNSRecord c = (DNSRecord)cache.get(rec);
351 if (c != null) {
352 if (expired) {
353 cache.remove(c);
354 } else {
355 c.resetTTL(rec);
356 rec = c;
357 }
358 } else if (!expired) {
359 cache.add(rec);
360 }
361
362 // notify the listeners
363 updateRecord(now, rec);
364 }
365 }
366
367 /**
368 * Handle an incoming query. See if we can answer any part of it
369 * given our registered records.
370 */
371 synchronized void handleQuery(DNSIncoming in, InetAddress addr, int port) throws IOException
372 {
373 DNSOutgoing out = null;
374
375 // for unicast responses the question must be included
376 if (port != MDNS_PORT) {
377 out = new DNSOutgoing(FLAGS_QR_RESPONSE | FLAGS_AA, false);
378 for (Enumeration e = in.questions.elements() ; e.hasMoreElements() ;) {
379 out.addQuestion((DNSQuestion)e.nextElement());
380 }
381 }
382
383 // answer relevant questions
384 for (Enumeration e = in.questions.elements() ; e.hasMoreElements() ;) {
385 DNSQuestion q = (DNSQuestion)e.nextElement();
386 switch (q.type) {
387 case TYPE_PTR:
388 // find matching services
389 for (Enumeration s = services.elements() ; s.hasMoreElements() ; ) {
390 ServiceInfo info = (ServiceInfo)s.nextElement();
391 if (q.name.equals(info.type)) {
392 if (out == null) {
393 out = new DNSOutgoing(FLAGS_QR_RESPONSE | FLAGS_AA);
394 }
395 out.addAnswer(in, new DNSRecord.Pointer(info.type, TYPE_PTR, CLASS_IN, 60, info.name));
396 }
397 }
398 break;
399 default:
400 // find service
401 ServiceInfo info = (ServiceInfo)services.get(q.name.toLowerCase());
402 if (info != null) {
403 if (out == null) {
404 out = new DNSOutgoing(FLAGS_QR_RESPONSE | FLAGS_AA);
405 }
406 if ((q.type == TYPE_SRV) || (q.type == TYPE_ANY)) {
407 out.addAnswer(in, new DNSRecord.Service(q.name, TYPE_SRV, CLASS_IN | CLASS_UNIQUE, 60, info.priority, info.weight, info.port, info.name));
408 }
409 if ((q.type == TYPE_TXT) || (q.type == TYPE_ANY)) {
410 out.addAnswer(in, new DNSRecord.Text(q.name, TYPE_TXT, CLASS_IN | CLASS_UNIQUE, 60, info.text));
411 }
412 if ((q.type == TYPE_A) || (q.type == TYPE_ANY)) {
413 out.addAnswer(in, new DNSRecord.Address(q.name, TYPE_A, CLASS_IN | CLASS_UNIQUE, 60, info.getIPAddress()));
414 }
415 }
416 }
417 }
418 if ((out != null) && (out.numAnswers > 0)) {
419 out.id = in.id;
420 out.finish();
421 socket.send(new DatagramPacket(out.data, out.off, addr, port));
422 }
423 }
424
425 /**
426 * Send a message an outgoing multicast DNS message.
427 */
428 synchronized void send(DNSOutgoing out) throws IOException
429 {
430 out.finish();
431 socket.send(new DatagramPacket(out.data, out.off, group, MDNS_PORT));
432 }
433
434 /**
435 * Listen for multicast packets.
436 */
437 class SocketListener implements Runnable
438 {
439 public void run()
440 {
441 try {
442 byte buf[] = new byte[MAX_MSG_ABSOLUTE];
443 DatagramPacket packet = new DatagramPacket(buf, buf.length);
444 while (!done) {
445 packet.setLength(buf.length);
446 socket.receive(packet);
447 if (done) {
448 break;
449 }
450 try {
451 DNSIncoming msg = new DNSIncoming(packet);
452 if (debug > 0) {
453 msg.print(debug > 1);
454 System.out.println();
455 }
456 if (msg.isQuery()) {
457 handleQuery(msg, packet.getAddress(), packet.getPort());
458 if (packet.getPort() != MDNS_PORT) {
459 handleQuery(msg, packet.getAddress(), MDNS_PORT);
460 }
461 } else {
462 handleResponse(msg);
463 }
464 } catch (IOException e) {
465 e.printStackTrace();
466 }
467 }
468 } catch (IOException e) {
469 if (!done) {
470 e.printStackTrace();
471 }
472 }
473 }
474 }
475
476 /**
477 * Schedule questions
478 */
479 class RecordReaper implements Runnable
480 {
481 public void run()
482 {
483
484 try {
485 synchronized (Rendezvous.this) {
486 while (true) {
487 Rendezvous.this.wait(10 * 1000);
488 if (done) {
489 return;
490 }
491
492 // remove expired answers from the cache
493 long now = System.currentTimeMillis();
494 for (Iterator i = cache.all() ; i.hasNext() ; ) {
495 DNSRecord c = (DNSRecord)i.next();
496 if (c.isExpired(now)) {
497 updateRecord(now, c);
498 i.remove();
499 }
500 }
501 }
502 }
503 } catch (InterruptedException e) {
504 e.printStackTrace();
505 }
506 }
507 }
508
509 /**
510 * Browse for a service of a given type
511 */
512 class ServiceBrowser extends Rendezvous.Listener implements Runnable
513 {
514 String type;
515 ServiceListener listener;
516 Hashtable services;
517 long nextTime;
518 int delay;
519 boolean done;
520 LinkedList list;
521
522 /**
523 * Create a browser for a service type.
524 */
525 ServiceBrowser(String type, ServiceListener listener)
526 {
527 this.type = type;
528 this.listener = listener;
529 this.services = new Hashtable();
530 this.nextTime = System.currentTimeMillis();
531 this.delay = 500;
532 this.list = new LinkedList();
533
534 addListener(this, new DNSQuestion(type, TYPE_PTR, CLASS_IN));
535 new Thread(this, "Rendezvous.ServiceBrowser: " + type).start();
536 }
537
538
539 /**
540 * Event for notifying a service listener.
541 */
542 abstract class Event
543 {
544 String name;
545
546 Event(String name)
547 {
548 this.name = name;
549 }
550 abstract void send();
551 }
552
553 /**
554 * Update a record.
555 */
556 void updateRecord(Rendezvous rendezvous, long now, DNSRecord rec)
557 {
558 if ((rec.type == TYPE_PTR) && rec.name.equals(type)) {
559 boolean expired = rec.isExpired(now);
560 String name = ((DNSRecord.Pointer)rec).alias;
561 DNSRecord old = (DNSRecord)services.get(name.toLowerCase());
562 if ((old == null) && !expired) {
563 // new record
564 services.put(name.toLowerCase(), rec);
565 list.addLast(new Event(name) {
566 void send() {listener.addService(Rendezvous.this, type, this.name);}
567 });
568 } else if ((old != null) && !expired) {
569 // update record
570 old.resetTTL(rec);
571 } else if ((old != null) && expired) {
572 // expire record
573 services.remove(name);
574 list.addLast(new Event(name) {
575 void send() {listener.removeService(Rendezvous.this, type, this.name);}
576 });
577 return;
578 }
579
580 // adjust next request time
581 long expires = rec.getExpirationTime(75);
582 if (expires < nextTime) {
583 nextTime = rec.getExpirationTime(75);
584 }
585 }
586 }
587
588 /**
589 * Request.
590 */
591 public void run()
592 {
593 try {
594 while (true) {
595 Event evt = null;
596
597 synchronized (Rendezvous.this) {
598 long now = System.currentTimeMillis();
599 if ((list.size() == 0) && (nextTime > now)) {
600 Rendezvous.this.wait(nextTime - now);
601 }
602 if (done) {
603 return;
604 }
605 now = System.currentTimeMillis();
606
607 // send query
608 if (nextTime <= now) {
609 DNSOutgoing out = new DNSOutgoing(FLAGS_QR_QUERY);
610 out.addQuestion(new DNSQuestion(type, TYPE_PTR, CLASS_IN));
611 for (Enumeration e = services.elements() ; e.hasMoreElements() ;) {
612 DNSRecord rec = (DNSRecord)e.nextElement();
613 if (!rec.isExpired(now)) {
614 out.addAnswer(rec, now);
615 }
616 }
617 send(out);
618
619 // schedule the next one
620 nextTime = now + delay;
621 delay = Math.min(20*1000, delay * 2);
622 }
623
624 // get the next event
625 if (list.size() > 0) {
626 evt = (Event)list.removeFirst();
627 }
628 }
629 if (evt != null) {
630 evt.send();
631 }
632 }
633 } catch (IOException e) {
634 e.printStackTrace();
635 } catch (InterruptedException e) {
636 // oops
637 }
638 }
639
640 void close()
641 {
642 synchronized (Rendezvous.this) {
643 if (!done) {
644 done = true;
645 removeListener(this);
646 }
647 }
648 }
649 }
650
651 /**
652 * Shutdown operations.
653 */
654 class Shutdown implements Runnable
655 {
656 public void run()
657 {
658 shutdown = null;
659 close();
660 }
661 }
662
663 /**
664 * Close down rendezvous. Release all resources and unregister all services.
665 */
666 public synchronized void close()
667 {
668 if (!done) {
669 done = true;
670 notifyAll();
671
672 // remove the shutdown hook
673 if (shutdown != null) {
674 Runtime.getRuntime().removeShutdownHook(shutdown);
675 }
676
677 // unregister services
678 unregisterAllServices();
679
680 // close socket
681 try {
682 socket.leaveGroup(group);
683 socket.close();
684 } catch (IOException e) {
685 // ignore
686 }
687 }
688 }
689
690 /**
691 * List cache entries, for debugging only.
692 */
693 void print()
694 {
695 if (cache.count > 0) {
696 System.out.println("---- cache ----");
697 cache.print();
698 System.out.println();
699 }
700 }
701 }