Source code: com/aendvari/hermes/broker/MessageRequestor.java
1 /*
2 * MessageRequestor.java
3 *
4 * Copyright (c) 2001, 2002 Aendvari, Ltd. All Rights Reserved.
5 *
6 * See the file LICENSE for terms of use.
7 *
8 */
9
10 package com.aendvari.hermes.broker;
11
12 /**
13 * <p>Provides a mechanism to publish a message and listen for a response.</p>
14 *
15 * <p>This effectively makes the message publish/listen process work similarly to
16 * a method call. The listener receiving the initial message would check the
17 * "replyTo" property of the {@link Message} it receives. If a {@link MessageTopic}
18 * is provided, then a response message is expected.</p>
19 *
20 * <p>The properties of the published and return messages must be agreed upon by
21 * both parties.</p>
22 *
23 * @author Trevor Milne
24 *
25 */
26
27 public class MessageRequestor implements MessageListener
28 {
29 /** The {@link MessageBrokerConnection} to publish through. */
30 protected MessageBrokerConnection connection;
31
32 /** The {@link MessageTopic} to publish requests to. */
33 protected MessageTopic requestTopic;
34
35 /** The response received from the request. */
36 protected Message response;
37
38 /** The next available response identifier. */
39 protected static int nextResponseIdentifier;
40
41
42 /* Constants. */
43
44
45 /** A prefix for the temporary topic receiving the response. */
46 protected static final String ResponseTopic = "temporary-topic-";
47
48
49 /* Constructors. */
50
51
52 /**
53 * Constructs a <code>MessageRequestor</code> for the given connection and topic.
54 *
55 * @param setConnection The {@link MessageBrokerConnection} to publish through.
56 * @param setTopic The {@link MessageTopic} to publish requests to.
57 *
58 */
59
60 public MessageRequestor(MessageBrokerConnection setConnection, MessageTopic setTopic)
61 {
62 connection = setConnection;
63 requestTopic = setTopic;
64
65 response = null;
66 nextResponseIdentifier = 0;
67 }
68
69
70 /* Message requests. */
71
72
73 /**
74 * Publishes the specified message and returns the response.
75 *
76 * @param request The {@link Message} to publish.
77 *
78 * @return The reponse {@link Message}. Null, if not response is made.
79 *
80 */
81
82 public Message request(Message request)
83 {
84 // create a unique identifier
85 int responseIdentifier = 0;
86
87 synchronized (this)
88 {
89 responseIdentifier = nextResponseIdentifier++;
90 }
91
92 // create a temporary topic to listen for the response
93 String responseTopicString = ResponseTopic + responseIdentifier;
94 MessageTopic responseTopic = connection.getTopic(responseTopicString);
95
96 // set message up to send a reply
97 request.setReplyTo(responseTopic);
98 request.setCorrelationId(responseIdentifier);
99
100 // send response to self
101 connection.subscribe(responseTopic, this);
102
103 // publish request
104 connection.publish(requestTopic, request);
105
106 // unsubscribe from the temporary topic
107 connection.unsubscribe(responseTopic, this);
108
109 return response;
110 }
111
112
113 /* MessageListener. */
114
115
116 /**
117 * Listens for a response.
118 *
119 * @param message The {@link Message} received.
120 *
121 */
122
123 public void onMessage(Message message)
124 {
125 response = message;
126 }
127 }
128