Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: com/ubermq/chord/jms/RemoteChordNode.java


1   package com.ubermq.chord.jms;
2   
3   import com.ubermq.*;
4   import com.ubermq.chord.*;
5   import com.ubermq.jms.client.*;
6   import EDU.oswego.cs.dl.util.concurrent.*;
7   import java.io.*;
8   import java.util.*;
9   import java.net.*;
10  import javax.jms.*;
11  
12  /**
13   * A proxy for a remote chord node. This implementation uses
14   * JMS and the ChordMessage object hierarchy for connectivity.<P>
15   *
16   * This object may be serialized across the network as a way to describe
17   * chord nodes to other chord participants.<P>
18   *
19   * @serial
20   */
21  public class RemoteChordNode
22      implements ChordNode, java.io.Serializable
23  {
24      // the identifier can be cached because it is
25      // in reality immutable
26      private final ChordIdentifier identifier;
27  
28      private final URI uri;
29      private final String commandTopic;
30  
31      public static final long serialVersionUID = 1L;
32  
33      // Transient, mutable state
34      private transient boolean connected = false;
35  
36      private transient Mutex queryMutex;
37  
38      private transient TopicConnection tc;
39      private transient TopicSession ts;
40      private transient TopicPublisher tpub;
41  
42      private transient Topic replyTopic;
43      private transient TopicSubscriber replySub;
44  
45      // a big map of URI -> remote nodes
46      private static final Map cachedNodes = new HashMap();
47  
48      /**
49       * Looks for a remote node with the given URI in the big cache.
50       */
51      private synchronized static RemoteChordNode lookup(URI uri)
52      {
53          return (RemoteChordNode)cachedNodes.get(uri);
54      }
55  
56      private synchronized static void cache(URI uri, RemoteChordNode n)
57      {
58          cachedNodes.put(uri, n);
59      }
60  
61      /**
62       * Constructs a remote chord node from a JMS topic connection
63       * factory. The node must be connected with the <code>connect</code>
64       * method before being used.
65       *
66       * @param URI the service URI. Every uberchord service URI is comprised
67       * of two parts, the connection descriptor, and the node identifier.
68       * For example, <pre>ubermq://localhost:3999/ident</pre> gives both
69       * the destination and the node identifier.
70       *
71       * @return a remote node for the specified
72       *
73       * @throws JMSException if the JMS connect attempt fails.
74       */
75      public synchronized static RemoteChordNode createRemoteInstance(ChordIdentifierFactory f,
76                                                                      URI uri)
77      {
78          RemoteChordNode lookup = lookup(uri);
79          if (lookup == null) {
80              lookup = new RemoteChordNode(f, uri);
81              cache(uri, lookup);
82          }
83          return lookup;
84      }
85  
86      /**
87       * Creates a remote chord node representation
88       * for a local chord node provider.
89       *
90       * @param p a local node
91       * @return a remote representation for p
92       */
93      public synchronized static RemoteChordNode getRemoteRepresentation(ChordNodeProvider p)
94      {
95          return createRemoteInstance(p.identifier().factory(),
96                                      p.getServiceURI());
97      }
98  
99      /**
100      * Given a non-specific URI of a message server, this method
101      * probes the server for connected nodes and returns the URI
102      * of one that is uniquely addressable.<P>
103      *
104      * @param a URI that can be used to connect to a message server.
105      * @return a collection of URIs that refers to every specifically addressable
106      * chord node at that server that responded within a finite
107      * time frame.
108      */
109     public static List findNodeAtURI(URI uri)
110         throws JMSException
111     {
112         TopicConnection tc = new URLTopicConnectionFactory(uri.toString()).createTopicConnection();
113         TopicSession ts = tc.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
114 
115         Topic l = ts.createTopic(LocalChordNode.CHORD_NODE_LOCATOR_TOPIC);
116         Topic tt = ts.createTemporaryTopic();
117 
118         TopicPublisher p = ts.createPublisher(l);
119         TopicSubscriber s = ts.createSubscriber(tt);
120 
121         tc.start();
122 
123         Message m = ts.createMessage();
124         m.setJMSReplyTo(tt);
125         p.publish(m);
126 
127         List uris = new ArrayList();
128         Message urim = s.receive(DEFAULT_QUERY_TIMEOUT);
129         while(urim != null)
130         {
131             try
132             {
133                 String responder = urim.getStringProperty(LocalChordNode.CHORD_NODE_LOCATOR_URI_PROP);
134                 uris.add(URI.create(responder));
135             }
136             catch (JMSException e) {
137                 // ignore this message then
138             }
139 
140             urim = s.receive(DEFAULT_QUERY_TIMEOUT);
141         }
142 
143         s.close();
144         p.close();
145         ts.close();
146         tc.close();
147 
148         return uris;
149     }
150 
151     /**
152      * The default query timeout to wait for a response from the
153      * remote node.
154      */
155     public static final long DEFAULT_QUERY_TIMEOUT = 5000L;
156 
157     private RemoteChordNode(ChordIdentifierFactory f,
158                             URI remoteURI)
159     {
160         ChordIdentifier id = f.getIdentifier(remoteURI);
161         if (id == null) {
162             // find node at this URI.
163             try
164             {
165                 this.uri = (URI)findNodeAtURI(remoteURI).get(0);
166                 this.identifier = f.getIdentifier(uri);
167                 if (uri == null ||
168                     identifier == null)
169                     throw new java.lang.IllegalStateException("No chord nodes found at the URI " + uri);
170             }
171             catch (JMSException e) {
172                 throw new java.lang.IllegalStateException(e.getMessage());
173             }
174         } else {
175             this.uri = remoteURI;
176             this.identifier = id;
177         }
178 
179         this.queryMutex = new Mutex();
180         this.commandTopic = LocalChordNode.getCommandTopic(identifier);
181         this.connected = false;
182     }
183 
184     /**
185      * Read resolution provides us a way to go through
186      * our cached local connections, rather than creating new ones
187      * every time.
188      */
189     private Object readResolve() throws ObjectStreamException
190     {
191         RemoteChordNode lookup = lookup(this.uri);
192         if (lookup == null) {
193             lookup = this;
194             lookup.queryMutex = new Mutex();
195             cache(this.uri, this);
196         }
197         return lookup;
198     }
199 
200     /**
201      * Connects to the service provider for the remote node.
202      */
203     public void connect()
204         throws JMSException
205     {
206         tc = new URLTopicConnectionFactory(uri.toString()).createTopicConnection();
207         ts = tc.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
208 
209         replyTopic = ts.createTemporaryTopic();
210         replySub = ts.createSubscriber(replyTopic);
211 
212         tpub = ts.createPublisher(ts.createTopic(commandTopic));
213 
214         tc.start();
215         this.connected = true;
216     }
217 
218     public void close()
219     {
220         try
221         {
222             tpub.close();
223             ts.close();
224             tc.close();
225 
226             this.connected = false;
227         }
228         catch (JMSException e) {
229             // we are closing, it's ok
230         }
231     }
232 
233     /**
234      * Returns true if the <code>connect</code> method has been invoked
235      * successfully.
236      * @return true if connected, false otherwise.
237      */
238     public boolean isConnected()
239     {
240         return connected;
241     }
242 
243     public String toString()
244     {
245         return "remote id=" + identifier();
246     }
247 
248     public String toHtml()
249     {
250         StringBuffer sb = new StringBuffer("<html>");
251         sb.append(uri.toString());
252 
253         Iterator iter = ((Collection)query(LocalChordNode.CHORD_LOCAL_KEYS)).iterator();
254         if (iter.hasNext())
255             sb.append("<br><br><b>Local Contents:</b><br>");
256 
257         while (iter.hasNext())
258         {
259             Object element = iter.next();
260             sb.append("<tt>");
261             sb.append(element.toString());
262             sb.append("</tt><br>");
263         }
264 
265         sb.append("</html>");
266         return sb.toString();
267     }
268 
269     /**
270      * Returns the identifier for this node.
271      *
272      * @return   the node identifier in the identifier space.
273      */
274     public ChordIdentifier identifier()
275     {
276         return identifier;
277     }
278 
279     /**
280      * Returns the successor of this node.
281      *
282      * @return   a ChordNode that is the direct successor of this node.
283      */
284     public ChordNode successor()
285     {
286         // TODO: possibly optimize - we are sending the whole
287         // finger table over the network, when we really only
288         // have to send a single node.
289         return fingers()[0];
290     }
291 
292     public ChordNode predecessor()
293     {
294         return (ChordNode)query(LocalChordNode.CHORD_PREDECESSOR);
295     }
296 
297     /**
298      * Instructs the node that the specified node may be its predecessor.
299      *
300      * @param n a node, which is likely the predecessor for this node.
301      */
302     public void notify(ChordNode n)
303     {
304         ensureConnected();
305 
306         try
307         {
308             synchronized(tpub) {
309                 tpub.publish(ChordMessage.createNotifyMessage(ts,
310                                                               false,
311                                                               RemoteChordNode.getRemoteRepresentation((ChordNodeProvider)n)).getJMSMessage());
312             }
313         }
314         catch (JMSException e) {
315             throw new java.lang.IllegalStateException("Remote node is unavailable.");
316         }
317     }
318 
319     public void notifyGoingAway(ChordNode n)
320     {
321         ensureConnected();
322 
323         try
324         {
325             synchronized(tpub) {
326                 tpub.publish(ChordMessage.createNotifyMessage(ts,
327                                                               true,
328                                                               RemoteChordNode.getRemoteRepresentation((ChordNodeProvider)n)).getJMSMessage());
329             }
330         }
331         catch (JMSException e) {
332             throw new java.lang.IllegalStateException("Remote node is unavailable.");
333         }
334     }
335 
336     /**
337      * Returns the element of the finger table
338      * that most closely precedes the given identifier.
339      *
340      * @return a ChordNode representing the closest finger to the identifier.
341      */
342     public ChordNode closestPrecedingFinger(ChordIdentifier id)
343     {
344         ChordNode[] fingers = fingers();
345         for (int i = fingers.length - 1; i >= 0; i--)
346         {
347             if (Interval.elementOf(fingers[i].identifier(),
348                                    identifier,
349                                    id,
350                                    false, true))
351                 return fingers[i];
352         }
353 
354         return this;
355     }
356 
357     /**
358      * Joins the node to the specified infrastructure.  This causes the node
359      * to perform any logic that is required to inform itself or
360      * other nodes in the infrastructure about its presence.
361      *
362      * @param i the infrastructure that the node is joining
363      * @param m the size of the finger table
364      */
365     public void join(ChordInfrastructure i)
366     {
367         throw new UnsupportedOperationException("Remote cache nodes cannot join an infrastructure.");
368     }
369 
370     public void leave(ChordInfrastructure i)
371     {
372         throw new UnsupportedOperationException("Remote cache nodes cannot leave an infrastructure.");
373     }
374 
375     /**
376      * Returns the finger table for this node. The finger table is
377      * a set of nodes, such that the <i>i</i><super>th</super> entry
378      * contains the identity of the first node that succeeds this node by at least
379      * 2 <super>i-1</super> on the identifier circle.
380      *
381      * @return   a ChordNode[] containing the fingers for this node.
382      */
383     public ChordNode[] fingers()
384     {
385         ChordNode[] fingers = (ChordNode[])query(LocalChordNode.CHORD_FINGER_TABLE_KEY);
386         if (fingers == null) {
387             throw new java.lang.IllegalStateException("This node is in isolation. The finger table is empty.");
388         }
389 
390         return fingers;
391     }
392 
393     /**
394      * Stores the specified object at the given key, on this node.
395      * The node may refuse the key if the hash value is not consistent.
396      *
397      * @throws IllegalStateException if the node is read-only
398      * @throws IllegalArgumentException if storing the key-value pair
399      * would break the chord invariants.
400      */
401     public void store(Object key, Object value)
402     {
403         ensureConnected();
404 
405         try
406         {
407             ChordMessage cm = ChordMessage.createStoreMessage(ts,
408                                                               key,
409                                                               value);
410             synchronized(tpub) {
411                 tpub.publish(cm.getJMSMessage());
412             }
413         }
414         catch (JMSException e) {
415             throw new java.lang.IllegalStateException("Destination node is unavailable.");
416         }
417     }
418 
419     /**
420      * Queries the node for the object indexed at the specified key.
421      *
422      */
423     public synchronized Object query(Object key)
424     {
425         ensureConnected();
426 
427         try
428         {
429             ChordQueryMessage cm = ChordMessage.createQueryMessage(ts, replyTopic, key);
430 
431             // get the query mutex
432             queryMutex.acquire();
433 
434             // publish the query & wait for the reply, synchronously.
435             assert isConnected();
436             synchronized(tpub) {
437                 tpub.publish(cm.getJMSMessage());
438             }
439 
440             // get the reply
441             Message jmsMessage = replySub.receive(DEFAULT_QUERY_TIMEOUT);
442             if (jmsMessage != null)
443             {
444                 ChordMessage m = ChordMessage.createFromIncoming(jmsMessage);
445                 assert m instanceof ChordValueMessage;
446 
447                 ChordValueMessage v = ((ChordValueMessage)m);
448                 assert v.getQueryId() == cm.getQueryId() : "expected result on " + replyTopic + " for " + cm.getQueryId() + " but was " + v.getQueryId();
449 
450                 // we have a reply. our model is for a single
451                 // outstanding query per object at a time.
452                 Utility.getLogger().info("got response on " + replyTopic + " for " + cm.getQueryId());
453                 assert replySub.receiveNoWait() == null : "extra reply message";
454                 return v.getValue();
455             }
456 
457             // we didn't get a reply.
458             //TODO: handle the failure of the destination node more
459             // gracefully
460             assert false;
461             return null;
462         }
463         catch (JMSException e) {
464             throw new java.lang.IllegalStateException("Destination node is unavailable.");
465         }
466         catch (InterruptedException ie) {
467             throw new UnknownError("Interrupted while acquiring query mutex.");
468         }
469         finally {
470             queryMutex.release();
471         }
472     }
473 
474     private synchronized void ensureConnected()
475     {
476         // connect if necessary
477         try
478         {
479             if (!isConnected())
480                 connect();
481         }
482         catch (JMSException e) {
483             throw new java.lang.IllegalStateException("Unable to lazily connect. " + e.getMessage());
484         }
485     }
486 
487     public int hashCode()
488     {
489         return identifier.hashCode();
490     }
491 
492     public boolean equals(Object o)
493     {
494         if (o instanceof ChordNode)
495         {
496             return ((ChordNode)o).identifier().equals(identifier());
497         }
498         else return false;
499     }
500 }