Source code: com/ubermq/chord/jms/ChordQueryMessage.java
1 package com.ubermq.chord.jms;
2
3 import com.ubermq.chord.*;
4 import javax.jms.*;
5
6 /**
7 * A chord query command.
8 */
9 public final class ChordQueryMessage
10 extends ChordMessage
11 {
12 private Object key;
13 private Topic replyTopic;
14 private long queryId;
15
16 public static final String CHORD_QUERY_ID_PROPERTY = "query-id";
17
18 /**
19 * Constructs a chord query message.
20 * @throws IllegalArgumentException if the message is invalid.
21 */
22 ChordQueryMessage(Message m)
23 {
24 super(m);
25
26 try
27 {
28 this.key = ((ObjectMessage)m).getObject();
29 this.replyTopic = (Topic)m.getJMSReplyTo();
30 this.queryId = m.getLongProperty(CHORD_QUERY_ID_PROPERTY);
31 }
32 catch (JMSException e) {
33 throw new IllegalArgumentException("Invalid chord query message.");
34 }
35 }
36
37 /**
38 * Constructs an outgoing query message.
39 *
40 * @param session a session, used for message creation
41 * @param replyTopic the topic that the value message will be sent to
42 * @param key the key to look up
43 *
44 */
45 ChordQueryMessage(Session session,
46 Topic replyTopic,
47 Object key)
48 throws JMSException
49 {
50 super(session.createObjectMessage(),
51 CHORD_QUERY);
52
53 this.key = key;
54 this.replyTopic = replyTopic;
55 this.queryId = com.ubermq.Utility.allocateLocallyUniqueLong();
56
57 ObjectMessage m = (ObjectMessage)getJMSMessage();
58 m.setObject((java.io.Serializable)key);
59 m.setLongProperty(CHORD_QUERY_ID_PROPERTY, queryId);
60 m.setJMSReplyTo(replyTopic);
61 }
62
63 /**
64 * Returns the key to query.
65 * @return a key
66 */
67 public Object getKey()
68 {
69 return key;
70 }
71
72 public long getQueryId()
73 {
74 return queryId;
75 }
76
77 public void execute(TopicSession session,
78 TopicPublisher pub,
79 ChordNodeProvider p)
80 throws JMSException
81 {
82 // obtain value
83 Object value = p.query(getKey());
84
85 // construct reply, and send it
86 try
87 {
88 ChordMessage reply = ChordMessage.createValueMessage(session,
89 queryId,
90 key,
91 value);
92 pub.publish(replyTopic, reply.getJMSMessage());
93 }
94 catch (ClassCastException e) {
95 throw new IllegalArgumentException(e.getMessage());
96 }
97 }
98 }
99