Source code: com/ubermq/chord/jms/RemoteChordNode.java
1 package com.ubermq.chord.jms;
2
3 import com.ubermq.*;
4 import com.ubermq.chord.*;
5 import com.ubermq.jms.client.*;
6 import EDU.oswego.cs.dl.util.concurrent.*;
7 import java.io.*;
8 import java.util.*;
9 import java.net.*;
10 import javax.jms.*;
11
12 /**
13 * A proxy for a remote chord node. This implementation uses
14 * JMS and the ChordMessage object hierarchy for connectivity.<P>
15 *
16 * This object may be serialized across the network as a way to describe
17 * chord nodes to other chord participants.<P>
18 *
19 * @serial
20 */
21 public class RemoteChordNode
22 implements ChordNode, java.io.Serializable
23 {
24 // the identifier can be cached because it is
25 // in reality immutable
26 private final ChordIdentifier identifier;
27
28 private final URI uri;
29 private final String commandTopic;
30
31 public static final long serialVersionUID = 1L;
32
33 // Transient, mutable state
34 private transient boolean connected = false;
35
36 private transient Mutex queryMutex;
37
38 private transient TopicConnection tc;
39 private transient TopicSession ts;
40 private transient TopicPublisher tpub;
41
42 private transient Topic replyTopic;
43 private transient TopicSubscriber replySub;
44
45 // a big map of URI -> remote nodes
46 private static final Map cachedNodes = new HashMap();
47
48 /**
49 * Looks for a remote node with the given URI in the big cache.
50 */
51 private synchronized static RemoteChordNode lookup(URI uri)
52 {
53 return (RemoteChordNode)cachedNodes.get(uri);
54 }
55
56 private synchronized static void cache(URI uri, RemoteChordNode n)
57 {
58 cachedNodes.put(uri, n);
59 }
60
61 /**
62 * Constructs a remote chord node from a JMS topic connection
63 * factory. The node must be connected with the <code>connect</code>
64 * method before being used.
65 *
66 * @param URI the service URI. Every uberchord service URI is comprised
67 * of two parts, the connection descriptor, and the node identifier.
68 * For example, <pre>ubermq://localhost:3999/ident</pre> gives both
69 * the destination and the node identifier.
70 *
71 * @return a remote node for the specified
72 *
73 * @throws JMSException if the JMS connect attempt fails.
74 */
75 public synchronized static RemoteChordNode createRemoteInstance(ChordIdentifierFactory f,
76 URI uri)
77 {
78 RemoteChordNode lookup = lookup(uri);
79 if (lookup == null) {
80 lookup = new RemoteChordNode(f, uri);
81 cache(uri, lookup);
82 }
83 return lookup;
84 }
85
86 /**
87 * Creates a remote chord node representation
88 * for a local chord node provider.
89 *
90 * @param p a local node
91 * @return a remote representation for p
92 */
93 public synchronized static RemoteChordNode getRemoteRepresentation(ChordNodeProvider p)
94 {
95 return createRemoteInstance(p.identifier().factory(),
96 p.getServiceURI());
97 }
98
99 /**
100 * Given a non-specific URI of a message server, this method
101 * probes the server for connected nodes and returns the URI
102 * of one that is uniquely addressable.<P>
103 *
104 * @param a URI that can be used to connect to a message server.
105 * @return a collection of URIs that refers to every specifically addressable
106 * chord node at that server that responded within a finite
107 * time frame.
108 */
109 public static List findNodeAtURI(URI uri)
110 throws JMSException
111 {
112 TopicConnection tc = new URLTopicConnectionFactory(uri.toString()).createTopicConnection();
113 TopicSession ts = tc.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
114
115 Topic l = ts.createTopic(LocalChordNode.CHORD_NODE_LOCATOR_TOPIC);
116 Topic tt = ts.createTemporaryTopic();
117
118 TopicPublisher p = ts.createPublisher(l);
119 TopicSubscriber s = ts.createSubscriber(tt);
120
121 tc.start();
122
123 Message m = ts.createMessage();
124 m.setJMSReplyTo(tt);
125 p.publish(m);
126
127 List uris = new ArrayList();
128 Message urim = s.receive(DEFAULT_QUERY_TIMEOUT);
129 while(urim != null)
130 {
131 try
132 {
133 String responder = urim.getStringProperty(LocalChordNode.CHORD_NODE_LOCATOR_URI_PROP);
134 uris.add(URI.create(responder));
135 }
136 catch (JMSException e) {
137 // ignore this message then
138 }
139
140 urim = s.receive(DEFAULT_QUERY_TIMEOUT);
141 }
142
143 s.close();
144 p.close();
145 ts.close();
146 tc.close();
147
148 return uris;
149 }
150
151 /**
152 * The default query timeout to wait for a response from the
153 * remote node.
154 */
155 public static final long DEFAULT_QUERY_TIMEOUT = 5000L;
156
157 private RemoteChordNode(ChordIdentifierFactory f,
158 URI remoteURI)
159 {
160 ChordIdentifier id = f.getIdentifier(remoteURI);
161 if (id == null) {
162 // find node at this URI.
163 try
164 {
165 this.uri = (URI)findNodeAtURI(remoteURI).get(0);
166 this.identifier = f.getIdentifier(uri);
167 if (uri == null ||
168 identifier == null)
169 throw new java.lang.IllegalStateException("No chord nodes found at the URI " + uri);
170 }
171 catch (JMSException e) {
172 throw new java.lang.IllegalStateException(e.getMessage());
173 }
174 } else {
175 this.uri = remoteURI;
176 this.identifier = id;
177 }
178
179 this.queryMutex = new Mutex();
180 this.commandTopic = LocalChordNode.getCommandTopic(identifier);
181 this.connected = false;
182 }
183
184 /**
185 * Read resolution provides us a way to go through
186 * our cached local connections, rather than creating new ones
187 * every time.
188 */
189 private Object readResolve() throws ObjectStreamException
190 {
191 RemoteChordNode lookup = lookup(this.uri);
192 if (lookup == null) {
193 lookup = this;
194 lookup.queryMutex = new Mutex();
195 cache(this.uri, this);
196 }
197 return lookup;
198 }
199
200 /**
201 * Connects to the service provider for the remote node.
202 */
203 public void connect()
204 throws JMSException
205 {
206 tc = new URLTopicConnectionFactory(uri.toString()).createTopicConnection();
207 ts = tc.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
208
209 replyTopic = ts.createTemporaryTopic();
210 replySub = ts.createSubscriber(replyTopic);
211
212 tpub = ts.createPublisher(ts.createTopic(commandTopic));
213
214 tc.start();
215 this.connected = true;
216 }
217
218 public void close()
219 {
220 try
221 {
222 tpub.close();
223 ts.close();
224 tc.close();
225
226 this.connected = false;
227 }
228 catch (JMSException e) {
229 // we are closing, it's ok
230 }
231 }
232
233 /**
234 * Returns true if the <code>connect</code> method has been invoked
235 * successfully.
236 * @return true if connected, false otherwise.
237 */
238 public boolean isConnected()
239 {
240 return connected;
241 }
242
243 public String toString()
244 {
245 return "remote id=" + identifier();
246 }
247
248 public String toHtml()
249 {
250 StringBuffer sb = new StringBuffer("<html>");
251 sb.append(uri.toString());
252
253 Iterator iter = ((Collection)query(LocalChordNode.CHORD_LOCAL_KEYS)).iterator();
254 if (iter.hasNext())
255 sb.append("<br><br><b>Local Contents:</b><br>");
256
257 while (iter.hasNext())
258 {
259 Object element = iter.next();
260 sb.append("<tt>");
261 sb.append(element.toString());
262 sb.append("</tt><br>");
263 }
264
265 sb.append("</html>");
266 return sb.toString();
267 }
268
269 /**
270 * Returns the identifier for this node.
271 *
272 * @return the node identifier in the identifier space.
273 */
274 public ChordIdentifier identifier()
275 {
276 return identifier;
277 }
278
279 /**
280 * Returns the successor of this node.
281 *
282 * @return a ChordNode that is the direct successor of this node.
283 */
284 public ChordNode successor()
285 {
286 // TODO: possibly optimize - we are sending the whole
287 // finger table over the network, when we really only
288 // have to send a single node.
289 return fingers()[0];
290 }
291
292 public ChordNode predecessor()
293 {
294 return (ChordNode)query(LocalChordNode.CHORD_PREDECESSOR);
295 }
296
297 /**
298 * Instructs the node that the specified node may be its predecessor.
299 *
300 * @param n a node, which is likely the predecessor for this node.
301 */
302 public void notify(ChordNode n)
303 {
304 ensureConnected();
305
306 try
307 {
308 synchronized(tpub) {
309 tpub.publish(ChordMessage.createNotifyMessage(ts,
310 false,
311 RemoteChordNode.getRemoteRepresentation((ChordNodeProvider)n)).getJMSMessage());
312 }
313 }
314 catch (JMSException e) {
315 throw new java.lang.IllegalStateException("Remote node is unavailable.");
316 }
317 }
318
319 public void notifyGoingAway(ChordNode n)
320 {
321 ensureConnected();
322
323 try
324 {
325 synchronized(tpub) {
326 tpub.publish(ChordMessage.createNotifyMessage(ts,
327 true,
328 RemoteChordNode.getRemoteRepresentation((ChordNodeProvider)n)).getJMSMessage());
329 }
330 }
331 catch (JMSException e) {
332 throw new java.lang.IllegalStateException("Remote node is unavailable.");
333 }
334 }
335
336 /**
337 * Returns the element of the finger table
338 * that most closely precedes the given identifier.
339 *
340 * @return a ChordNode representing the closest finger to the identifier.
341 */
342 public ChordNode closestPrecedingFinger(ChordIdentifier id)
343 {
344 ChordNode[] fingers = fingers();
345 for (int i = fingers.length - 1; i >= 0; i--)
346 {
347 if (Interval.elementOf(fingers[i].identifier(),
348 identifier,
349 id,
350 false, true))
351 return fingers[i];
352 }
353
354 return this;
355 }
356
357 /**
358 * Joins the node to the specified infrastructure. This causes the node
359 * to perform any logic that is required to inform itself or
360 * other nodes in the infrastructure about its presence.
361 *
362 * @param i the infrastructure that the node is joining
363 * @param m the size of the finger table
364 */
365 public void join(ChordInfrastructure i)
366 {
367 throw new UnsupportedOperationException("Remote cache nodes cannot join an infrastructure.");
368 }
369
370 public void leave(ChordInfrastructure i)
371 {
372 throw new UnsupportedOperationException("Remote cache nodes cannot leave an infrastructure.");
373 }
374
375 /**
376 * Returns the finger table for this node. The finger table is
377 * a set of nodes, such that the <i>i</i><super>th</super> entry
378 * contains the identity of the first node that succeeds this node by at least
379 * 2 <super>i-1</super> on the identifier circle.
380 *
381 * @return a ChordNode[] containing the fingers for this node.
382 */
383 public ChordNode[] fingers()
384 {
385 ChordNode[] fingers = (ChordNode[])query(LocalChordNode.CHORD_FINGER_TABLE_KEY);
386 if (fingers == null) {
387 throw new java.lang.IllegalStateException("This node is in isolation. The finger table is empty.");
388 }
389
390 return fingers;
391 }
392
393 /**
394 * Stores the specified object at the given key, on this node.
395 * The node may refuse the key if the hash value is not consistent.
396 *
397 * @throws IllegalStateException if the node is read-only
398 * @throws IllegalArgumentException if storing the key-value pair
399 * would break the chord invariants.
400 */
401 public void store(Object key, Object value)
402 {
403 ensureConnected();
404
405 try
406 {
407 ChordMessage cm = ChordMessage.createStoreMessage(ts,
408 key,
409 value);
410 synchronized(tpub) {
411 tpub.publish(cm.getJMSMessage());
412 }
413 }
414 catch (JMSException e) {
415 throw new java.lang.IllegalStateException("Destination node is unavailable.");
416 }
417 }
418
419 /**
420 * Queries the node for the object indexed at the specified key.
421 *
422 */
423 public synchronized Object query(Object key)
424 {
425 ensureConnected();
426
427 try
428 {
429 ChordQueryMessage cm = ChordMessage.createQueryMessage(ts, replyTopic, key);
430
431 // get the query mutex
432 queryMutex.acquire();
433
434 // publish the query & wait for the reply, synchronously.
435 assert isConnected();
436 synchronized(tpub) {
437 tpub.publish(cm.getJMSMessage());
438 }
439
440 // get the reply
441 Message jmsMessage = replySub.receive(DEFAULT_QUERY_TIMEOUT);
442 if (jmsMessage != null)
443 {
444 ChordMessage m = ChordMessage.createFromIncoming(jmsMessage);
445 assert m instanceof ChordValueMessage;
446
447 ChordValueMessage v = ((ChordValueMessage)m);
448 assert v.getQueryId() == cm.getQueryId() : "expected result on " + replyTopic + " for " + cm.getQueryId() + " but was " + v.getQueryId();
449
450 // we have a reply. our model is for a single
451 // outstanding query per object at a time.
452 Utility.getLogger().info("got response on " + replyTopic + " for " + cm.getQueryId());
453 assert replySub.receiveNoWait() == null : "extra reply message";
454 return v.getValue();
455 }
456
457 // we didn't get a reply.
458 //TODO: handle the failure of the destination node more
459 // gracefully
460 assert false;
461 return null;
462 }
463 catch (JMSException e) {
464 throw new java.lang.IllegalStateException("Destination node is unavailable.");
465 }
466 catch (InterruptedException ie) {
467 throw new UnknownError("Interrupted while acquiring query mutex.");
468 }
469 finally {
470 queryMutex.release();
471 }
472 }
473
474 private synchronized void ensureConnected()
475 {
476 // connect if necessary
477 try
478 {
479 if (!isConnected())
480 connect();
481 }
482 catch (JMSException e) {
483 throw new java.lang.IllegalStateException("Unable to lazily connect. " + e.getMessage());
484 }
485 }
486
487 public int hashCode()
488 {
489 return identifier.hashCode();
490 }
491
492 public boolean equals(Object o)
493 {
494 if (o instanceof ChordNode)
495 {
496 return ((ChordNode)o).identifier().equals(identifier());
497 }
498 else return false;
499 }
500 }