Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: net/jxta/impl/rendezvous/rpv/PeerView.java


1   /*
2    * Copyright (c) 2002-2004 Sun Microsystems, Inc.  All rights reserved.
3    *
4    * Redistribution and use in source and binary forms, with or without
5    * modification, are permitted provided that the following conditions
6    * are met:
7    *
8    * 1. Redistributions of source code must retain the above copyright
9    *    notice, this list of conditions and the following disclaimer.
10   *
11   * 2. Redistributions in binary form must reproduce the above copyright
12   *    notice, this list of conditions and the following disclaimer in
13   *    the documentation and/or other materials provided with the
14   *    distribution.
15   *
16   * 3. The end-user documentation included with the redistribution,
17   *    if any, must include the following acknowledgment:
18   *       "This product includes software developed by the
19   *       Sun Microsystems, Inc. for Project JXTA."
20   *    Alternately, this acknowledgment may appear in the software itself,
21   *    if and wherever such third-party acknowledgments normally appear.
22   *
23   * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and
24   *    "Project JXTA" must not be used to endorse or promote products
25   *    derived from this software without prior written permission.
26   *    For written permission, please contact Project JXTA at
27   *    http://www.jxta.org.
28   *
29   * 5. Products derived from this software may not be called "JXTA",
30   *    nor may "JXTA" appear in their name, without prior written
31   *    permission of Sun.
32   *
33   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
34   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
35   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
36   * DISCLAIMED.  IN NO EVENT SHALL SUN MICROSYSTEMS OR
37   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
38   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
39   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
40   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
41   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
42   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
43   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44   * SUCH DAMAGE.
45   * ====================================================================
46   *
47   * This software consists of voluntary contributions made by many
48   * individuals on behalf of Project JXTA.  For more
49   * information on Project JXTA, please see
50   * <http://www.jxta.org/>.
51   *
52   * This license is based on the BSD license adopted by the Apache
53   * Foundation.
54   *
55   * $Id: PeerView.java,v 1.92 2004/12/07 20:17:41 bondolo Exp $
56   */
57  
58  package net.jxta.impl.rendezvous.rpv;
59  
60  import java.io.BufferedReader;
61  import java.io.InputStream;
62  import java.io.InputStreamReader;
63  import java.net.URI;
64  import java.net.URL;
65  import java.net.URLConnection;
66  import java.util.ArrayList;
67  import java.util.Arrays;
68  import java.util.Collections;
69  import java.util.Enumeration;
70  import java.util.HashSet;
71  import java.util.Iterator;
72  import java.util.List;
73  import java.util.ListIterator;
74  import java.util.Random;
75  import java.util.Set;
76  import java.util.SortedSet;
77  import java.util.Timer;
78  import java.util.TimerTask;
79  import java.util.TreeSet;
80  import java.util.Vector;
81  
82  import java.io.IOException;
83  import java.io.File;
84  import java.net.URISyntaxException;
85  import java.util.NoSuchElementException;
86  
87  import org.apache.log4j.Level;
88  import org.apache.log4j.Logger;
89  
90  import net.jxta.discovery.DiscoveryService;
91  import net.jxta.document.Advertisement;
92  import net.jxta.document.AdvertisementFactory;
93  import net.jxta.document.MimeMediaType;
94  import net.jxta.document.XMLDocument;
95  import net.jxta.document.TextDocument;
96  import net.jxta.document.TextElement;
97  import net.jxta.endpoint.EndpointAddress;
98  import net.jxta.endpoint.EndpointListener;
99  import net.jxta.endpoint.EndpointService;
100 import net.jxta.endpoint.Message;
101 import net.jxta.endpoint.Messenger;
102 import net.jxta.endpoint.MessageElement;
103 import net.jxta.endpoint.MessageTransport;
104 import net.jxta.endpoint.StringMessageElement;
105 import net.jxta.endpoint.TextDocumentMessageElement;
106 import net.jxta.id.ID;
107 import net.jxta.id.IDFactory;
108 import net.jxta.peer.PeerID;
109 import net.jxta.peergroup.PeerGroup;
110 import net.jxta.pipe.InputPipe;
111 import net.jxta.pipe.OutputPipe;
112 import net.jxta.pipe.PipeID;
113 import net.jxta.pipe.PipeMsgEvent;
114 import net.jxta.pipe.PipeMsgListener;
115 import net.jxta.pipe.PipeService;
116 import net.jxta.protocol.AccessPointAdvertisement;
117 import net.jxta.protocol.ConfigParams;
118 import net.jxta.protocol.PeerAdvertisement;
119 import net.jxta.protocol.PipeAdvertisement;
120 import net.jxta.protocol.RdvAdvertisement;
121 import net.jxta.protocol.RouteAdvertisement;
122 import net.jxta.rendezvous.RendezvousEvent;
123 import net.jxta.rendezvous.RendezvousListener;
124 
125 import net.jxta.impl.access.AccessList;
126 import net.jxta.impl.config.Config;
127 import net.jxta.impl.endpoint.relay.RelayClient;
128 import net.jxta.impl.protocol.RdvConfigAdv;
129 import net.jxta.impl.rendezvous.RendezVousServiceImpl;
130 import net.jxta.impl.util.TimeUtils;
131 import net.jxta.impl.util.TimerThreadNamer;
132 
133 
134 /**
135  * This class models a Rendezvous Peer View (RPV):
136  * ordered collection of all other Rendezvous Peers visible to
137  * this Peer.
138  *
139  * <p/>Presently this class implements a random "diffusion" algorithm
140  * where each Peer periodically selects a randomly selected peer
141  * advertisement from its view and sends it over to a randomly
142  * selected peer from its view. Over time, this causes every peer to
143  * learn about every other peer, resulting in a "consistent" peer
144  * view.
145  *
146  * <p/>This diffusion process is bootstrapped by every peer sending their
147  * own peer advertisements to some well-known, stable, "seed" peers on
148  * startup.
149  **/
150 public final class PeerView implements EndpointListener, RendezvousListener {
151     
152     /**
153      *  Log4J Logger
154      **/
155     private static final transient Logger LOG = Logger.getLogger(PeerView.class.getName());
156     
157     /**
158      *  Our service name
159      **/
160     static final String SERVICE_NAME = "PeerView";
161     
162     /**
163      *  Namespace used for rdv message elements.
164      **/
165     static final String MESSAGE_NAMESPACE = "jxta";
166     
167     /**
168      * Element name of outgoing messages.
169      **/
170     static final String MESSAGE_ELEMENT_NAME = "PeerView.PeerAdv";
171     
172     /**
173      * Element name of responses.
174      */
175     static final String RESPONSE_ELEMENT_NAME = "PeerView.PeerAdv.Response";
176     
177     /**
178      * Element name that specifies by its presence whether the enclosed
179      * RdvAdvertisement is a hint (cached), or is known to be active.
180      * In practice, when sending its own RdvAdvertisement,a peer does not
181      * set this element, but when sending another peer's RdvAdvertisement,
182      * this element is set.
183      */
184     static final String CACHED_RADV_ELEMENT_NAME = "PeerView.Cached";
185     static final MessageElement CACHED_RADV_ELEMENT = new StringMessageElement(CACHED_RADV_ELEMENT_NAME, Boolean.TRUE.toString(), null);
186     
187     /**
188      * (optional) Element name that specifies the route advertisement of the
189      * source of the message.
190      */
191     static final String SRCROUTEADV_ELEMENT_NAME = "PeerView.SrcRouteAdv";
192     
193     /**
194      * Element name that specifies by its presence whether the sender
195      * is configured as an edge peer. Edge peers do not become part of
196      * any peer's view.
197      */
198     static final String EDGE_ELEMENT_NAME = "PeerView.EdgePeer";
199     static final MessageElement EDGE_ELEMENT = new StringMessageElement(EDGE_ELEMENT_NAME, Boolean.TRUE.toString(), null);
200     
201     /**
202      * Element name that specifies by its presence whether the enclosed
203      * RdvAdvertisement is about a failure.
204      */
205     static final String FAILURE_ELEMENT_NAME = "PeerView.Failure";
206     static final MessageElement FAILURE_ELEMENT = new StringMessageElement(FAILURE_ELEMENT_NAME, Boolean.TRUE.toString(), null);
207     
208     /**
209      * This is the interval between adv exchange in seconds. This is
210      * the main tunable runtime parameter for the diffusion
211      * process. An interval that is too low will improve view
212      * consistency at the expense of gratuitous network traffic. On
213      * the other hand, an interval that is too high will cause the
214      * view to become inconsistent. It is desirable to err on the side
215      * of extra traffic.
216      */
217     private static final long DEFAULT_SEEDING_PERIOD = 5 * TimeUtils.ASECOND;
218     
219     private static final long WATCHDOG_PERIOD = 30 * TimeUtils.ASECOND;
220     private static final long ACL_REFRESH_PERIOD = 30 * TimeUtils.AMINUTE;
221     private static final long WATCHDOG_GRACE_DELAY = 5 * TimeUtils.AMINUTE;
222     
223     private static final long DEFAULT_BOOTSTRAP_KICK_INTERVAL = 3 * TimeUtils.ASECOND;
224     
225     private static final int MIN_BOOTLEVEL = 0;
226     private static final int BOOTLEVEL_INCREMENT = 1;
227     private static final int MAX_EDGE_PEER_BOOTLEVEL = 8;
228     private static final int MAX_RDV_PEER_BOOTLEVEL = 6;
229     
230     /**
231      * Default lifetimes for RdvAdvertisement that are published.
232      **/
233     private static final long DEFAULT_RADV_LIFETIME = 1 * TimeUtils.ADAY;
234     private static final long DEFAULT_RADV_EXPIRATION = 1 * TimeUtils.ANHOUR;
235     
236     /**
237      * DEFAULT_SEEDING_RDVPEERS
238      *
239      * This value is the maximum number of rendezvous peers that will be
240      * sent our own advertisement at boot time.
241      **/
242     private static final int DEFAULT_SEEDING_RDVPEERS = 5;
243     
244     private final PeerGroup group;
245     private final PeerGroup advertisingGroup;
246     private final RendezVousServiceImpl rdvService;
247     private final EndpointService endpoint;
248     private long refreshTime =0; // Set by the constructor: currentTimeMillis() + ACL_REFRESH_PERIOD when the ACL file loads, Long.MAX_VALUE when loading fails.
249     
250     /**
251      *  The name of this PeerView.
252      *
253      *  <p>FIXME 20040623 bondolo This should be a CodatID.
254      **/
255     private final String name;
256     
257     /**
258      * Delay in relative milliseconds to apply before contacting seeding rdvs.
259      * 0 is supposed to be reserved by RdvConfig to mean "use the default".
260      * However, it is in fact a valid value and also the one we want for the default.
261      * The only problem with that is that it is not possible to configure this value
262      * explicitly, should it one day not be the default. The issue is actually in RdvConfig.
263      **/
264     private long seedingRdvConnDelay = 0;
265     
266     /**
267      *  Whether relays should be probed for rendezvous advertisements.
268      **/
269     private boolean probeRelays = true;
270     
271     /**
272      *  If the peerview is smaller than this we will try harder to find
273      *  additional peerview members.
274      **/
275     private int minHappyPeerView = 4;
276     
277     /**
278      * Whether we are restricted to using seed rdvs only.
279      **/
280     private boolean useOnlySeeds = false;
281     
282     /**
283      * These are seed peers which were specified as part of the configuration
284      * data or programmatically. These seeds are never deleted.
285      *
286      *  <ul>
287      *      <li>Values are {@link java.net.URI}.</li>
288      *  </ul>
289      */
290     private Set permanentSeedHosts = Collections.synchronizedSet( new HashSet() );
291     
292     /**
293      * The active list of seed peers. Formed by the union of the permanent
294      * seed hosts and the lists of seed peers downloaded from the seeding
295      * URIs.
296      *
297      *  <ul>
298      *      <li>Values are {@link java.net.URI}.</li>
299      *  </ul>
300      */
301     private Set activeSeedHosts = Collections.synchronizedSet( new HashSet() );
302     
303     /**
304      * These URIs specify location of seed peer lists. The URIs will be resolved
305      * via URLConnection and are assumed to refer to plain text lists of URIs.
306      *
307      *  <ul>
308      *      <li>Values are {@link java.net.URI}.</li>
309      *  </ul>
310      */
311     private Set seedingURIs = Collections.synchronizedSet( new HashSet() );
312     
313     /**
314      * A single timer is used to periodically kick each PeerView
315      * into activity. For the Random PeerView, this activity consists
316      * of selecting a PeerViewElement at random from its view and
317      * sending it across to a randomly-selected peer from its view.
318      *
319      * <p/>FIXME 20021121 lomax
320      *
321      * <p/>The original idea of using a common timer in order to save threads IS a
322      * very good idea. However, limitations, and actually, bugs, in java.util.Timer
323      * creates the following problems when using a static Timer:
324      *
325      *  <ul>
326      *  <li>Memory Leak: Cancelling a TimerTask does not remove it from the
327      *     execution queue of the Timer until the Timer is cancelled or the
328      *     TimerTask is fired. Since most of the TimerTasks are inner classes
329      *     this can mean that the PeerView is held around for a long time.</li>
330      *
331      *  <li>java.util.Timer is not only not real-time (which is more or less fine
332      *     for the PeerView), but it also invokes tasks sequentially (only one Thread
333      *     per Timer). As a result, a task that takes a long time to run delays
334      *     other tasks.</li>
335      *  </ul>
336      *
337      * <p/>The PeerView would function better with a better Timer, but JDK does
338      * not provide a standard timer that would fulfill the needs of the
339      * PeerView. Maybe we should implement a JXTA Timer, since lots of the JXTA
340      * services, by being very asynchronous, rely on the same kind of timer
341      * semantics as the PeerView. Until then, creating a Timer per instance of
342      * the PeerView (i.e. per instance of joined PeerGroup) is the best
343      * workaround.
344      **/
345     private final Timer timer;
346     
347     /**
348      * A random number generator.
349      */
350     private final static Random random = new Random();
351     
352     /**
353      * List of scheduled tasks
354      **/
355     private final Set scheduledTasks = Collections.synchronizedSet(new HashSet());
356     
357     /**
358      *  Describes the frequency and amount of effort we will spend updating
359      *  the peerview.
360      **/
361     private int bootLevel = MIN_BOOTLEVEL;
362     
363     /**
364      *  Earliest absolute time in milliseconds at which we will allow a reseed
365      *  to take place.
366      **/
367     private long earliestReseed = 0L;
368     
369     private final String uniqueGroupId;
370     
371     private final Set rpvListeners = Collections.synchronizedSet(new HashSet());
372     
373     /**
374      *  Used for querying for pves.
375      **/
376     private InputPipe wirePipeInputPipe = null;
377     
378     /**
379      *  Used for querying for pves.
380      **/
381     private OutputPipe wirePipeOutputPipe = null;
382     
383     /**
384      *  Used for notifications about pve failures.
385      **/
386     private InputPipe localGroupWirePipeInputPipe = null;
387     
388     /**
389      *  Used for notifications about pve failures.
390      **/
391     private OutputPipe localGroupWirePipeOutputPipe = null;
392     
393     /**
394      *  A task which monitors the up and down peers in the peerview.
395      **/
396     private WatchdogTask watchdogTask = null;
397     
398     /**
399      * This is the accumulated view by an instance of this class.
400      *
401      * <p/>Values are {@link net.jxta.impl.rendezvous.rpv.PeerViewElement}.
402      */
403     private final SortedSet localView = Collections.synchronizedSortedSet(new TreeSet());
404     
405     /**
406      *  PVE for ourself.
407      *
408      *  FIXME bondolo 20041015 This should be part of the local view.
409      **/
410     private final PeerViewElement self;
411     private PeerViewElement upPeer = null;
412     private PeerViewElement downPeer = null;
413     
414     private final PeerViewStrategy replyStrategy;
415     
416     private final PeerViewStrategy kickRecipientStrategy;
417     
418     private final PeerViewStrategy kickAdvertisementStrategy;
419     
420     private final PeerViewStrategy refreshRecipientStrategy;
421     protected final AccessList acl; // Access list checked via acl.isAllowed(peerID) for each incoming rdv advertisement; set to grant-all when the ACL file cannot be read.
422     protected  File aclFile; // File the access list is initialized from: Config.JXTA_HOME + "rendezvousACL.xml".
423     protected long aclFileLastModified = 0; // aclFile.lastModified() as captured by the constructor.
424     
425     // PeerAdv tracking.
426     private PeerAdvertisement lastPeerAdv = null;
427     private int lastModCount = -1;
428     
429     private final PipeAdvertisement localGroupWirePipeAdv;
430     private final PipeAdvertisement advGroupPropPipeAdv;
431     
432     /**
433      *  If <code>true</code> then this Peer View instance is closed and is
434      *  shutting down.
435      **/
436     private volatile boolean closed = false;
437     
438     /**
439      *  Get an instance of PeerView for the specified PeerGroup and Service.
440      *
441      *  @param group Peer Group in which this Peer View instance operates.
442      *  @param advertisingGroup Peer Group in which this Peer View instance will
443      *  advertise and broadcast its existence.
444      *  @param rdvService The rdvService we are to use.
445      *  @param name The identifying name for this Peer View instance.
446      **/
447     public PeerView(PeerGroup group, PeerGroup advertisingGroup, RendezVousServiceImpl rdvService, String name ) {
448         
449         this.group = group;
450         this.advertisingGroup = advertisingGroup;
451         this.rdvService = rdvService;
452         this.name = name;
453         
454         this.endpoint = group.getEndpointService();
455         aclFile = new File(Config.JXTA_HOME + "rendezvousACL.xml");
456         aclFileLastModified = aclFile.lastModified();
457         
458         this.acl = new AccessList();
459         try {
460             acl.init(aclFile);
461             this.refreshTime = System.currentTimeMillis() + ACL_REFRESH_PERIOD;
462         } catch (IOException io) {
463             acl.setGrantAll(true);
464             this.refreshTime = Long.MAX_VALUE;
465             if (LOG.isEnabledFor(Level.INFO)) {
466                  LOG.info("PeerView Access Control granting all permissions");;
467             }
468         }
469 
470         this.uniqueGroupId = group.getPeerGroupID().getUniqueValue().toString();
471 
472         timer = new Timer(true);
473         timer.schedule(new TimerThreadNamer("PeerView Timer for " + group.getPeerGroupID()), 0);
474 
475         ConfigParams confAdv = group.getConfigAdvertisement();
476 
477         // Get the config. If we do not have a config, we're done; we just keep
478         // the defaults (edge peer/no auto-rdv)
479         if (confAdv != null) {
480             Advertisement adv = null;
481             
482             try {
483                 XMLDocument configDoc = (XMLDocument) confAdv.getServiceParam(rdvService.getAssignedID());
484                 if (null != configDoc) {
485                     // XXX 20041027 backwards compatibility
486                     configDoc.addAttribute( "type", RdvConfigAdv.getAdvertisementType() );
487                     
488                     adv = AdvertisementFactory.newAdvertisement(configDoc);
489                 }
490             } catch (java.util.NoSuchElementException failed) {
491                 ;
492             }
493             
494             if (adv instanceof RdvConfigAdv) {
495                 RdvConfigAdv rdvConfigAdv = (RdvConfigAdv) adv;
496                 
497                 permanentSeedHosts.addAll(Arrays.asList(rdvConfigAdv.getSeedRendezvous()));
498                 
499                 useOnlySeeds = rdvConfigAdv.getUseOnlySeeds();
500                 
501                 seedingURIs.addAll(Arrays.asList(rdvConfigAdv.getSeedingURIs()));
502                 
503                 if (rdvConfigAdv.getSeedRendezvousConnectDelay() > 0) {
504                     seedingRdvConnDelay = rdvConfigAdv.getSeedRendezvousConnectDelay();
505                 }
506                 
507                 probeRelays = rdvConfigAdv.getProbeRelays();
508                 
509                 if (rdvConfigAdv.getMinHappyPeerView() > 0) {
510                     minHappyPeerView = rdvConfigAdv.getMinHappyPeerView();
511                 }
512             }
513         }
514         
515         lastPeerAdv = group.getPeerAdvertisement();
516         lastModCount = lastPeerAdv.getModCount();
517         
518         // create a new local RdvAdvertisement and set it to self.
519         RdvAdvertisement radv = createRdvAdvertisement(lastPeerAdv, name);
520         
521         self = new PeerViewElement(endpoint, radv);
522         
523         // if ( rdvService.isRendezVous() ) {
524         // addPeerViewElement( self );
525         // }
526         
527         // setup endpoint listener
528         endpoint.addIncomingMessageListener(this, SERVICE_NAME, uniqueGroupId);
529         
530         // add rendezvous listener
531         rdvService.addListener(this);
532         
533         // initialize strategies
534         replyStrategy = new PeerViewRandomWithReplaceStrategy(localView);
535         
536         kickRecipientStrategy = new PeerViewRandomStrategy(localView);
537         
538         kickAdvertisementStrategy = new PeerViewRandomWithReplaceStrategy(localView);
539         
540         refreshRecipientStrategy = new PeerViewSequentialStrategy(localView);
541         
542         localGroupWirePipeAdv = makeWirePipeAdvertisement(group);
543         
544         if (null != advertisingGroup) {
545             advGroupPropPipeAdv = makeWirePipeAdvertisement(advertisingGroup);
546         } else {
547             advGroupPropPipeAdv = null;
548         }
549         
550         if (LOG.isEnabledFor(Level.INFO)) {
551             LOG.info("PeerView created for group \"" + group.getPeerGroupName() + "\" [" + group.getPeerGroupID() + "] name \"" + name + "\"");
552         }
553     }
554     
555     /**
556      * {@inheritDoc}
557      *
558      *  Listener for "PeerView"/<peergroup-unique-id> and propagate pipes.
559      **/
560     public void processIncomingMessage(Message msg, EndpointAddress srcAddr, EndpointAddress dstAddr) {
561         
562         // IsEdgePeer is confusing because the is* predicates are used
563         // to refer to the properties of the message we're processing.
564         boolean localIsEdge = !rdvService.isRendezVous();
565         
566         // check what kind of message this is (response or not).
567         boolean isResponse = false;
568         MessageElement me = msg.getMessageElement(MESSAGE_NAMESPACE, MESSAGE_ELEMENT_NAME);
569         
570         if (me == null) {
571             me = msg.getMessageElement(MESSAGE_NAMESPACE, RESPONSE_ELEMENT_NAME);
572             if (me == null) {
573                 if (LOG.isEnabledFor(Level.WARN)) {
574                     LOG.warn("Discarding damaged " + msg + ".");
575                 }
576                 return;
577             } else {
578                 isResponse = true;
579             }
580         }
581         
582         Advertisement adv;
583         
584         try {
585             adv = AdvertisementFactory.newAdvertisement(me.getMimeType(), me.getStream());
586         } catch (IOException failed) {
587             if (LOG.isEnabledFor(Level.WARN)) {
588                 LOG.warn("Failed building rdv advertisement", failed);
589             }
590             return;
591         } catch (NoSuchElementException failed) {
592             if (LOG.isEnabledFor(Level.WARN)) {
593                 LOG.warn("Element is not a rdv advertisement", failed);
594             }
595             return;
596         } catch ( IllegalArgumentException failed ) {
597             if (LOG.isEnabledFor(Level.WARN)) {
598                 LOG.warn("Could not build rdv advertisement", failed);
599             }
600             return;
601         }
602         
603         if (!(adv instanceof RdvAdvertisement)) {
604             if (LOG.isEnabledFor(Level.WARN)) {
605                 LOG.warn("Response does not contain radv (" + adv.getAdvertisementType() + ")");
606             }
607             return;
608         }
609         
610         RdvAdvertisement radv = (RdvAdvertisement) adv;
611         // See if we can find a src route adv in the message.
612         me = msg.getMessageElement(MESSAGE_NAMESPACE, SRCROUTEADV_ELEMENT_NAME);
613         if (me != null) {
614             try {
615                 Advertisement routeAdv = AdvertisementFactory.newAdvertisement(me.getMimeType(), me.getStream());
616                 
617                 if (!(routeAdv instanceof RouteAdvertisement)) {
618                     if (LOG.isEnabledFor(Level.WARN)) {
619                         LOG.warn("Advertisement is not a RouteAdvertisement");
620                     }
621                 } else {
622                     RouteAdvertisement rdvRouteAdv = (RouteAdvertisement) radv.getRouteAdv().clone();
623                     
624                     // XXX we stich them together even if in the end it gets optimized away
625                     RouteAdvertisement.stichRoute(rdvRouteAdv, (RouteAdvertisement) routeAdv);
626                     radv.setRouteAdv(rdvRouteAdv);
627                 }
628             } catch (IOException failed) {
629                 if (LOG.isEnabledFor(Level.WARN)) {
630                     LOG.warn("Failed getting stream for message element", failed);
631                 }
632             }
633         }
634         me = null;
635         
636         // Is this a message about ourself?
637         if (group.getPeerID().equals(radv.getPeerID())) {
638             if (LOG.isEnabledFor(Level.DEBUG)) {
639                 LOG.debug("Received a PeerView message about self. Discard.");
640             }
641             
642             return;
643         }
644         
645         // Collect the various flags.
646         
647         boolean isFailure = (msg.getMessageElement(MESSAGE_NAMESPACE, FAILURE_ELEMENT_NAME) != null);
648         boolean isCached = (msg.getMessageElement(MESSAGE_NAMESPACE, CACHED_RADV_ELEMENT_NAME) != null);
649         boolean isFromEdge = (msg.getMessageElement(MESSAGE_NAMESPACE, EDGE_ELEMENT_NAME) != null);
650         boolean isTrusted = acl.isAllowed(radv.getPeerID());
651         if (!localIsEdge && !isFromEdge && !isTrusted) {
652             if (LOG.isEnabledFor(Level.DEBUG)) {
653                  LOG.debug("Rejecting Rendezvous connection: " + radv.getPeerID().toString());
654             }
655             return;
656         }
657 
658         if (LOG.isEnabledFor(Level.DEBUG)) {
659             String srcPeer = srcAddr.toString();
660             
661             if( "jxta".equals(srcAddr.getProtocolName())) {
662                 try {
663                     String idstr = ID.URIEncodingName + ":" + ID.URNNamespace + ":" + srcAddr.getProtocolAddress();
664                     
665                     ID asID = IDFactory.fromURI( new URI(idstr) );
666                     
667                     PeerViewElement pve = getPeerViewElement( asID );
668                     if( null != pve ) {
669                         srcPeer = "\"" + pve.getRdvAdvertisement().getName() + "\"";
670                     }
671                 } catch( URISyntaxException failed ) {
672                 }
673             }
674             
675             LOG.debug(
676             "[" + group.getPeerGroupID() + "] Received a" + (isCached ? " cached" : "") + (isResponse ? " response" : "")
677             + (isFailure ? " failure" : "") + " message" + (isFromEdge ? " from edge" : "") + " regarding \"" + radv.getName() + "\" from "
678             + srcPeer);
679         }
680         
681         // if this is a notification failure. All we have to do is locally
682         // process the failure
683         if (isFailure) {
684             notifyFailure(radv.getPeerID(), false);
685             return;
686         }
687         
688         if (!isFromEdge && !isCached && isTrusted) {
689             DiscoveryService discovery = group.getDiscoveryService();
690             
691             if (discovery != null) {
692                 try {
693                     discovery.publish(radv, DEFAULT_RADV_LIFETIME, DEFAULT_RADV_EXPIRATION);
694                 } catch (IOException ex) {
695                     if (LOG.isEnabledFor(Level.WARN)) {
696                         LOG.warn("Could not publish " + radv.getName(), ex);
697                     }
698                 }
699             }
700         }
701         
702         // Figure out if we know that peer already. If we do, reuse the pve
703         // that we have.
704         boolean isNewbie = false;
705         boolean added = false;
706         PeerViewElement pve;
707         int viewSize = 0;
708         
709         synchronized (localView) {
710             PeerViewElement newbie = new PeerViewElement(endpoint, radv);
711             pve = getPeerViewElement(newbie);
712             
713             if (null == pve) {
714                 pve = newbie;
715                 isNewbie = true;
716             }
717             
718             if (!isFromEdge && !isCached && isTrusted) {
719                 if (!useOnlySeeds || isSeedRdv(radv)) {
720                     if (isNewbie) {
721                         added = addPeerViewElement(pve);
722                     } else {
723                         pve.setRdvAdvertisement(radv);
724                     }
725                 }
726             }
727             
728             viewSize = localView.size();
729         }
730         
731         // Do the rest of the add related tasks out of synch.
732         // We must not nest any possibly synchronized ops in
733         // the LocalView lock; it's the lowest level.
734         
735         if (added) {
736             // Notify local listeners
737             generateEvent(PeerViewEvent.ADD, pve);
738         }
739         
740         /*
741          * Now, see what if any message we have to send as a result.
742          * There are four kind of messages we can send:
743          *
744          * - A response with ourselves, if we're being probed and we're
745          * a rdv.
746          *
747          * - A probe to the peer whose adv we received, because we want
748          * confirmation that it's alive.
749          *
750          * - A response with a random adv from our cache if we're being probed
751          *
752          * We may send more than one message.
753          */
754         
755         // Type 1: respond with self.
756         // We need to do that whenever we're being probed and we're an rdv,
757         // and the adv we got is that of the sender (!cached - otherwise we
758         // can't respond to the sender, it's a kick message that tells us about
759         // another peer, which is handled by Type 2 below).
760         // This could happen along with Type 2 below.
761         if (!isCached && !localIsEdge && !isResponse) {
762             boolean status = send(pve, self, true, false);
763             
764             if (LOG.isEnabledFor(Level.DEBUG)) {
765                 LOG.debug("Type 1 (Respond with self) : Sent to " + pve + " result=" + status);
766             }
767         }
768         
769         // Type 2: probe it.
770         // We need to probe if we do not have it in our view and it is
771         // a cached adv (from a third party, so non-authoritative) which
772         // can be found either in a response or in a kick message.
773         // OR, if it is a probe from a peer that we do not know (in which
774         // case we will probe here if it pretends to be an rdv, and also
775         // respond (see Type 1, above) - only if it is NOT a response).
776         // If isNewbie && added, isCached cannot be true, so we do not
777         // need to check for added; (isCached && isNewbie) is enough.
778         // If isNewbie && added, response cannot be false, so there is
779         // no need to check for added; (isNewbie && ! reponse) is enough.
780         // Whatchout: do not always probe cached things we got in a response
781         // because we'd likely get another response with another cached
782         // adv, thus cascading through all rdvs.
783         // What we do is to use the information only if our view is way small.
784         // in order to garantee connectivity for the future.
785         
786         if ((isCached && isNewbie && (!isResponse || (viewSize < minHappyPeerView))) || (!isCached && isNewbie && !isResponse && !isFromEdge)) {
787             
788             // If useOnlySeeds, we're not allowed to use other than our
789             // seed rdvs, so do not probe anything we learn from 3rd
790             // party.  FIXME: jice@jxta.org 20030828 - in theory, it
791             // is possible that we receive an unsollicited response,
792             // which would lead us to insert a possibly non-seed in
793             // our peerview. For now, we just take that chance.
794             if (!useOnlySeeds) {
795                 
796                 boolean status = send(pve, self, false, false);
797                 
798                 if (LOG.isEnabledFor(Level.DEBUG)) {
799                     LOG.debug("Type 2 (Probe PVE) : Probed " + pve + " result=" + status);
800                 }
801             }
802         }
803         
804         // Type 3: respond with random cached adv because being probed (likely
805         // by an edge, but we don't care, even another rdv could benefit from it).
806         // This could happen along with Type 2 above although it is rather
807         // unlikely.
808         // Respond with a strategized adv from our view.
809         //
810         // This always happens along with Type 1 and sometimes along with
811         // Type 2 in the same time: we could send three messages. That would
812         // be if we receive a probe from another rdv that we we do not know
813         // yet and we're an rdv ourselves. So, we'll respond with ourselves,
814         // we will probe the sender since it pretends to be an rdv, and we
815         // also will send a chosen rdv from our view and send it (done here).
816         // Note, it could mean a cascade of probes to all rdvs: the recipient
817         // of our cached adv would then probe it, thus receiving another
818         // cached adv, which it would probe, etc.
819         // This phenomenon is prevented in Type 2 by probing cached peers
820         // such responses only if the view is small enough.
821         // NOTE: rdv peers do not need this all that much and we could
822         // avoid it for them, but it should not cause much problems so we
823         // might as well leave it for  now.
824         
825         if (!isCached && !isResponse) {
826             // Someone is looking for rdvs. try to help.
827             
828             PeerViewElement sendpve = replyStrategy.next();
829             
830             if ((sendpve != null) && !pve.equals(sendpve) && !self.equals(sendpve)) {
831                 boolean status = send(pve, sendpve, true, false);
832                 
833                 if (LOG.isEnabledFor(Level.DEBUG)) {
834                     LOG.debug("Type 3 (Respond with random pve) : Sent " + sendpve + " to " + pve + " result=" + status);
835                 }
836             }
837         }
838     }
839     
840     /**
841      * {@inheritDoc}
842      **/
843     public void rendezvousEvent(RendezvousEvent event) {
844         
845         if (closed) {
846             return;
847         }
848         
849         boolean notifyFailure = false;
850         
851         synchronized (this) {
852             
853             int theEventType = event.getType();
854             
855             if (LOG.isEnabledFor(Level.DEBUG)) {
856                 LOG.debug("[" + group.getPeerGroupName() + "] Processing  " + event);
857             }
858             
859             refreshSelf();
860             
861             if ((RendezvousEvent.BECAMERDV == theEventType) || (RendezvousEvent.BECAMEEDGE == theEventType)) {
862                 // kill any existing watchdog task
863                 if (null != watchdogTask) {
864                     removeTask(watchdogTask);
865                     watchdogTask.cancel();
866                     watchdogTask = null;
867                 }
868             }
869             
870             switch (theEventType) {
871                 case RendezvousEvent.RDVCONNECT:
872                 case RendezvousEvent.RDVRECONNECT:
873                 case RendezvousEvent.CLIENTCONNECT:
874                 case RendezvousEvent.CLIENTRECONNECT:
875                 case RendezvousEvent.RDVFAILED:
876                 case RendezvousEvent.RDVDISCONNECT:
877                 case RendezvousEvent.CLIENTFAILED:
878                 case RendezvousEvent.CLIENTDISCONNECT:
879                     break;
880                     
881                 case RendezvousEvent.BECAMERDV:
882                     openWirePipes();
883                     watchdogTask = new WatchdogTask(this);
884                     addTask(watchdogTask, WATCHDOG_PERIOD, WATCHDOG_PERIOD);
885                     rescheduleKick(true);
886                     break;
887                     
888                 case RendezvousEvent.BECAMEEDGE:
889                     openWirePipes();
890                     if (!localView.isEmpty()) {
891                         // FIXME bondolo 20040229 since we likely don't have a
892                         // rendezvous connection, it is kind of silly to be sending
893                         // this now. probably should wait until we get a rendezvous
894                         // connection.
895                         notifyFailure = true;
896                     }
897                     rescheduleKick(true);
898                     break;
899                     
900                 default:
901                     if (LOG.isEnabledFor(Level.WARN)) {
902                         LOG.warn("[" + group.getPeerGroupName() + "] Unexpected RDV event : " + event);
903                     }
904                     break;
905             }
906         }
907         
908         // we can't do the notification under synchronization.
909         if (notifyFailure) {
910             notifyFailure(self, true);
911         }
912     }
913     
914     public void start() {
915         // do nothing for now... all the good stuff happens as a result of
916         // rendezvous events.
917     }
918     
919     public void stop() {
920         
921         synchronized (this) {
922             // Only one thread gets to perform the shutdown.
923             if (closed) {
924                 return;
925             }
926             closed = true;
927         }
928         
929         // notify other rendezvous peers that we are going down (only
930         // if this peer is a rendezvous)
931         if (rdvService.isRendezVous()) {
932             notifyFailure(self, true);
933         }
934         
935         // From now on we can nullify everything we want. Other threads check
936         // the closed flag where it matters.
937         synchronized (this) {
938             if (watchdogTask != null) {
939                 removeTask(watchdogTask);
940                 watchdogTask.cancel();
941                 watchdogTask = null;
942             }
943             
944             // Remove message listener.
945             endpoint.removeIncomingMessageListener(SERVICE_NAME, uniqueGroupId);
946             
947             // Remove rendezvous listener.
948             rdvService.removeListener(this);
949             
950             // Remove all our pending scheduled tasks
951             // Carefull with the indices while removing: do it backwards, it's
952             // cheaper and simpler.
953             
954             synchronized (scheduledTasks) {
955                 Iterator eachTask = scheduledTasks.iterator();
956                 
957                 while (eachTask.hasNext()) {
958                     try {
959                         TimerTask task = (TimerTask) eachTask.next();
960                         
961                         task.cancel();
962                         eachTask.remove();
963                     } catch (Exception ez1) {
964                         if (LOG.isEnabledFor(Level.WARN)) {
965                             LOG.warn("Cannot cancel task: ", ez1);
966                         }
967                         continue;
968                     }
969                 }
970             }
971             
972             // Make sure that we close our WirePipes
973             closeWirePipes();
974             
975             // Let go of the up and down peers.
976             downPeer = null;
977             upPeer = null;
978             localView.clear();
979             
980             timer.cancel();
981             
982             rpvListeners.clear();
983         }
984     }
985     
986     protected void addTask(TimerTask task, long delay, long interval) {
987         
988         synchronized (scheduledTasks) {
989             if (scheduledTasks.contains(task)) {
990                 if (LOG.isEnabledFor(Level.WARN)) {
991                     LOG.warn("Task list already contains specified task.");
992                 }
993             }
994             
995             scheduledTasks.add(task);
996         }
997         
998         if (interval >= 1) {
999             timer.schedule(task, delay, interval);
1000        } else {
1001            timer.schedule(task, delay);
1002        }
1003    }
1004    
1005    protected void removeTask(TimerTask task) {
1006        scheduledTasks.remove(task);
1007    }
1008    
1009    /**
1010     * Adds the specified URI to the list of seeds. Even if useOnlySeeds is in
1011     * effect, this seed may now be used, as if it was part of the initial
1012     * configuration.
1013     *
1014     * @param seed the URI of the seed rendezvous.
1015     **/
1016    public void addSeed(URI seed) {
1017        permanentSeedHosts.add(seed);
1018    }
1019    
1020    /**
1021     * Probe the specified peer immediately.
1022     *
1023     * <p/> Note: If "useOnlySeeds" is in effect and the peer is not a seed, any response to this probe will be ignored.
1024     **/
1025    public boolean probeAddress(EndpointAddress address, Object hint) {
1026        
1027        PeerViewElement holdIt = null;
1028        
1029        synchronized(localView) {
1030            holdIt = self;
1031        }
1032        
1033        return send(address, hint, holdIt, false, false);
1034    }
1035    
1036    /**
1037     * Send our own advertisement to all of the seed rendezvous.
1038     */
1039    public void seed() {
1040        long reseedRemaining = earliestReseed - TimeUtils.timeNow();
1041        
1042        if (reseedRemaining > 0) {
1043            // Too early; the previous round is not even done.
1044            if (LOG.isEnabledFor(Level.INFO)) {
1045                LOG.info("Still Seeding for " + reseedRemaining + "ms.");
1046            }
1047            return;
1048        }
1049        
1050        if (LOG.isEnabledFor(Level.INFO)) {
1051            LOG.info("New Seeding...");
1052        }
1053        
1054        // Schedule sending propagated query to our local network neighbors.
1055        timedSend(self, (EndpointAddress) null, DEFAULT_SEEDING_PERIOD * 2);
1056        
1057        List seedRdvs = new ArrayList(permanentSeedHosts);
1058        
1059        if (useOnlySeeds || (localView.size() < minHappyPeerView)) {
1060            // We start try configured seed peers only after some time, so we make sure that
1061            // the topology will not tend to be centralized.
1062            
1063            Iterator allSeedingURIs = seedingURIs.iterator();
1064            while(allSeedingURIs.hasNext()) {
1065                URI aURI = (URI) allSeedingURIs.next();
1066                try {
1067                    seedRdvs.addAll(Arrays.asList(loadSeeds(aURI)));
1068                } catch( IOException failed ) {
1069                    if (LOG.isEnabledFor(Level.WARN)) {
1070                        LOG.warn("Failed loading seeding list from : " + aURI );
1071                    }
1072                }
1073            }
1074        }
1075        
1076        synchronized( activeSeedHosts ) {
1077            activeSeedHosts.clear();
1078            activeSeedHosts.addAll( seedRdvs );
1079        }
1080        
1081        long iterations = 0;
1082        
1083        if(localView.size() < minHappyPeerView) {
1084            // We only do these things if we don't have a "happy" Peer View.
1085            // If the Peer View is already "happy" then we will use only
1086            // Peer View referrals for learning of new entires.
1087            
1088            Collections.shuffle(seedRdvs);
1089            
1090            while ( !seedRdvs.isEmpty() ) {
1091                if (sendRandomByAddr(seedRdvs, DEFAULT_SEEDING_RDVPEERS, seedingRdvConnDelay + DEFAULT_SEEDING_PERIOD * iterations)) {
1092                    ++iterations;
1093                }
1094            }
1095            
1096            if ( !useOnlySeeds ) {
1097                // If use only seeds, we're not allowed to put in the peerview
1098                // anything but our seed rdvs. So, we've done everything that
1099                // was required.
1100                
1101                // Schedule sending propagated query to our advertising group
1102                if (advertisingGroup != null) {
1103                    // send it, but not immediately.
1104                    scheduleAdvertisingGroupQuery(DEFAULT_SEEDING_PERIOD * 2);
1105                }
1106                
1107                // send own advertisement to a random set of rendezvous
1108                List rdvs = discoverRdvAdverisements();
1109                
1110                Collections.shuffle(rdvs);
1111                
1112                while ( !rdvs.isEmpty() ) {
1113                    if (sendRandomByAdv(rdvs, DEFAULT_SEEDING_RDVPEERS, DEFAULT_SEEDING_PERIOD * iterations)) {
1114                        ++iterations;
1115                    }
1116                }
1117                
1118                if (probeRelays) {
1119                    List relays = getRelayPeers();
1120                    
1121                    Collections.shuffle(relays);
1122                    
1123                    while ( !relays.isEmpty() ) {
1124                        if (sendRandomByAddr(relays, DEFAULT_SEEDING_RDVPEERS, DEFAULT_SEEDING_PERIOD * iterations)) {
1125                            ++iterations;
1126                        }
1127                    }
1128                }
1129            }
1130        }
1131        
1132        earliestReseed = TimeUtils.toAbsoluteTimeMillis(seedingRdvConnDelay + (DEFAULT_SEEDING_PERIOD * iterations));
1133    }
1134    
1135    /**
1136     * Evaluates if the given pve corresponds to one of our seed rdvs. This is 
1137     * to support the useOnlySeeds flag. The test is not completely foolproof 
1138     * since our list of seed rdvs is just transport addresses. We could be
1139     * given a pve that exhibits an address that corresponds to one of our seeds 
1140     * but is fake. And we might later succeed in connecting to that rdv via one
1141     * the other, real addresses. As a result, useOnlySeeds is *not* a security 
1142     * feature, just a convenience for certain kind of deployments. Seed
1143     * rdvs should include certificates for such a restriction to be a security 
1144     * feature.
1145     */
1146    private boolean isSeedRdv(RdvAdvertisement rdvAdv) {
1147        
1148        RouteAdvertisement radv = rdvAdv.getRouteAdv();
1149        
1150        if (radv == null) {
1151            return false;
1152        }
1153        
1154        AccessPointAdvertisement apAdv = radv.getDest();
1155        
1156        if (apAdv == null) {
1157            return false;
1158        }
1159        
1160        // The accessPointAdv returns a live (!) copy of the endpoint addresses.
1161        List addrList = new ArrayList(apAdv.getVectorEndpointAddresses());
1162        
1163        if (addrList.isEmpty()) {
1164            return false;
1165        }
1166        
1167        ListIterator eachAddr = addrList.listIterator();
1168        
1169        // convert each string to a URI
1170        while (eachAddr.hasNext()) {
1171            String anAddr = (String) eachAddr.next();
1172            
1173            try {
1174                // Convert to URI to compare with seedHosts
1175                eachAddr.set( new URI(anAddr));
1176            } catch (URISyntaxException badURI) {
1177                if (LOG.isEnabledFor(Level.WARN)) {
1178                    LOG.warn("Skipping bad URI : " + anAddr, badURI);
1179                }
1180            }
1181        }
1182        
1183        // seedList must be treated as read-only.
1184        // We do what we want with addrVect
1185        
1186        addrList.retainAll(activeSeedHosts);
1187        
1188        // What's left is the intersection of seedHosts and the set of
1189        // endpoint addresses in the given pve. If it is non-empty, then we
1190        // accept the pve as that of a seed host.
1191        return (!addrList.isEmpty());
1192    }
1193    
1194    /**
1195     * Make sure that the PeerView properly changes behavior, when switching
1196     * from edge mode to rdv mode, and vice-versa.
1197     * Since openWirePipes() requires some other services such as the Pipe
1198     * Service, and since updateStatus is invoked this work must happen in
1199     * background, giving a chance to other services to be started.
1200     **/
1201    private class OpenPipesTask extends TimerTask {
1202        
1203        /**
1204         *  {@inheritDoc}
1205         **/
1206        public void run() {
1207            try {
1208                if (closed) {
1209                    return;
1210                }
1211                
1212                openWirePipes();
1213            } catch (Throwable all) {
1214                if (LOG.isEnabledFor(Level.FATAL)) {
1215                    LOG.fatal("Uncaught Throwable in thread: " + Thread.currentThread().getName(), all);
1216                }
1217            } finally {
1218                removeTask(this);
1219            }
1220        }
1221    }
1222    
1223    private void scheduleOpenPipes(long delay) {
1224        
1225        if (LOG.isEnabledFor(Level.DEBUG)) {
1226            LOG.debug("Scheduling open pipes attempt in " + delay + "ms.");
1227        }
1228        
1229        addTask(new OpenPipesTask(), delay, -1);
1230    }
1231    
1232    /**
1233     * Send a PeerView Message to the specified peer.
1234     *
1235     * @param response indicates whether this is a response. Otherwise
1236     * we may create a distributed loop where peers keep perpetually
1237     * responding to each-other.
1238     * @param failure Construct the message as a failure notification.
1239     **/
1240    private boolean send(PeerViewElement dest, PeerViewElement pve, boolean response, boolean failure) {
1241        
1242        Message msg = makeMessage(pve, response, failure);
1243        
1244        boolean result = dest.sendMessage(msg, SERVICE_NAME, uniqueGroupId);
1245        
1246        if (LOG.isEnabledFor(Level.DEBUG)) {
1247            LOG.debug("Sending " + msg + " to " + dest + " success = " + result);
1248        }
1249        
1250        return result;
1251    }
1252    
1253    /**
1254     * Send a PeerView Message to the specified peer.
1255     *
1256     * @param response indicates whether this is a response. Otherwise
1257     * we may create a distributed loop where peers keep perpetually
1258     * responding to each-other.
1259     * @param failure Construct the message as a failure notification.
1260     **/
1261    private boolean send(EndpointAddress dest, Object hint,
1262    PeerViewElement pve, boolean response, boolean failure) {
1263        
1264        Message msg = makeMessage(pve, response, failure);
1265        
1266        if (null != dest) {
1267            EndpointAddress realAddr = new EndpointAddress(dest, SERVICE_NAME, uniqueGroupId);
1268            
1269            Messenger messenger = rdvService.endpoint.getMessengerImmediate(realAddr, hint);
1270            
1271            if (null<