Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: com/ubermq/chord/jms/LocalChordNode.java


1   package com.ubermq.chord.jms;
2   
3   import EDU.oswego.cs.dl.util.concurrent.*;
4   import com.ubermq.*;
5   import com.ubermq.chord.*;
6   import com.ubermq.jms.client.*;
7   import com.ubermq.jms.server.*;
8   import com.ubermq.kernel.*;
9   import java.net.*;
10  import java.rmi.*;
11  import java.util.*;
12  import javax.jms.*;
13  
14  /**
15   * A local chord node that provides storage functionality to the
16   * infrastructure. <P>
17   *
18   * This implementation can be interacted with via JMS chord messages.
19   */
20  public class LocalChordNode
21      extends AbstractChordNode
22      implements ChordNodeProvider,
23      MessageListener
24  {
25      private final MessageServer ms;
26      private final URI serviceURI;
27  
28      private final Map data;
29  
30      private final TopicConnection tc;
31      private final TopicSession ts;
32      private final TopicSubscriber tsub;
33      private final TopicSubscriber announceSub;
34      private final TopicPublisher tpub;
35  
36      /**
37       * The default identifier factory, usable for most applications.
38       */
39      public static final ChordIdentifierFactory DEFAULT_IDENTIFIER_FACTORY =
40          BoundedChordIdentifier.getFactory(6);
41  
42      /**
43       * The prefix for chord command topics.
44       */
45      public static final String CHORD_COMMAND_PREFIX = "com.ubermq.chord.command.";
46  
47      /**
48       * Broadcasting a message to this topic with the Query property set to
49       * true will invoke a set of replies on the same topic that have the URI
50       * property set to a valid addressable node URI.
51       */
52      public static final String CHORD_NODE_LOCATOR_TOPIC = "com.ubermq.chord.nodes";
53      public static final String CHORD_NODE_LOCATOR_QUERY_PROP = "query";
54      public static final String CHORD_NODE_LOCATOR_URI_PROP = "uri";
55  
56      /**
57       * A special key that contains a List of ChordNode objects describing
58       * this node's finger table. This key can be queried through normal
59       * query operations.
60       */
61      public static final String CHORD_FINGER_TABLE_KEY = "com.ubermq.chord.finger-table";
62  
63      /**
64       * The predecessor node special query key.
65       */
66      public static final String CHORD_PREDECESSOR = "com.ubermq.chord.predecessor";
67  
68      /**
69       * The node identifier.
70       */
71      public static final String CHORD_IDENTIFIER = "com.ubermq.chord.identifier";
72  
73      /**
74       * A Collection of keys stored at this node.
75       */
76      public static final String CHORD_LOCAL_KEYS = "com.ubermq.chord.local-keys";
77  
78      /**
79       * A clock daemon for periodic refresh operations.
80       */
81      private static final ClockDaemon cd = new ClockDaemon();
82  
83      /**
84       * Period used for stabilization operations.
85       */
86      private static final long STABILIZE_PERIOD = 5000L,
87          FIX_FINGER_PERIOD = 5000;
88  
89      /**
90       * Starts a local in-process message server with the given set of properties,
91       * and returns the node created. The node is initially not part of an
92       * infrastructure. <P>
93       *
94       * The node must be joined to an existing
95       * infrastructure by the caller, by using the <code>join</code> method.
96       *
97       * @param properties a set of properties with which to start the local
98       * message server.
99       */
100     public static ChordNodeProvider createServerNode(MessageServer ms,
101                                                      ChordIdentifierFactory identifierFactory)
102         throws RemoteException
103     {
104         // create a local node
105         try
106         {
107             final ChordNodeProvider p = new LocalChordNode(ms,
108                                                            identifierFactory.createRandomIdentifier(),
109                                                            identifierFactory.getFingerTableSize());
110 
111             // stabilize and fixFingers periodically
112             cd.executePeriodically(STABILIZE_PERIOD,
113                                    new Runnable() {
114                         public void run() {
115                             try {
116                                 p.stabilize();
117                             } catch(Exception x) {
118                                 com.ubermq.Utility.getLogger().throwing("","",x);
119                             }
120                         }
121                     }, false);
122             cd.executePeriodically(FIX_FINGER_PERIOD,
123                                    new Runnable() {
124                         public void run() {
125                             try {
126                                 p.fixFingers();
127                             } catch(Exception x) {
128                                 com.ubermq.Utility.getLogger().throwing("","",x);
129                             }
130                         }
131                     }, false);
132 
133             return p;
134         }
135         catch (javax.jms.JMSException e) {
136             throw new RemoteException(e.getMessage());
137         }
138     }
139 
140     /**
141      * Constructs a chord server node at the local machine.
142      * This requires a local JMS message server.
143      *
144      * @param ms a message server running in-process.
145      * @param id the node identifier.
146      *
147      * @throws IllegalArgumentException if another node on the
148      * message server has the same identifier.
149      */
150     public LocalChordNode(MessageServer ms,
151                           ChordIdentifier id,
152                           int m)
153         throws JMSException
154     {
155         super(id, m, RemoteChordNode.createRemoteInstance(id.factory(), getServiceURI(ms.getServiceURI(), id)));
156         this.ms = ms;
157         this.data = new HashMap();
158         this.serviceURI = getServiceURI(ms.getServiceURI(), id);
159 
160         // check if anyone else has this ID
161         if (RemoteChordNode.findNodeAtURI(ms.getServiceURI()).contains(getServiceURI()))
162             throw new IllegalArgumentException("Identifier collision");
163 
164         // connect
165         this.tc = new PipeTopicConnectionFactory(ms).createTopicConnection();
166         this.ts = tc.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
167 
168         this.tpub = ts.createPublisher(null);
169 
170         this.tsub = ts.createSubscriber(ts.createTopic(getCommandTopic(id)));
171         this.tsub.setMessageListener(this);
172 
173         this.announceSub = ts.createSubscriber(ts.createTopic(CHORD_NODE_LOCATOR_TOPIC));
174         announceSub.setMessageListener(new MessageListener() {
175                     public void onMessage(Message p0)
176                     {
177                         try
178                         {
179                             assert p0.getJMSReplyTo() != null;
180 
181                             Message m = ts.createMessage();
182                             m.setStringProperty(CHORD_NODE_LOCATOR_URI_PROP,
183                                                 getServiceURI().toString());
184                             tpub.publish((Topic)p0.getJMSReplyTo(), m);
185                         }
186                         catch (JMSException e) {
187                             // not a query message... ignore
188                         }
189                     }
190                 });
191 
192         tc.start();
193     }
194 
195     /**
196      * Returns the topic that should be used to communicate
197      * with a chord node of the given identifier.
198      *
199      * @return a topic name
200      */
201     public static String getCommandTopic(ChordIdentifier id)
202     {
203         return CHORD_COMMAND_PREFIX + id;
204     }
205 
206     /**
207      * Closes the node's resources.
208      */
209     public void close()
210     {
211         try
212         {
213             tpub.close();
214             tsub.close();
215             announceSub.close();
216             ts.close();
217             tc.close();
218         }
219         catch (JMSException e) {
220             // going away anyway
221         }
222     }
223 
224     /**
225      * Stores the specified object at the given key, on this node.
226      * The node may refuse the key if the hash value is not consistent.
227      *
228      * @throws IllegalStateException if the node is read-only
229      * @throws IllegalArgumentException if storing the key-value pair
230      * would break the chord invariants.
231      */
232     public void store(Object key, Object value)
233     {
234         data.put(key, value);
235     }
236 
237     /**
238      * Returns the URI that clients should connect to
239      * for communication with this node.
240      */
241     public URI getServiceURI()
242     {
243         return serviceURI;
244     }
245 
246     /**
247      * Returns the service URI that corresponds to the given
248      * server base URI and chord node, by convention.
249      *
250      * @param serverURI a server URI
251      * @param id a chord identifier
252      * @return the URI that can be used to address the
253      * node having identifier <code>id</code> at the specified
254      * server location.
255      */
256     public static URI getServiceURI(URI serverURI,
257                                     ChordIdentifier id)
258     {
259         try
260         {
261             return new URI(serverURI.getScheme(),
262                            serverURI.getUserInfo(),
263                            serverURI.getHost(),
264                            serverURI.getPort(),
265                            "/" + id.toString(),
266                            null,
267                            null);
268         }
269         catch (URISyntaxException e) {
270             throw new java.lang.IllegalStateException(e.getMessage());
271         }
272     }
273 
274     /**
275      * Queries the node for the object indexed at the specified key.
276      *
277      * @throws ItemNotFoundException if there is no object at this node
278      * for the given key.
279      */
280     public Object query(Object key)
281     {
282         // "special" keys
283         if (key.equals(CHORD_PREDECESSOR))
284             return predecessor();
285         else if (key.equals(CHORD_FINGER_TABLE_KEY))
286             return fingers();
287         else if (key.equals(CHORD_IDENTIFIER))
288             return identifier();
289         else if (key.equals(CHORD_LOCAL_KEYS))
290             return keys();
291 
292         // arbitrary data.
293         return data.get(key);
294     }
295 
296     /**
297      * Returns the collection of keys that are stored locally at
298      * this node.  This method is implemented by subclasses for
299      * use by algorithms implemented in this object.
300      */
301     protected Collection keys()
302     {
303         Set s = new HashSet();
304         s.addAll(data.keySet());
305         return s;
306     }
307 
308     /**
309      * Moves an item from the local data store to another
310      * node as specified. This is an abstract method so
311      * subclasses can make choices like retaining
312      * the moved item, or ignoring the move request altogether.
313      */
314     protected void moveItem(Object key, ChordNode destination)
315     {
316         destination.store(key, data.remove(key));
317         com.ubermq.Utility.getLogger().fine("moved object at " + key + " to " + destination);
318     }
319 
320     /**
321      * Responds to JMS messages, an alternate way to store
322      * or query from this node. This should typically only be called by
323      * the messaging infrastructure.<P>
324      *
325      * @param    p0    a JMS representation of a chord message
326      */
327     public void onMessage(Message p0)
328     {
329         ChordMessage m = ChordMessage.createFromIncoming(p0);
330         try
331         {
332                 m.execute(ts,
333                           tpub,
334                           this);
335         }
336         catch (JMSException e) {
337             Utility.getLogger().throwing("","",e);
338         }
339     }
340 
341     public String toString()
342     {
343         return "Local Chord Node id=" + identifier() + " url=" + getServiceURI();
344     }
345 
346     /**
347      * Provides capabilities to host a chord node here.<P>
348      *
349      * args: host-port number-of-nodes [well-known-entry-point]
350      *
351      */
352     public static void main(String[] args)
353     {
354         Properties p = new Properties();
355         p.put(ServerConfig.ADMIN_ENABLE, "false");
356         p.put(ConfigConstants.SERVER_PORT, args[0]);
357         MessageServer ms = new MessageServer(p);
358         ms.run();
359         System.out.println(ms.getServiceURI());
360 
361         // start chord node
362         int n = Integer.valueOf(args[1]).intValue();
363         URI joinURI = null;
364         try
365         {
366             if (args.length > 2)
367                 joinURI = (URI)RemoteChordNode.findNodeAtURI(URI.create(args[2])).get(0);
368         }
369         catch (JMSException e) {
370             e.printStackTrace();
371             joinURI = null;
372         }
373         System.out.println("WANTS to join: " + joinURI);
374 
375         // create n nodes.
376         for (int i = 0; i < n; i++)
377         {
378             try
379             {
380                 ChordNodeProvider theNode = LocalChordNode.createServerNode(ms, DEFAULT_IDENTIFIER_FACTORY);
381                 if (joinURI != null) {
382                     theNode.join(JMSChordInfrastructure.getInfrastructure(DEFAULT_IDENTIFIER_FACTORY, joinURI));
383                 } else {
384                     joinURI = theNode.getServiceURI();
385                 }
386 
387                 System.out.println("UberChord node running at " + ms.getServiceUrl() + " with id " + theNode.identifier());
388             }
389             catch (Exception e) {
390                 e.printStackTrace();
391             }
392         }
393 
394     }
395 }