Source code: com/ubermq/chord/jms/LocalChordNode.java
1 package com.ubermq.chord.jms;
2
3 import EDU.oswego.cs.dl.util.concurrent.*;
4 import com.ubermq.*;
5 import com.ubermq.chord.*;
6 import com.ubermq.jms.client.*;
7 import com.ubermq.jms.server.*;
8 import com.ubermq.kernel.*;
9 import java.net.*;
10 import java.rmi.*;
11 import java.util.*;
12 import javax.jms.*;
13
14 /**
15 * A local chord node that provides storage functionality to the
16 * infrastructure. <P>
17 *
18 * This implementation can be interacted with via JMS chord messages.
19 */
20 public class LocalChordNode
21 extends AbstractChordNode
22 implements ChordNodeProvider,
23 MessageListener
24 {
25 private final MessageServer ms;
26 private final URI serviceURI;
27
28 private final Map data;
29
30 private final TopicConnection tc;
31 private final TopicSession ts;
32 private final TopicSubscriber tsub;
33 private final TopicSubscriber announceSub;
34 private final TopicPublisher tpub;
35
36 /**
37 * The default identifier factory, usable for most applications.
38 */
39 public static final ChordIdentifierFactory DEFAULT_IDENTIFIER_FACTORY =
40 BoundedChordIdentifier.getFactory(6);
41
42 /**
43 * The prefix for chord command topics.
44 */
45 public static final String CHORD_COMMAND_PREFIX = "com.ubermq.chord.command.";
46
47 /**
48 * Broadcasting a message to this topic with the Query property set to
49 * true will invoke a set of replies on the same topic that have the URI
50 * property set to a valid addressable node URI.
51 */
52 public static final String CHORD_NODE_LOCATOR_TOPIC = "com.ubermq.chord.nodes";
53 public static final String CHORD_NODE_LOCATOR_QUERY_PROP = "query";
54 public static final String CHORD_NODE_LOCATOR_URI_PROP = "uri";
55
56 /**
57 * A special key that contains a List of ChordNode objects describing
58 * this node's finger table. This key can be queried through normal
59 * query operations.
60 */
61 public static final String CHORD_FINGER_TABLE_KEY = "com.ubermq.chord.finger-table";
62
63 /**
64 * The predecessor node special query key.
65 */
66 public static final String CHORD_PREDECESSOR = "com.ubermq.chord.predecessor";
67
68 /**
69 * The node identifier.
70 */
71 public static final String CHORD_IDENTIFIER = "com.ubermq.chord.identifier";
72
73 /**
74 * A Collection of keys stored at this node.
75 */
76 public static final String CHORD_LOCAL_KEYS = "com.ubermq.chord.local-keys";
77
78 /**
79 * A clock daemon for periodic refresh operations.
80 */
81 private static final ClockDaemon cd = new ClockDaemon();
82
83 /**
84 * Period used for stabilization operations.
85 */
86 private static final long STABILIZE_PERIOD = 5000L,
87 FIX_FINGER_PERIOD = 5000;
88
89 /**
90 * Starts a local in-process message server with the given set of properties,
91 * and returns the node created. The node is initially not part of an
92 * infrastructure. <P>
93 *
94 * The node must be joined to an existing
95 * infrastructure by the caller, by using the <code>join</code> method.
96 *
97 * @param properties a set of properties with which to start the local
98 * message server.
99 */
100 public static ChordNodeProvider createServerNode(MessageServer ms,
101 ChordIdentifierFactory identifierFactory)
102 throws RemoteException
103 {
104 // create a local node
105 try
106 {
107 final ChordNodeProvider p = new LocalChordNode(ms,
108 identifierFactory.createRandomIdentifier(),
109 identifierFactory.getFingerTableSize());
110
111 // stabilize and fixFingers periodically
112 cd.executePeriodically(STABILIZE_PERIOD,
113 new Runnable() {
114 public void run() {
115 try {
116 p.stabilize();
117 } catch(Exception x) {
118 com.ubermq.Utility.getLogger().throwing("","",x);
119 }
120 }
121 }, false);
122 cd.executePeriodically(FIX_FINGER_PERIOD,
123 new Runnable() {
124 public void run() {
125 try {
126 p.fixFingers();
127 } catch(Exception x) {
128 com.ubermq.Utility.getLogger().throwing("","",x);
129 }
130 }
131 }, false);
132
133 return p;
134 }
135 catch (javax.jms.JMSException e) {
136 throw new RemoteException(e.getMessage());
137 }
138 }
139
140 /**
141 * Constructs a chord server node at the local machine.
142 * This requires a local JMS message server.
143 *
144 * @param ms a message server running in-process.
145 * @param id the node identifier.
146 *
147 * @throws IllegalArgumentException if another node on the
148 * message server has the same identifier.
149 */
150 public LocalChordNode(MessageServer ms,
151 ChordIdentifier id,
152 int m)
153 throws JMSException
154 {
155 super(id, m, RemoteChordNode.createRemoteInstance(id.factory(), getServiceURI(ms.getServiceURI(), id)));
156 this.ms = ms;
157 this.data = new HashMap();
158 this.serviceURI = getServiceURI(ms.getServiceURI(), id);
159
160 // check if anyone else has this ID
161 if (RemoteChordNode.findNodeAtURI(ms.getServiceURI()).contains(getServiceURI()))
162 throw new IllegalArgumentException("Identifier collision");
163
164 // connect
165 this.tc = new PipeTopicConnectionFactory(ms).createTopicConnection();
166 this.ts = tc.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
167
168 this.tpub = ts.createPublisher(null);
169
170 this.tsub = ts.createSubscriber(ts.createTopic(getCommandTopic(id)));
171 this.tsub.setMessageListener(this);
172
173 this.announceSub = ts.createSubscriber(ts.createTopic(CHORD_NODE_LOCATOR_TOPIC));
174 announceSub.setMessageListener(new MessageListener() {
175 public void onMessage(Message p0)
176 {
177 try
178 {
179 assert p0.getJMSReplyTo() != null;
180
181 Message m = ts.createMessage();
182 m.setStringProperty(CHORD_NODE_LOCATOR_URI_PROP,
183 getServiceURI().toString());
184 tpub.publish((Topic)p0.getJMSReplyTo(), m);
185 }
186 catch (JMSException e) {
187 // not a query message... ignore
188 }
189 }
190 });
191
192 tc.start();
193 }
194
195 /**
196 * Returns the topic that should be used to communicate
197 * with a chord node of the given identifier.
198 *
199 * @return a topic name
200 */
201 public static String getCommandTopic(ChordIdentifier id)
202 {
203 return CHORD_COMMAND_PREFIX + id;
204 }
205
206 /**
207 * Closes the node's resources.
208 */
209 public void close()
210 {
211 try
212 {
213 tpub.close();
214 tsub.close();
215 announceSub.close();
216 ts.close();
217 tc.close();
218 }
219 catch (JMSException e) {
220 // going away anyway
221 }
222 }
223
224 /**
225 * Stores the specified object at the given key, on this node.
226 * The node may refuse the key if the hash value is not consistent.
227 *
228 * @throws IllegalStateException if the node is read-only
229 * @throws IllegalArgumentException if storing the key-value pair
230 * would break the chord invariants.
231 */
232 public void store(Object key, Object value)
233 {
234 data.put(key, value);
235 }
236
237 /**
238 * Returns the URI that clients should connect to
239 * for communication with this node.
240 */
241 public URI getServiceURI()
242 {
243 return serviceURI;
244 }
245
246 /**
247 * Returns the service URI that corresponds to the given
248 * server base URI and chord node, by convention.
249 *
250 * @param serverURI a server URI
251 * @param id a chord identifier
252 * @return the URI that can be used to address the
253 * node having identifier <code>id</code> at the specified
254 * server location.
255 */
256 public static URI getServiceURI(URI serverURI,
257 ChordIdentifier id)
258 {
259 try
260 {
261 return new URI(serverURI.getScheme(),
262 serverURI.getUserInfo(),
263 serverURI.getHost(),
264 serverURI.getPort(),
265 "/" + id.toString(),
266 null,
267 null);
268 }
269 catch (URISyntaxException e) {
270 throw new java.lang.IllegalStateException(e.getMessage());
271 }
272 }
273
274 /**
275 * Queries the node for the object indexed at the specified key.
276 *
277 * @throws ItemNotFoundException if there is no object at this node
278 * for the given key.
279 */
280 public Object query(Object key)
281 {
282 // "special" keys
283 if (key.equals(CHORD_PREDECESSOR))
284 return predecessor();
285 else if (key.equals(CHORD_FINGER_TABLE_KEY))
286 return fingers();
287 else if (key.equals(CHORD_IDENTIFIER))
288 return identifier();
289 else if (key.equals(CHORD_LOCAL_KEYS))
290 return keys();
291
292 // arbitrary data.
293 return data.get(key);
294 }
295
296 /**
297 * Returns the collection of keys that are stored locally at
298 * this node. This method is implemented by subclasses for
299 * use by algorithms implemented in this object.
300 */
301 protected Collection keys()
302 {
303 Set s = new HashSet();
304 s.addAll(data.keySet());
305 return s;
306 }
307
308 /**
309 * Moves an item from the local data store to another
310 * node as specified. This is an abstract method so
311 * subclasses can make choices like retaining
312 * the moved item, or ignoring the move request altogether.
313 */
314 protected void moveItem(Object key, ChordNode destination)
315 {
316 destination.store(key, data.remove(key));
317 com.ubermq.Utility.getLogger().fine("moved object at " + key + " to " + destination);
318 }
319
320 /**
321 * Responds to JMS messages, an alternate way to store
322 * or query from this node. This should typically only be called by
323 * the messaging infrastructure.<P>
324 *
325 * @param p0 a JMS representation of a chord message
326 */
327 public void onMessage(Message p0)
328 {
329 ChordMessage m = ChordMessage.createFromIncoming(p0);
330 try
331 {
332 m.execute(ts,
333 tpub,
334 this);
335 }
336 catch (JMSException e) {
337 Utility.getLogger().throwing("","",e);
338 }
339 }
340
341 public String toString()
342 {
343 return "Local Chord Node id=" + identifier() + " url=" + getServiceURI();
344 }
345
346 /**
347 * Provides capabilities to host a chord node here.<P>
348 *
349 * args: host-port number-of-nodes [well-known-entry-point]
350 *
351 */
352 public static void main(String[] args)
353 {
354 Properties p = new Properties();
355 p.put(ServerConfig.ADMIN_ENABLE, "false");
356 p.put(ConfigConstants.SERVER_PORT, args[0]);
357 MessageServer ms = new MessageServer(p);
358 ms.run();
359 System.out.println(ms.getServiceURI());
360
361 // start chord node
362 int n = Integer.valueOf(args[1]).intValue();
363 URI joinURI = null;
364 try
365 {
366 if (args.length > 2)
367 joinURI = (URI)RemoteChordNode.findNodeAtURI(URI.create(args[2])).get(0);
368 }
369 catch (JMSException e) {
370 e.printStackTrace();
371 joinURI = null;
372 }
373 System.out.println("WANTS to join: " + joinURI);
374
375 // create n nodes.
376 for (int i = 0; i < n; i++)
377 {
378 try
379 {
380 ChordNodeProvider theNode = LocalChordNode.createServerNode(ms, DEFAULT_IDENTIFIER_FACTORY);
381 if (joinURI != null) {
382 theNode.join(JMSChordInfrastructure.getInfrastructure(DEFAULT_IDENTIFIER_FACTORY, joinURI));
383 } else {
384 joinURI = theNode.getServiceURI();
385 }
386
387 System.out.println("UberChord node running at " + ms.getServiceUrl() + " with id " + theNode.identifier());
388 }
389 catch (Exception e) {
390 e.printStackTrace();
391 }
392 }
393
394 }
395 }