Source code: com/aendvari/hermes/broker/MessageBrokerConnection.java
1 /*
2 * MessageBrokerConnection.java
3 *
4 * Copyright (c) 2001, 2002 Aendvari, Ltd. All Rights Reserved.
5 *
6 * See the file LICENSE for terms of use.
7 *
8 */
9
10 package com.aendvari.hermes.broker;
11
12 /**
13 * <p>Represents a client connection to a {@link MessageBroker}.</p>
14 *
15 * <p>The connection has a <code>MessageBrokerContext</code> associated with it. The
16 * context contains client specific data (such as HTTP information, see
17 * {@link com.aendvari.hermes.broker.http.HttpMessageBrokerContext}). This context
18 * is provided with each published {@link Message}.</p>
19 *
20 * <p>The {@link MessageBroker} and <code>MessageBrokerConnection</code> provide a simple transaction
21 * facility. Messages published while the transaction is active are held until the transaction is
22 * commited or rolled back. When commited, all the messages published are sent. When rolled back,
23 * all the messages are ignored.</p>
24 *
25 * @author Trevor Milne
26 *
27 */
28
29 public class MessageBrokerConnection
30 {
31 /** The {@link MessageBroker} that this connection is for. */
32 protected MessageBroker broker;
33
34 /** The {@link MessageBrokerContext} associated with this connection. */
35 protected MessageBrokerContext brokerContext;
36
37 /** Handles message transactions. */
38 protected MessageTransaction transaction;
39
40
41 /* Constructors. */
42
43
44 /**
45 * Constructs a <code>MessageBrokerConnection</code> with a default context.
46 *
47 * @param setBroker The {@link MessageBroker} this connection is for.
48 *
49 */
50
51 protected MessageBrokerConnection(MessageBroker setBroker)
52 {
53 // set the message broker
54 broker = setBroker;
55
56 // create default broker
57 brokerContext = new MessageBrokerContext();
58 brokerContext.setBroker(broker);
59
60 // create a transaction
61 transaction = new MessageTransaction(this);
62 }
63
64
65 /* Accessors. */
66
67
68 /**
69 * Returns the {@link MessageBrokerContext} of this connection.
70 *
71 * @return The {@link MessageBrokerContext} of this connection.
72 *
73 */
74
75 public MessageBrokerContext getContext()
76 {
77 return brokerContext;
78 }
79
80 /**
81 * Sets the {@link MessageBrokerContext} of this connection.
82 *
83 * @param setContext The {@link MessageBrokerContext} for this connection.
84 *
85 */
86
87 public void setContext(MessageBrokerContext setContext)
88 {
89 brokerContext = setContext;
90 brokerContext.setBroker(broker);
91 }
92
93 /**
94 * Returns the {@link MessageTransaction} of this connection.
95 *
96 * @return The {@link MessageTransaction} of this connection.
97 *
98 */
99
100 public MessageTransaction getTransaction()
101 {
102 return transaction;
103 }
104
105
106 /* Topic management. */
107
108
109 /**
110 * Returns a {@link MessageTopic} of the specified path. The <code>MessageTopic</code> returned
111 * may only be used with this <code>MessageBrokerConnection</code>. If the topic already exists,
112 * a reference to that topic is returned.
113 *
114 * @param topic The topic path.
115 *
116 */
117
118 public MessageTopic getTopic(String topic)
119 {
120 return broker.getTopic(topic);
121 }
122
123 /**
124 * Returns a {@link MessageTopic}. The path parameter is relative to the parent topic supplied.
125 * If the topic already exists, a reference to that topic is returned.
126 *
127 * @param parentTopic The parent {@link MessageTopic}.
128 * @param topic The topic path, relative to the parent topic.
129 *
130 */
131
132 public MessageTopic getTopic(MessageTopic parentTopic, String topic)
133 {
134 return broker.getTopic(parentTopic, topic);
135 }
136
137
138 /* Subscriptions. */
139
140
141 /**
142 * Subscribes the listener to the specified topic.
143 *
144 * @param topic The {@link MessageTopic} to subscribe to.
145 * @param listener The {@link MessageListener} to add to the topic.
146 *
147 */
148
149 public void subscribe(MessageTopic topic, MessageListener listener)
150 {
151 // subscribe the listener
152 broker.subscribe(topic, listener);
153 }
154
155 /**
156 * Unsubscribes the listener from the specified topic.
157 *
158 * @param topic The {@link MessageTopic} to unsubscribe from.
159 * @param listener The {@link MessageListener} to remove from the topic.
160 *
161 */
162
163 public void unsubscribe(MessageTopic topic, MessageListener listener)
164 {
165 // unsubscribe the listener
166 broker.unsubscribe(topic, listener);
167 }
168
169 /**
170 * Subscribes the listener to the specific topic. The topic is specified by it's
171 * path. A search is performed to find the topic in the hierarchy.
172 *
173 * @param topic The topic to subscribe to.
174 * @param listener The {@link MessageListener} to add to the topic.
175 *
176 */
177
178 public void subscribe(String topic, MessageListener listener)
179 {
180 // get the topic specified
181 MessageTopic topicObject = getTopic(topic);
182
183 // subscribe the listener
184 broker.subscribe(topicObject, listener);
185 }
186
187 /**
188 * Unsubscribes the listener form the specific topic. The topic is specified by it's
189 * path. A search is performed to find the topic in the hierarchy.
190 *
191 * @param topic The topic to unsubscribe from.
192 * @param listener The {@link MessageListener} to remove from the topic.
193 *
194 */
195
196 public void unsubscribe(String topic, MessageListener listener)
197 {
198 // get the topic specified
199 MessageTopic topicObject = getTopic(topic);
200
201 // unsubscribe the listener
202 broker.unsubscribe(topicObject, listener);
203 }
204
205
206 /* Publishing. */
207
208
209 /**
210 * Publishes a message to the specific topic.
211 *
212 * @param topic The {@link MessageTopic} to publish the message to.
213 * @param message The {@link Message} to publish.
214 *
215 */
216
217 public void publish(MessageTopic topic, Message message)
218 {
219 // publish the message to the transaction if active
220 if (transaction.isActive())
221 {
222 transaction.publish(topic, message);
223 return;
224 }
225
226 // otherwise treat as normal publish
227 broker.publish(topic, message);
228 }
229
230
231 /* Creation. */
232
233
234 /**
235 * Creates a {@link Message} for this connection.
236 *
237 * @return The {@link Message} for this connection.
238 *
239 */
240
241 public Message createMessage()
242 {
243 // obtain instance
244 Message message = new Message();
245
246 // set connection/context
247 message.setContext(brokerContext);
248
249 return message;
250 }
251
252 /**
253 * Creates a {@link MessageBatch} for this connection.
254 *
255 * @return The {@link MessageBatch} for this connection.
256 *
257 */
258
259 public MessageBatch createBatch()
260 {
261 return new MessageBatch(this);
262 }
263 }
264