Source code: com/ubermq/chord/jms/ChordMessage.java
1 package com.ubermq.chord.jms;
2
3 import com.ubermq.chord.*;
4 import java.util.*;
5 import javax.jms.*;
6
7 /**
8 * A chord message represents a query or
9 * store operation that occurs on a remote node. <P>
10 *
11 * Internally, chord messages are represented as JMS
12 * messages, and can be easily converted back and forth.<P>
13 */
14 public class ChordMessage
15 {
16 private Message m;
17 private int command;
18
19 /**
20 * The JMS property that contains the chord command operation value.
21 */
22 public static final String CHORD_COMMAND_PROPERTY = "com.ubermq.chord.command";
23
24 /**
25 * A NOOP performs no operation. It is mainly used to verify
26 * that a node is functioning properly.
27 */
28 public static final int CHORD_NOOP = 0;
29
30 /**
31 * A store operation is an Object message that contains a List
32 * of exactly two items: a key and the value to store at the key.
33 */
34 public static final int CHORD_STORE = 1;
35
36 /**
37 * A query operation is an Object message that contains the key
38 * in the message body, and specifies a JMSReplyTo topic on which
39 * the associated value should be sent.
40 */
41 public static final int CHORD_QUERY = 2;
42
43 /**
44 * A chord message that contains a value that is the result of
45 * a query operation. The value is an Object message containing
46 * a List containing the key originally queried, and the value that
47 * was discovered, if any.
48 */
49 public static final int CHORD_VALUE = 3;
50
51 /**
52 * A chord message that notifies a node of a new immediate
53 * predecessor. The value is an Object message containing
54 * the ChordNode of the new predecessor.
55 */
56 public static final int CHORD_NOTIFY = 4;
57
58 /**
59 * Creates a chord message from an incoming
60 * JMS message.
61 *
62 * @throws IllegalArgumentException if the message is not a
63 * chord message.
64 */
65 ChordMessage(Message m)
66 {
67 this.m = m;
68 try
69 {
70 this.command = m.getIntProperty(CHORD_COMMAND_PROPERTY);
71 }
72 catch (JMSException e) {
73 throw new IllegalArgumentException("Message does not appear to be a chord message.");
74 }
75 }
76
77 /**
78 * Creates a new outgoing chord message with the given
79 * JMS message and command code.
80 *
81 * @param m a newly created outbound message
82 * @param command the chord command code
83 */
84 ChordMessage(Message m,
85 int command)
86 throws JMSException
87 {
88 this.m = m;
89 this.command = command;
90 m.setIntProperty(CHORD_COMMAND_PROPERTY, command);
91 }
92
93 public static ChordMessage createFromIncoming(Message m)
94 {
95 try
96 {
97 switch(m.getIntProperty(CHORD_COMMAND_PROPERTY))
98 {
99 case ChordMessage.CHORD_NOOP:
100 return new ChordMessage(m);
101 case ChordMessage.CHORD_STORE:
102 return new ChordStoreMessage(m);
103 case ChordMessage.CHORD_QUERY:
104 return new ChordQueryMessage(m);
105 case ChordMessage.CHORD_VALUE:
106 return new ChordValueMessage(m);
107 case ChordMessage.CHORD_NOTIFY:
108 return new ChordNotifyMessage(m);
109 default:
110 throw new IllegalArgumentException("Invalid chord message.");
111 }
112 }
113 catch (JMSException e) {
114 throw new IllegalArgumentException(e.getMessage());
115 }
116 }
117
118 /**
119 * Creates a chord message that represents a store operation.
120 *
121 * @param session a session, used for message creation
122 * @param key the key to store under
123 * @param value the value to store
124 *
125 * @return a chord message that can be sent over JMS
126 *
127 * @throws JMSException if the message cannot be created
128 */
129 public static ChordMessage createStoreMessage(Session session,
130 Object key,
131 Object value)
132 throws JMSException
133 {
134 return new ChordStoreMessage(session, key, value);
135 }
136
137 /**
138 * Creates a chord message that represents a store operation.
139 *
140 * @param session a session, used for message creation
141 * @param replyTopic the topic that the value message will be sent to
142 * @param key the key to look up
143 *
144 * @return a chord message that can be sent over JMS
145 *
146 * @throws JMSException if the message cannot be created
147 */
148 public static ChordQueryMessage createQueryMessage(Session session,
149 Topic replyTopic,
150 Object key)
151 throws JMSException
152 {
153 return new ChordQueryMessage(session, replyTopic, key);
154 }
155
156 /**
157 * Creates a chord message that represents the reply that is sent
158 * as a result of a query operation.
159 *
160 * @param session a session, used for message creation
161 * @param replyTopic the topic that the value message will be sent to
162 * @param key the key to look up
163 *
164 * @return a chord message that can be sent over JMS
165 *
166 * @throws JMSException if the message cannot be created
167 */
168 public static ChordMessage createValueMessage(Session session,
169 long queryId,
170 Object key,
171 Object value)
172 throws JMSException
173 {
174 return new ChordValueMessage(session, queryId, key, value);
175 }
176
177 /**
178 * Creates a notification message, either a standard <code>notify</code>
179 * or a <codE>notifyGoingAway</code> depending on the goingAway flag
180 * specified.
181 */
182 public static ChordMessage createNotifyMessage(Session session,
183 boolean goingAway,
184 ChordNode node)
185 throws JMSException
186 {
187 return new ChordNotifyMessage(session, goingAway, node);
188 }
189
190 /**
191 * Returns the chord command (one of the <code>CHORD_xxx</code> constants)
192 * that this object represents.
193 *
194 * @return a <code>CHORD_xxx</code> value.
195 */
196 int getCommand()
197 {
198 return command;
199 }
200
201 /**
202 * Provides the underlying JMS message that can be sent
203 * over a JMS transport layer.
204 *
205 * @return a JMS message representing a chord command
206 *
207 */
208 public Message getJMSMessage()
209 {
210 return m;
211 }
212
213 /**
214 * Executes the contained command on the local node
215 * specified.
216 *
217 * @param session a topic session
218 * @param pub an unbound topic publisher for sending replies
219 * @param p a local node provider
220 * @throws JMSException if an error occurs in the JMS layer.
221 */
222 public void execute(TopicSession session,
223 TopicPublisher pub,
224 ChordNodeProvider p)
225 throws JMSException
226 {
227 }
228
229 }