Source code: net/jxta/impl/rendezvous/rpv/PeerView.java
1 /*
2 * Copyright (c) 2002-2004 Sun Microsystems, Inc. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in
13 * the documentation and/or other materials provided with the
14 * distribution.
15 *
16 * 3. The end-user documentation included with the redistribution,
17 * if any, must include the following acknowledgment:
18 * "This product includes software developed by the
19 * Sun Microsystems, Inc. for Project JXTA."
20 * Alternately, this acknowledgment may appear in the software itself,
21 * if and wherever such third-party acknowledgments normally appear.
22 *
23 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and
24 * "Project JXTA" must not be used to endorse or promote products
25 * derived from this software without prior written permission.
26 * For written permission, please contact Project JXTA at
27 * http://www.jxta.org.
28 *
29 * 5. Products derived from this software may not be called "JXTA",
30 * nor may "JXTA" appear in their name, without prior written
31 * permission of Sun.
32 *
33 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
34 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
35 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
36 * DISCLAIMED. IN NO EVENT SHALL SUN MICROSYSTEMS OR
37 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
38 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
39 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
40 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
41 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
42 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
43 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44 * SUCH DAMAGE.
45 * ====================================================================
46 *
47 * This software consists of voluntary contributions made by many
48 * individuals on behalf of Project JXTA. For more
49 * information on Project JXTA, please see
50 * <http://www.jxta.org/>.
51 *
52 * This license is based on the BSD license adopted by the Apache
53 * Foundation.
54 *
55 * $Id: PeerView.java,v 1.92 2004/12/07 20:17:41 bondolo Exp $
56 */
57
58 package net.jxta.impl.rendezvous.rpv;
59
60 import java.io.BufferedReader;
61 import java.io.InputStream;
62 import java.io.InputStreamReader;
63 import java.net.URI;
64 import java.net.URL;
65 import java.net.URLConnection;
66 import java.util.ArrayList;
67 import java.util.Arrays;
68 import java.util.Collections;
69 import java.util.Enumeration;
70 import java.util.HashSet;
71 import java.util.Iterator;
72 import java.util.List;
73 import java.util.ListIterator;
74 import java.util.Random;
75 import java.util.Set;
76 import java.util.SortedSet;
77 import java.util.Timer;
78 import java.util.TimerTask;
79 import java.util.TreeSet;
80 import java.util.Vector;
81
82 import java.io.IOException;
83 import java.io.File;
84 import java.net.URISyntaxException;
85 import java.util.NoSuchElementException;
86
87 import org.apache.log4j.Level;
88 import org.apache.log4j.Logger;
89
90 import net.jxta.discovery.DiscoveryService;
91 import net.jxta.document.Advertisement;
92 import net.jxta.document.AdvertisementFactory;
93 import net.jxta.document.MimeMediaType;
94 import net.jxta.document.XMLDocument;
95 import net.jxta.document.TextDocument;
96 import net.jxta.document.TextElement;
97 import net.jxta.endpoint.EndpointAddress;
98 import net.jxta.endpoint.EndpointListener;
99 import net.jxta.endpoint.EndpointService;
100 import net.jxta.endpoint.Message;
101 import net.jxta.endpoint.Messenger;
102 import net.jxta.endpoint.MessageElement;
103 import net.jxta.endpoint.MessageTransport;
104 import net.jxta.endpoint.StringMessageElement;
105 import net.jxta.endpoint.TextDocumentMessageElement;
106 import net.jxta.id.ID;
107 import net.jxta.id.IDFactory;
108 import net.jxta.peer.PeerID;
109 import net.jxta.peergroup.PeerGroup;
110 import net.jxta.pipe.InputPipe;
111 import net.jxta.pipe.OutputPipe;
112 import net.jxta.pipe.PipeID;
113 import net.jxta.pipe.PipeMsgEvent;
114 import net.jxta.pipe.PipeMsgListener;
115 import net.jxta.pipe.PipeService;
116 import net.jxta.protocol.AccessPointAdvertisement;
117 import net.jxta.protocol.ConfigParams;
118 import net.jxta.protocol.PeerAdvertisement;
119 import net.jxta.protocol.PipeAdvertisement;
120 import net.jxta.protocol.RdvAdvertisement;
121 import net.jxta.protocol.RouteAdvertisement;
122 import net.jxta.rendezvous.RendezvousEvent;
123 import net.jxta.rendezvous.RendezvousListener;
124
125 import net.jxta.impl.access.AccessList;
126 import net.jxta.impl.config.Config;
127 import net.jxta.impl.endpoint.relay.RelayClient;
128 import net.jxta.impl.protocol.RdvConfigAdv;
129 import net.jxta.impl.rendezvous.RendezVousServiceImpl;
130 import net.jxta.impl.util.TimeUtils;
131 import net.jxta.impl.util.TimerThreadNamer;
132
133
134 /**
135 * This class models a Rendezvous Peer View (RPV):
136 * ordered collection of all other Rendezvous Peers visible to
137 * this Peer.
138 *
139 * <p/>Presently this class implements a random "diffusion" algorithm
140 * where each Peer periodically selects a randomly selected peer
141 * advertisement from its view and sends it over to a randomly
142 * selected peer from its view. Over time, this causes every peer to
143 * learn about every other peer, resulting in a "consistent" peer
144 * view.
145 *
146 * <p/>This diffusion process is bootstrapped by every peer sending their
147 * own peer advertisements to some well-known, stable, "seed" peers on
148 * startup.
149 **/
150 public final class PeerView implements EndpointListener, RendezvousListener {
151
152 /**
153 * Log4J Logger
154 **/
155 private static final transient Logger LOG = Logger.getLogger(PeerView.class.getName());
156
157 /**
158 * Our service name
159 **/
160 static final String SERVICE_NAME = "PeerView";
161
162 /**
163 * Namespace used for rdv message elements.
164 **/
165 static final String MESSAGE_NAMESPACE = "jxta";
166
167 /**
168 * Element name of outgoing messages.
169 **/
170 static final String MESSAGE_ELEMENT_NAME = "PeerView.PeerAdv";
171
172 /**
173 * Element name of responses.
174 */
175 static final String RESPONSE_ELEMENT_NAME = "PeerView.PeerAdv.Response";
176
177 /**
178 * Element name that specifies by its presence whether the enclosed
179 * RdvAdvertisement is a hint (cached), or is known to be active.
180 * In practice, when sending its own RdvAdvertisement, a peer does not
181 * set this element, but when sending another peer's RdvAdvertisement,
182 * this element is set.
183 */
184 static final String CACHED_RADV_ELEMENT_NAME = "PeerView.Cached";
185 static final MessageElement CACHED_RADV_ELEMENT = new StringMessageElement(CACHED_RADV_ELEMENT_NAME, Boolean.TRUE.toString(), null);
186
187 /**
188 * (optional) Element name that specifies the route advertisement of the
189 * source of the message.
190 */
191 static final String SRCROUTEADV_ELEMENT_NAME = "PeerView.SrcRouteAdv";
192
193 /**
194 * Element name that specifies by its presence whether the sender
195 * is configured as an edge peer. Edge peers do not become part of
196 * any peer's view.
197 */
198 static final String EDGE_ELEMENT_NAME = "PeerView.EdgePeer";
199 static final MessageElement EDGE_ELEMENT = new StringMessageElement(EDGE_ELEMENT_NAME, Boolean.TRUE.toString(), null);
200
201 /**
202 * Element name that specifies by its presence whether the enclosed
203 * RdvAdvertisement is about a failure.
204 */
205 static final String FAILURE_ELEMENT_NAME = "PeerView.Failure";
206 static final MessageElement FAILURE_ELEMENT = new StringMessageElement(FAILURE_ELEMENT_NAME, Boolean.TRUE.toString(), null);
207
208 /**
209 * This is the interval between adv exchange in seconds. This is
210 * the main tunable runtime parameter for the diffusion
211 * process. An interval that is too low will improve view
212 * consistency at the expense of gratuitous network traffic. On
213 * the other hand, an interval that is too high will cause the
214 * view to become inconsistent. It is desirable to err on the side
215 * of extra traffic.
216 */
217 private static final long DEFAULT_SEEDING_PERIOD = 5 * TimeUtils.ASECOND;
218
219 private static final long WATCHDOG_PERIOD = 30 * TimeUtils.ASECOND;
220 private static final long ACL_REFRESH_PERIOD = 30 * TimeUtils.AMINUTE;
221 private static final long WATCHDOG_GRACE_DELAY = 5 * TimeUtils.AMINUTE;
222
223 private static final long DEFAULT_BOOTSTRAP_KICK_INTERVAL = 3 * TimeUtils.ASECOND;
224
225 private static final int MIN_BOOTLEVEL = 0;
226 private static final int BOOTLEVEL_INCREMENT = 1;
227 private static final int MAX_EDGE_PEER_BOOTLEVEL = 8;
228 private static final int MAX_RDV_PEER_BOOTLEVEL = 6;
229
230 /**
231 * Default lifetimes for RdvAdvertisement that are published.
232 **/
233 private static final long DEFAULT_RADV_LIFETIME = 1 * TimeUtils.ADAY;
234 private static final long DEFAULT_RADV_EXPIRATION = 1 * TimeUtils.ANHOUR;
235
236 /**
237 * DEFAULT_SEEDING_RDVPEERS
238 *
239 * This value is the maximum number of rendezvous peers that will be
240 * sent our own advertisement at boot time.
241 **/
242 private static final int DEFAULT_SEEDING_RDVPEERS = 5;
243
244 private final PeerGroup group;
245 private final PeerGroup advertisingGroup;
246 private final RendezVousServiceImpl rdvService;
247 private final EndpointService endpoint;
248 private long refreshTime =0;
249
250 /**
251 * The name of this PeerView.
252 *
253 * <p>FIXME 20040623 bondolo This should be a CodatID.
254 **/
255 private final String name;
256
257 /**
258 * Delay in relative milliseconds to apply before contacting seeding rdvs.
259 * 0 is supposed to be reserved by RdvConfig to mean "use the default".
260 * However, it is in fact a valid value and also the one we want for the default.
261 * The only problem with that is that it is not possible to configure this value
262 * explicitly, should it one day not be the default. The issue is actually in RdvConfig.
263 **/
264 private long seedingRdvConnDelay = 0;
265
266 /**
267 * Whether relays should be probed for rendezvous advertisements.
268 **/
269 private boolean probeRelays = true;
270
271 /**
272 * If the peerview is smaller than this we will try harder to find
273 * additional peerview members.
274 **/
275 private int minHappyPeerView = 4;
276
277 /**
278 * Whether we are restricted to using seed rdvs only.
279 **/
280 private boolean useOnlySeeds = false;
281
282 /**
283 * These are seed peers which were specified as part of the configuration
284 * data or programmatically. These seeds are never deleted.
285 *
286 * <ul>
287 * <li>Values are {@link java.net.URI}.</li>
288 * </ul>
289 */
290 private Set permanentSeedHosts = Collections.synchronizedSet( new HashSet() );
291
292 /**
293 * The active list of seed peers. Formed by the union of the permanent
294 * seed hosts and the lists of seed peers downloaded from the seeding
295 * URIs.
296 *
297 * <ul>
298 * <li>Values are {@link java.net.URI}.</li>
299 * </ul>
300 */
301 private Set activeSeedHosts = Collections.synchronizedSet( new HashSet() );
302
303 /**
304 * These URIs specify location of seed peer lists. The URIs will be resolved
305 * via URLConnection and are assumed to refer to plain text lists of URIs.
306 *
307 * <ul>
308 * <li>Values are {@link java.net.URI}.</li>
309 * </ul>
310 */
311 private Set seedingURIs = Collections.synchronizedSet( new HashSet() );
312
313 /**
314 * A single timer is used to periodically kick each PeerView
315 * into activity. For the Random PeerView, this activity consists
316 * of selecting a PeerViewElement at random from its view and
317 * sending it across to a randomly-selected peer from its view.
318 *
319 * <p/>FIXME 20021121 lomax
320 *
321 * <p/>The original idea of using a common timer in order to save threads IS a
322 * very good idea. However, limitations, and actually, bugs, in java.util.Timer
323 * create the following problems when using a static Timer:
324 *
325 * <ul>
326 * <li>Memory Leak: Cancelling a TimerTask does not remove it from the
327 * execution queue of the Timer until the Timer is cancelled or the
328 * TimerTask is fired. Since most of the TimerTasks are inner classes
329 * this can mean that the PeerView is held around for a long time.</li>
330 *
331 * <li>java.util.Timer is not only not real-time (which is more or less fine
332 * for the PeerView), but it also invokes tasks sequentially (only one Thread
333 * per Timer). As a result, tasks that take a long time to run delay
334 * other tasks.</li>
335 * </ul>
336 *
337 * <p/>The PeerView would function better with a better Timer, but JDK does
338 * not provide a standard timer that would fulfill the needs of the
339 * PeerView. Maybe we should implement a JXTA Timer, since lots of the JXTA
340 * services, by being very asynchronous, rely on the same kind of timer
341 * semantics as the PeerView. Until then, creating a Timer per instance of
342 * the PeerView (i.e. per instance of joined PeerGroup) is the best
343 * workaround.
344 **/
345 private final Timer timer;
346
347 /**
348 * A random number generator.
349 */
350 private final static Random random = new Random();
351
352 /**
353 * List of scheduled tasks
354 **/
355 private final Set scheduledTasks = Collections.synchronizedSet(new HashSet());
356
357 /**
358 * Describes the frequency and amount of effort we will spend updating
359 * the peerview.
360 **/
361 private int bootLevel = MIN_BOOTLEVEL;
362
363 /**
364 * Earliest absolute time in milliseconds at which we will allow a reseed
365 * to take place.
366 **/
367 private long earliestReseed = 0L;
368
369 private final String uniqueGroupId;
370
371 private final Set rpvListeners = Collections.synchronizedSet(new HashSet());
372
373 /**
374 * Used for querying for pves.
375 **/
376 private InputPipe wirePipeInputPipe = null;
377
378 /**
379 * Used for querying for pves.
380 **/
381 private OutputPipe wirePipeOutputPipe = null;
382
383 /**
384 * Used for notifications about pve failures.
385 **/
386 private InputPipe localGroupWirePipeInputPipe = null;
387
388 /**
389 * Used for notifications about pve failures.
390 **/
391 private OutputPipe localGroupWirePipeOutputPipe = null;
392
393 /**
394 * A task which monitors the up and down peers in the peerview.
395 **/
396 private WatchdogTask watchdogTask = null;
397
398 /**
399 * This is the accumulated view by an instance of this class.
400 *
401 * <p/>Values are {@link net.jxta.impl.rendezvous.rpv.PeerViewElement}
402 */
403 private final SortedSet localView = Collections.synchronizedSortedSet(new TreeSet());
404
405 /**
406 * PVE for ourself.
407 *
408 * FIXME bondolo 20041015 This should be part of the local view.
409 **/
410 private final PeerViewElement self;
411 private PeerViewElement upPeer = null;
412 private PeerViewElement downPeer = null;
413
414 private final PeerViewStrategy replyStrategy;
415
416 private final PeerViewStrategy kickRecipientStrategy;
417
418 private final PeerViewStrategy kickAdvertisementStrategy;
419
420 private final PeerViewStrategy refreshRecipientStrategy;
421 protected final AccessList acl;
422 protected File aclFile;
423 protected long aclFileLastModified = 0;
424
425 // PeerAdv tracking.
426 private PeerAdvertisement lastPeerAdv = null;
427 private int lastModCount = -1;
428
429 private final PipeAdvertisement localGroupWirePipeAdv;
430 private final PipeAdvertisement advGroupPropPipeAdv;
431
432 /**
433 * If <code>true</code> then this Peer View instance is closed and is
434 * shutting down.
435 **/
436 private volatile boolean closed = false;
437
438 /**
439 * Get an instance of PeerView for the specified PeerGroup and Service.
440 *
441 * @param group Peer Group in which this Peer View instance operates.
442 * @param advertisingGroup Peer Group in which this Peer View instance will
443 * advertise and broadcast its existance.
444 * @param rdvService The rdvService we are to use.
445 * @param name The identifying name for this Peer View instance.
446 **/
447 public PeerView(PeerGroup group, PeerGroup advertisingGroup, RendezVousServiceImpl rdvService, String name ) {
448
449 this.group = group;
450 this.advertisingGroup = advertisingGroup;
451 this.rdvService = rdvService;
452 this.name = name;
453
454 this.endpoint = group.getEndpointService();
455 aclFile = new File(Config.JXTA_HOME + "rendezvousACL.xml");
456 aclFileLastModified = aclFile.lastModified();
457
458 this.acl = new AccessList();
459 try {
460 acl.init(aclFile);
461 this.refreshTime = System.currentTimeMillis() + ACL_REFRESH_PERIOD;
462 } catch (IOException io) {
463 acl.setGrantAll(true);
464 this.refreshTime = Long.MAX_VALUE;
465 if (LOG.isEnabledFor(Level.INFO)) {
466 LOG.info("PeerView Access Control granting all permissions");;
467 }
468 }
469
470 this.uniqueGroupId = group.getPeerGroupID().getUniqueValue().toString();
471
472 timer = new Timer(true);
473 timer.schedule(new TimerThreadNamer("PeerView Timer for " + group.getPeerGroupID()), 0);
474
475 ConfigParams confAdv = group.getConfigAdvertisement();
476
477 // Get the config. If we do not have a config, we're done; we just keep
478 // the defaults (edge peer/no auto-rdv)
479 if (confAdv != null) {
480 Advertisement adv = null;
481
482 try {
483 XMLDocument configDoc = (XMLDocument) confAdv.getServiceParam(rdvService.getAssignedID());
484 if (null != configDoc) {
485 // XXX 20041027 backwards compatibility
486 configDoc.addAttribute( "type", RdvConfigAdv.getAdvertisementType() );
487
488 adv = AdvertisementFactory.newAdvertisement(configDoc);
489 }
490 } catch (java.util.NoSuchElementException failed) {
491 ;
492 }
493
494 if (adv instanceof RdvConfigAdv) {
495 RdvConfigAdv rdvConfigAdv = (RdvConfigAdv) adv;
496
497 permanentSeedHosts.addAll(Arrays.asList(rdvConfigAdv.getSeedRendezvous()));
498
499 useOnlySeeds = rdvConfigAdv.getUseOnlySeeds();
500
501 seedingURIs.addAll(Arrays.asList(rdvConfigAdv.getSeedingURIs()));
502
503 if (rdvConfigAdv.getSeedRendezvousConnectDelay() > 0) {
504 seedingRdvConnDelay = rdvConfigAdv.getSeedRendezvousConnectDelay();
505 }
506
507 probeRelays = rdvConfigAdv.getProbeRelays();
508
509 if (rdvConfigAdv.getMinHappyPeerView() > 0) {
510 minHappyPeerView = rdvConfigAdv.getMinHappyPeerView();
511 }
512 }
513 }
514
515 lastPeerAdv = group.getPeerAdvertisement();
516 lastModCount = lastPeerAdv.getModCount();
517
518 // create a new local RdvAdvertisement and set it to self.
519 RdvAdvertisement radv = createRdvAdvertisement(lastPeerAdv, name);
520
521 self = new PeerViewElement(endpoint, radv);
522
523 // if ( rdvService.isRendezVous() ) {
524 // addPeerViewElement( self );
525 // }
526
527 // setup endpoint listener
528 endpoint.addIncomingMessageListener(this, SERVICE_NAME, uniqueGroupId);
529
530 // add rendezvous listener
531 rdvService.addListener(this);
532
533 // initialize strategies
534 replyStrategy = new PeerViewRandomWithReplaceStrategy(localView);
535
536 kickRecipientStrategy = new PeerViewRandomStrategy(localView);
537
538 kickAdvertisementStrategy = new PeerViewRandomWithReplaceStrategy(localView);
539
540 refreshRecipientStrategy = new PeerViewSequentialStrategy(localView);
541
542 localGroupWirePipeAdv = makeWirePipeAdvertisement(group);
543
544 if (null != advertisingGroup) {
545 advGroupPropPipeAdv = makeWirePipeAdvertisement(advertisingGroup);
546 } else {
547 advGroupPropPipeAdv = null;
548 }
549
550 if (LOG.isEnabledFor(Level.INFO)) {
551 LOG.info("PeerView created for group \"" + group.getPeerGroupName() + "\" [" + group.getPeerGroupID() + "] name \"" + name + "\"");
552 }
553 }
554
555 /**
556 * {@inheritDoc}
557 *
558 * Listener for "PeerView"/<peergroup-unique-id> and propagate pipes.
559 **/
560 public void processIncomingMessage(Message msg, EndpointAddress srcAddr, EndpointAddress dstAddr) {
561
562 // IsEdgePeer is confusing because the is* predicates are used
563 // to refer to the properties of the message we're processing.
564 boolean localIsEdge = !rdvService.isRendezVous();
565
566 // check what kind of message this is (response or not).
567 boolean isResponse = false;
568 MessageElement me = msg.getMessageElement(MESSAGE_NAMESPACE, MESSAGE_ELEMENT_NAME);
569
570 if (me == null) {
571 me = msg.getMessageElement(MESSAGE_NAMESPACE, RESPONSE_ELEMENT_NAME);
572 if (me == null) {
573 if (LOG.isEnabledFor(Level.WARN)) {
574 LOG.warn("Discarding damaged " + msg + ".");
575 }
576 return;
577 } else {
578 isResponse = true;
579 }
580 }
581
582 Advertisement adv;
583
584 try {
585 adv = AdvertisementFactory.newAdvertisement(me.getMimeType(), me.getStream());
586 } catch (IOException failed) {
587 if (LOG.isEnabledFor(Level.WARN)) {
588 LOG.warn("Failed building rdv advertisement", failed);
589 }
590 return;
591 } catch (NoSuchElementException failed) {
592 if (LOG.isEnabledFor(Level.WARN)) {
593 LOG.warn("Element is not a rdv advertisement", failed);
594 }
595 return;
596 } catch ( IllegalArgumentException failed ) {
597 if (LOG.isEnabledFor(Level.WARN)) {
598 LOG.warn("Could not build rdv advertisement", failed);
599 }
600 return;
601 }
602
603 if (!(adv instanceof RdvAdvertisement)) {
604 if (LOG.isEnabledFor(Level.WARN)) {
605 LOG.warn("Response does not contain radv (" + adv.getAdvertisementType() + ")");
606 }
607 return;
608 }
609
610 RdvAdvertisement radv = (RdvAdvertisement) adv;
611 // See if we can find a src route adv in the message.s
612 me = msg.getMessageElement(MESSAGE_NAMESPACE, SRCROUTEADV_ELEMENT_NAME);
613 if (me != null) {
614 try {
615 Advertisement routeAdv = AdvertisementFactory.newAdvertisement(me.getMimeType(), me.getStream());
616
617 if (!(routeAdv instanceof RouteAdvertisement)) {
618 if (LOG.isEnabledFor(Level.WARN)) {
619 LOG.warn("Advertisement is not a RouteAdvertisement");
620 }
621 } else {
622 RouteAdvertisement rdvRouteAdv = (RouteAdvertisement) radv.getRouteAdv().clone();
623
624 // XXX we stich them together even if in the end it gets optimized away
625 RouteAdvertisement.stichRoute(rdvRouteAdv, (RouteAdvertisement) routeAdv);
626 radv.setRouteAdv(rdvRouteAdv);
627 }
628 } catch (IOException failed) {
629 if (LOG.isEnabledFor(Level.WARN)) {
630 LOG.warn("Failed getting stream for message element", failed);
631 }
632 }
633 }
634 me = null;
635
636 // Is this a message about ourself?
637 if (group.getPeerID().equals(radv.getPeerID())) {
638 if (LOG.isEnabledFor(Level.DEBUG)) {
639 LOG.debug("Received a PeerView message about self. Discard.");
640 }
641
642 return;
643 }
644
645 // Collect the various flags.
646
647 boolean isFailure = (msg.getMessageElement(MESSAGE_NAMESPACE, FAILURE_ELEMENT_NAME) != null);
648 boolean isCached = (msg.getMessageElement(MESSAGE_NAMESPACE, CACHED_RADV_ELEMENT_NAME) != null);
649 boolean isFromEdge = (msg.getMessageElement(MESSAGE_NAMESPACE, EDGE_ELEMENT_NAME) != null);
650 boolean isTrusted = acl.isAllowed(radv.getPeerID());
651 if (!localIsEdge && !isFromEdge && !isTrusted) {
652 if (LOG.isEnabledFor(Level.DEBUG)) {
653 LOG.debug("Rejecting Rendezvous connection: " + radv.getPeerID().toString());
654 }
655 return;
656 }
657
658 if (LOG.isEnabledFor(Level.DEBUG)) {
659 String srcPeer = srcAddr.toString();
660
661 if( "jxta".equals(srcAddr.getProtocolName())) {
662 try {
663 String idstr = ID.URIEncodingName + ":" + ID.URNNamespace + ":" + srcAddr.getProtocolAddress();
664
665 ID asID = IDFactory.fromURI( new URI(idstr) );
666
667 PeerViewElement pve = getPeerViewElement( asID );
668 if( null != pve ) {
669 srcPeer = "\"" + pve.getRdvAdvertisement().getName() + "\"";
670 }
671 } catch( URISyntaxException failed ) {
672 }
673 }
674
675 LOG.debug(
676 "[" + group.getPeerGroupID() + "] Received a" + (isCached ? " cached" : "") + (isResponse ? " response" : "")
677 + (isFailure ? " failure" : "") + " message" + (isFromEdge ? " from edge" : "") + " regarding \"" + radv.getName() + "\" from "
678 + srcPeer);
679 }
680
681 // if this is a notification failure. All we have to do is locally
682 // process the failure
683 if (isFailure) {
684 notifyFailure(radv.getPeerID(), false);
685 return;
686 }
687
688 if (!isFromEdge && !isCached && isTrusted) {
689 DiscoveryService discovery = group.getDiscoveryService();
690
691 if (discovery != null) {
692 try {
693 discovery.publish(radv, DEFAULT_RADV_LIFETIME, DEFAULT_RADV_EXPIRATION);
694 } catch (IOException ex) {
695 if (LOG.isEnabledFor(Level.WARN)) {
696 LOG.warn("Could not publish " + radv.getName(), ex);
697 }
698 }
699 }
700 }
701
702 // Figure out if we know that peer already. If we do, reuse the pve
703 // that we have.
704 boolean isNewbie = false;
705 boolean added = false;
706 PeerViewElement pve;
707 int viewSize = 0;
708
709 synchronized (localView) {
710 PeerViewElement newbie = new PeerViewElement(endpoint, radv);
711 pve = getPeerViewElement(newbie);
712
713 if (null == pve) {
714 pve = newbie;
715 isNewbie = true;
716 }
717
718 if (!isFromEdge && !isCached && isTrusted) {
719 if (!useOnlySeeds || isSeedRdv(radv)) {
720 if (isNewbie) {
721 added = addPeerViewElement(pve);
722 } else {
723 pve.setRdvAdvertisement(radv);
724 }
725 }
726 }
727
728 viewSize = localView.size();
729 }
730
731 // Do the rest of the add related tasks out of synch.
732 // We must not nest any possibly synchronized ops in
733 // the LocalView lock; it's the lowest level.
734
735 if (added) {
736 // Notify local listeners
737 generateEvent(PeerViewEvent.ADD, pve);
738 }
739
740 /*
741 * Now, see what if any message we have to send as a result.
742 * There are four kind of messages we can send:
743 *
744 * - A response with ourselves, if we're being probed and we're
745 * a rdv.
746 *
747 * - A probe to the peer whose adv we received, because we want
748 * confirmation that it's alive.
749 *
750 * - A response with a random adv from our cache if we're being probed
751 *
752 * We may send more than one message.
753 */
754
755 // Type 1: respond with self.
756 // We need to do that whenever we're being probed and we're an rdv,
757 // and the adv we got is that of the sender (!cached - otherwise we
758 // can't respond to the sender, it's a kick message that tells us about
759 // another peer, which is handled by Type 2 below).
760 // This could happen along with Type 2 below.
761 if (!isCached && !localIsEdge && !isResponse) {
762 boolean status = send(pve, self, true, false);
763
764 if (LOG.isEnabledFor(Level.DEBUG)) {
765 LOG.debug("Type 1 (Respond with self) : Sent to " + pve + " result=" + status);
766 }
767 }
768
769 // Type 2: probe it.
770 // We need to probe if we do not have it in our view and it is
771 // a cached adv (from a third party, so non-authoritative) which
772 // can be found either in a response or in a kick message.
773 // OR, if it is a probe from a peer that we do not know (in which
774 // case we will probe here if it pretends to be an rdv, and also
775 // respond (see Type 1, above) - only if it is NOT a response).
776 // If isNewbie && added, isCached cannot be true, so we do not
777 // need to check for added; (isCached && isNewbie) is enough.
778 // If isNewbie && added, response cannot be false, so there is
779 // no need to check for added; (isNewbie && ! reponse) is enough.
780 // Whatchout: do not always probe cached things we got in a response
781 // because we'd likely get another response with another cached
782 // adv, thus cascading through all rdvs.
783 // What we do is to use the information only if our view is way small.
784 // in order to garantee connectivity for the future.
785
786 if ((isCached && isNewbie && (!isResponse || (viewSize < minHappyPeerView))) || (!isCached && isNewbie && !isResponse && !isFromEdge)) {
787
788 // If useOnlySeeds, we're not allowed to use other than our
789 // seed rdvs, so do not probe anything we learn from 3rd
790 // party. FIXME: jice@jxta.org 20030828 - in theory, it
791 // is possible that we receive an unsollicited response,
792 // which would lead us to insert a possibly non-seed in
793 // our peerview. For now, we just take that chance.
794 if (!useOnlySeeds) {
795
796 boolean status = send(pve, self, false, false);
797
798 if (LOG.isEnabledFor(Level.DEBUG)) {
799 LOG.debug("Type 2 (Probe PVE) : Probed " + pve + " result=" + status);
800 }
801 }
802 }
803
804 // Type 3: respond with random cached adv because being probed (likely
805 // by an edge, but we don't care, even another rdv could benefit from it).
806 // This could happen along with Type 2 above although it is rather
807 // unlikely.
808 // Respond with a strategized adv from our view.
809 //
810 // This always happens along with Type 1 and sometimes along with
811 // Type 2 in the same time: we could send three messages. That would
812 // be if we receive a probe from another rdv that we we do not know
813 // yet and we're an rdv ourselves. So, we'll respond with ourselves,
814 // we will probe the sender since it pretends to be an rdv, and we
815 // also will send a chosen rdv from our view and send it (done here).
816 // Note, it could mean a cascade of probes to all rdvs: the recipient
817 // of our cached adv would then probe it, thus receiving another
818 // cached adv, which it would probe, etc.
819 // This phenomenon is prevented in Type 2 by probing cached peers
820 // such responses only if the view is small enough.
821 // NOTE: rdv peers do not need this all that much and we could
822 // avoid it for them, but it should not cause much problems so we
823 // might as well leave it for now.
824
825 if (!isCached && !isResponse) {
826 // Someone is looking for rdvs. try to help.
827
828 PeerViewElement sendpve = replyStrategy.next();
829
830 if ((sendpve != null) && !pve.equals(sendpve) && !self.equals(sendpve)) {
831 boolean status = send(pve, sendpve, true, false);
832
833 if (LOG.isEnabledFor(Level.DEBUG)) {
834 LOG.debug("Type 3 (Respond with random pve) : Sent " + sendpve + " to " + pve + " result=" + status);
835 }
836 }
837 }
838 }
839
840 /**
841 * {@inheritDoc}
842 **/
843 public void rendezvousEvent(RendezvousEvent event) {
844
845 if (closed) {
846 return;
847 }
848
849 boolean notifyFailure = false;
850
851 synchronized (this) {
852
853 int theEventType = event.getType();
854
855 if (LOG.isEnabledFor(Level.DEBUG)) {
856 LOG.debug("[" + group.getPeerGroupName() + "] Processing " + event);
857 }
858
859 refreshSelf();
860
861 if ((RendezvousEvent.BECAMERDV == theEventType) || (RendezvousEvent.BECAMEEDGE == theEventType)) {
862 // kill any existing watchdog task
863 if (null != watchdogTask) {
864 removeTask(watchdogTask);
865 watchdogTask.cancel();
866 watchdogTask = null;
867 }
868 }
869
870 switch (theEventType) {
871 case RendezvousEvent.RDVCONNECT:
872 case RendezvousEvent.RDVRECONNECT:
873 case RendezvousEvent.CLIENTCONNECT:
874 case RendezvousEvent.CLIENTRECONNECT:
875 case RendezvousEvent.RDVFAILED:
876 case RendezvousEvent.RDVDISCONNECT:
877 case RendezvousEvent.CLIENTFAILED:
878 case RendezvousEvent.CLIENTDISCONNECT:
879 break;
880
881 case RendezvousEvent.BECAMERDV:
882 openWirePipes();
883 watchdogTask = new WatchdogTask(this);
884 addTask(watchdogTask, WATCHDOG_PERIOD, WATCHDOG_PERIOD);
885 rescheduleKick(true);
886 break;
887
888 case RendezvousEvent.BECAMEEDGE:
889 openWirePipes();
890 if (!localView.isEmpty()) {
891 // FIXME bondolo 20040229 since we likely don't have a
892 // rendezvous connection, it is kind of silly to be sending
893 // this now. probably should wait until we get a rendezvous
894 // connection.
895 notifyFailure = true;
896 }
897 rescheduleKick(true);
898 break;
899
900 default:
901 if (LOG.isEnabledFor(Level.WARN)) {
902 LOG.warn("[" + group.getPeerGroupName() + "] Unexpected RDV event : " + event);
903 }
904 break;
905 }
906 }
907
908 // we can't do the notification under synchronization.
909 if (notifyFailure) {
910 notifyFailure(self, true);
911 }
912 }
913
914 public void start() {
915 // do nothing for now... all the good stuff happens as a result of
916 // rendezvous events.
917 }
918
919 public void stop() {
920
921 synchronized (this) {
922 // Only one thread gets to perform the shutdown.
923 if (closed) {
924 return;
925 }
926 closed = true;
927 }
928
929 // notify other rendezvous peers that we are going down (only
930 // if this peer is a rendezvous)
931 if (rdvService.isRendezVous()) {
932 notifyFailure(self, true);
933 }
934
935 // From now on we can nullify everything we want. Other threads check
936 // the closed flag where it matters.
937 synchronized (this) {
938 if (watchdogTask != null) {
939 removeTask(watchdogTask);
940 watchdogTask.cancel();
941 watchdogTask = null;
942 }
943
944 // Remove message listener.
945 endpoint.removeIncomingMessageListener(SERVICE_NAME, uniqueGroupId);
946
947 // Remove rendezvous listener.
948 rdvService.removeListener(this);
949
950 // Remove all our pending scheduled tasks
951 // Carefull with the indices while removing: do it backwards, it's
952 // cheaper and simpler.
953
954 synchronized (scheduledTasks) {
955 Iterator eachTask = scheduledTasks.iterator();
956
957 while (eachTask.hasNext()) {
958 try {
959 TimerTask task = (TimerTask) eachTask.next();
960
961 task.cancel();
962 eachTask.remove();
963 } catch (Exception ez1) {
964 if (LOG.isEnabledFor(Level.WARN)) {
965 LOG.warn("Cannot cancel task: ", ez1);
966 }
967 continue;
968 }
969 }
970 }
971
972 // Make sure that we close our WirePipes
973 closeWirePipes();
974
975 // Let go of the up and down peers.
976 downPeer = null;
977 upPeer = null;
978 localView.clear();
979
980 timer.cancel();
981
982 rpvListeners.clear();
983 }
984 }
985
986 protected void addTask(TimerTask task, long delay, long interval) {
987
988 synchronized (scheduledTasks) {
989 if (scheduledTasks.contains(task)) {
990 if (LOG.isEnabledFor(Level.WARN)) {
991 LOG.warn("Task list already contains specified task.");
992 }
993 }
994
995 scheduledTasks.add(task);
996 }
997
998 if (interval >= 1) {
999 timer.schedule(task, delay, interval);
1000 } else {
1001 timer.schedule(task, delay);
1002 }
1003 }
1004
1005 protected void removeTask(TimerTask task) {
1006 scheduledTasks.remove(task);
1007 }
1008
1009 /**
1010 * Adds the specified URI to the list of seeds. Even if useOnlySeeds is in
1011 * effect, this seed may now be used, as if it was part of the initial
1012 * configuration.
1013 *
1014 * @param seed the URI of the seed rendezvous.
1015 **/
1016 public void addSeed(URI seed) {
1017 permanentSeedHosts.add(seed);
1018 }
1019
1020 /**
1021 * Probe the specified peer immediately.
1022 *
1023 * <p/> Note: If "useOnlySeeds" is in effect and the peer is not a seed, any response to this probe will be ignored.
1024 **/
1025 public boolean probeAddress(EndpointAddress address, Object hint) {
1026
1027 PeerViewElement holdIt = null;
1028
1029 synchronized(localView) {
1030 holdIt = self;
1031 }
1032
1033 return send(address, hint, holdIt, false, false);
1034 }
1035
1036 /**
1037 * Send our own advertisement to all of the seed rendezvous.
1038 */
1039 public void seed() {
1040 long reseedRemaining = earliestReseed - TimeUtils.timeNow();
1041
1042 if (reseedRemaining > 0) {
1043 // Too early; the previous round is not even done.
1044 if (LOG.isEnabledFor(Level.INFO)) {
1045 LOG.info("Still Seeding for " + reseedRemaining + "ms.");
1046 }
1047 return;
1048 }
1049
1050 if (LOG.isEnabledFor(Level.INFO)) {
1051 LOG.info("New Seeding...");
1052 }
1053
1054 // Schedule sending propagated query to our local network neighbors.
1055 timedSend(self, (EndpointAddress) null, DEFAULT_SEEDING_PERIOD * 2);
1056
1057 List seedRdvs = new ArrayList(permanentSeedHosts);
1058
1059 if (useOnlySeeds || (localView.size() < minHappyPeerView)) {
1060 // We start try configured seed peers only after some time, so we make sure that
1061 // the topology will not tend to be centralized.
1062
1063 Iterator allSeedingURIs = seedingURIs.iterator();
1064 while(allSeedingURIs.hasNext()) {
1065 URI aURI = (URI) allSeedingURIs.next();
1066 try {
1067 seedRdvs.addAll(Arrays.asList(loadSeeds(aURI)));
1068 } catch( IOException failed ) {
1069 if (LOG.isEnabledFor(Level.WARN)) {
1070 LOG.warn("Failed loading seeding list from : " + aURI );
1071 }
1072 }
1073 }
1074 }
1075
1076 synchronized( activeSeedHosts ) {
1077 activeSeedHosts.clear();
1078 activeSeedHosts.addAll( seedRdvs );
1079 }
1080
1081 long iterations = 0;
1082
1083 if(localView.size() < minHappyPeerView) {
1084 // We only do these things if we don't have a "happy" Peer View.
1085 // If the Peer View is already "happy" then we will use only
1086 // Peer View referrals for learning of new entires.
1087
1088 Collections.shuffle(seedRdvs);
1089
1090 while ( !seedRdvs.isEmpty() ) {
1091 if (sendRandomByAddr(seedRdvs, DEFAULT_SEEDING_RDVPEERS, seedingRdvConnDelay + DEFAULT_SEEDING_PERIOD * iterations)) {
1092 ++iterations;
1093 }
1094 }
1095
1096 if ( !useOnlySeeds ) {
1097 // If use only seeds, we're not allowed to put in the peerview
1098 // anything but our seed rdvs. So, we've done everything that
1099 // was required.
1100
1101 // Schedule sending propagated query to our advertising group
1102 if (advertisingGroup != null) {
1103 // send it, but not immediately.
1104 scheduleAdvertisingGroupQuery(DEFAULT_SEEDING_PERIOD * 2);
1105 }
1106
1107 // send own advertisement to a random set of rendezvous
1108 List rdvs = discoverRdvAdverisements();
1109
1110 Collections.shuffle(rdvs);
1111
1112 while ( !rdvs.isEmpty() ) {
1113 if (sendRandomByAdv(rdvs, DEFAULT_SEEDING_RDVPEERS, DEFAULT_SEEDING_PERIOD * iterations)) {
1114 ++iterations;
1115 }
1116 }
1117
1118 if (probeRelays) {
1119 List relays = getRelayPeers();
1120
1121 Collections.shuffle(relays);
1122
1123 while ( !relays.isEmpty() ) {
1124 if (sendRandomByAddr(relays, DEFAULT_SEEDING_RDVPEERS, DEFAULT_SEEDING_PERIOD * iterations)) {
1125 ++iterations;
1126 }
1127 }
1128 }
1129 }
1130 }
1131
1132 earliestReseed = TimeUtils.toAbsoluteTimeMillis(seedingRdvConnDelay + (DEFAULT_SEEDING_PERIOD * iterations));
1133 }
1134
1135 /**
1136 * Evaluates if the given pve corresponds to one of our seed rdvs. This is
1137 * to support the useOnlySeeds flag. The test is not completely foolproof
1138 * since our list of seed rdvs is just transport addresses. We could be
1139 * given a pve that exhibits an address that corresponds to one of our seeds
1140 * but is fake. And we might later succeed in connecting to that rdv via one
1141 * the other, real addresses. As a result, useOnlySeeds is *not* a security
1142 * feature, just a convenience for certain kind of deployments. Seed
1143 * rdvs should include certificates for such a restriction to be a security
1144 * feature.
1145 */
1146 private boolean isSeedRdv(RdvAdvertisement rdvAdv) {
1147
1148 RouteAdvertisement radv = rdvAdv.getRouteAdv();
1149
1150 if (radv == null) {
1151 return false;
1152 }
1153
1154 AccessPointAdvertisement apAdv = radv.getDest();
1155
1156 if (apAdv == null) {
1157 return false;
1158 }
1159
1160 // The accessPointAdv returns a live (!) copy of the endpoint addresses.
1161 List addrList = new ArrayList(apAdv.getVectorEndpointAddresses());
1162
1163 if (addrList.isEmpty()) {
1164 return false;
1165 }
1166
1167 ListIterator eachAddr = addrList.listIterator();
1168
1169 // convert each string to a URI
1170 while (eachAddr.hasNext()) {
1171 String anAddr = (String) eachAddr.next();
1172
1173 try {
1174 // Convert to URI to compare with seedHosts
1175 eachAddr.set( new URI(anAddr));
1176 } catch (URISyntaxException badURI) {
1177 if (LOG.isEnabledFor(Level.WARN)) {
1178 LOG.warn("Skipping bad URI : " + anAddr, badURI);
1179 }
1180 }
1181 }
1182
1183 // seedList must be treated as read-only.
1184 // We do what we want with addrVect
1185
1186 addrList.retainAll(activeSeedHosts);
1187
1188 // What's left is the intersection of seedHosts and the set of
1189 // endpoint addresses in the given pve. If it is non-empty, then we
1190 // accept the pve as that of a seed host.
1191 return (!addrList.isEmpty());
1192 }
1193
1194 /**
1195 * Make sure that the PeerView properly changes behavior, when switching
1196 * from edge mode to rdv mode, and vice-versa.
1197 * Since openWirePipes() requires some other services such as the Pipe
1198 * Service, and since updateStatus is invoked this work must happen in
1199 * background, giving a chance to other services to be started.
1200 **/
1201 private class OpenPipesTask extends TimerTask {
1202
1203 /**
1204 * {@inheritDoc}
1205 **/
1206 public void run() {
1207 try {
1208 if (closed) {
1209 return;
1210 }
1211
1212 openWirePipes();
1213 } catch (Throwable all) {
1214 if (LOG.isEnabledFor(Level.FATAL)) {
1215 LOG.fatal("Uncaught Throwable in thread: " + Thread.currentThread().getName(), all);
1216 }
1217 } finally {
1218 removeTask(this);
1219 }
1220 }
1221 }
1222
1223 private void scheduleOpenPipes(long delay) {
1224
1225 if (LOG.isEnabledFor(Level.DEBUG)) {
1226 LOG.debug("Scheduling open pipes attempt in " + delay + "ms.");
1227 }
1228
1229 addTask(new OpenPipesTask(), delay, -1);
1230 }
1231
1232 /**
1233 * Send a PeerView Message to the specified peer.
1234 *
1235 * @param response indicates whether this is a response. Otherwise
1236 * we may create a distributed loop where peers keep perpetually
1237 * responding to each-other.
1238 * @param failure Construct the message as a failure notification.
1239 **/
1240 private boolean send(PeerViewElement dest, PeerViewElement pve, boolean response, boolean failure) {
1241
1242 Message msg = makeMessage(pve, response, failure);
1243
1244 boolean result = dest.sendMessage(msg, SERVICE_NAME, uniqueGroupId);
1245
1246 if (LOG.isEnabledFor(Level.DEBUG)) {
1247 LOG.debug("Sending " + msg + " to " + dest + " success = " + result);
1248 }
1249
1250 return result;
1251 }
1252
1253 /**
1254 * Send a PeerView Message to the specified peer.
1255 *
1256 * @param response indicates whether this is a response. Otherwise
1257 * we may create a distributed loop where peers keep perpetually
1258 * responding to each-other.
1259 * @param failure Construct the message as a failure notification.
1260 **/
1261 private boolean send(EndpointAddress dest, Object hint,
1262 PeerViewElement pve, boolean response, boolean failure) {
1263
1264 Message msg = makeMessage(pve, response, failure);
1265
1266 if (null != dest) {
1267 EndpointAddress realAddr = new EndpointAddress(dest, SERVICE_NAME, uniqueGroupId);
1268
1269 Messenger messenger = rdvService.endpoint.getMessengerImmediate(realAddr, hint);
1270
1271 if (null<