Source code: com/aendvari/hermes/broker/MessageBroker.java
1 /*
2 * MessageBroker.java
3 *
4 * Copyright (c) 2001, 2002 Aendvari, Ltd. All Rights Reserved.
5 *
6 * See the file LICENSE for terms of use.
7 *
8 */
9
10 package com.aendvari.hermes.broker;
11
12 import java.util.Iterator;
13 import java.util.LinkedList;
14
15 import com.aendvari.common.osm.Osm;
16 import com.aendvari.common.osm.OsmNode;
17 import com.aendvari.common.osm.SimpleOsmPath;
18
19 /**
20 * <p>Handles the subscription, publishing, and dispatching of messages.</p>
21 *
22 * <p>Clients access the broker through {@link MessageBrokerConnection MessageBrokerConnections}.
23 * The connection provides the facilities for the client to subscribe and publish to the broker.</p>
24 *
25 * @author Trevor Milne
26 *
27 */
28
29 public class MessageBroker
30 {
31 /**
32 * Contains the topic hierarchy. Subscribers are stored in collections within
33 * nodes of the tree.
34 */
35
36 protected Osm topics;
37
38 /**
39 * When enabled, logs any messages published through this broker.
40 *
41 */
42
43 protected MessageLogger logger;
44
45
46 /* Constructors. */
47
48
49 /**
50 * Constructs a <code>MessageBroker</code>.
51 *
52 */
53
54 public MessageBroker()
55 {
56 // create topic hierarchy
57 topics = new Osm();
58
59 // create logger
60 logger = new MessageLogger();
61 }
62
63
64 /* Accessors. */
65
66
67 /**
68 * Returns the {@link MessageLogger} of this broker.
69 *
70 * @return The {@link MessageLogger} of this broker.
71 *
72 */
73
74 public MessageLogger getLogger()
75 {
76 return logger;
77 }
78
79
80 /* Connection management. */
81
82
83 /**
84 * Creates a {@link MessageBrokerConnection} for this broker.
85 *
86 * @return The {@link MessageBrokerConnection} for this broker.
87 *
88 */
89
90 public MessageBrokerConnection createConnection()
91 {
92 // create a connection with a default context
93 return new MessageBrokerConnection(this);
94 }
95
96
97 /* Topic management. */
98
99
100 /**
101 * Returns a {@link MessageTopic} of the supplied path. The <code>MessageTopic</code>
102 * returned may only be used with this <code>MessageBroker</code>. If the topic already
103 * exists, a reference to that topic is returned.
104 *
105 * @param topic The topic path.
106 *
107 */
108
109 protected synchronized MessageTopic getTopic(String topic)
110 {
111 // get the node for the topic
112 OsmNode topicNode = SimpleOsmPath.getNode(topics, topic, true);
113
114 // get the topic object
115 MessageTopic topicObject = (MessageTopic)topicNode.getNodeValue(MessageTopic.class);
116
117 // set the node (in case a new one was created)
118 topicObject.setNode(topicNode);
119
120 return topicObject;
121 }
122
123 /**
124 * Returns a {@link MessageTopic} of the supplied relative path. The path parameter
125 * is relative to the parent topic supplied. If the topic already exists, a reference
126 * to that topic is returned.
127 *
128 * @param parentTopic The parent {@link MessageTopic}.
129 * @param topic The topic path, relative to the parent topic.
130 *
131 */
132
133 protected synchronized MessageTopic getTopic(MessageTopic parentTopic, String topic)
134 {
135 // get the node for the topic
136 OsmNode topicNode = SimpleOsmPath.getNode(parentTopic.getNode(), topic, true);
137
138 // get the topic object
139 MessageTopic topicObject = (MessageTopic)topicNode.getNodeValue(MessageTopic.class);
140
141 // set the node (in case a new one was created)
142 topicObject.setNode(topicNode);
143
144 return topicObject;
145 }
146
147
148 /* Subscriptions. */
149
150
151 /**
152 * Subscribes the listener to the specific topic.
153 *
154 * @param topic The {@link MessageTopic} to subscribe to.
155 * @param listener The {@link MessageListener} to add to the topic.
156 *
157 */
158
159 protected synchronized void subscribe(MessageTopic topic, MessageListener listener)
160 {
161 // subscribe the listener
162 topic.subscribe(listener);
163 }
164
165 /**
166 * Unsubscribes the listener from the specific topic.
167 *
168 * @param topic The {@link MessageTopic} to unsubscribe from.
169 * @param listener The {@link MessageListener} to remove from the topic.
170 *
171 */
172
173 protected synchronized void unsubscribe(MessageTopic topic, MessageListener listener)
174 {
175 // subscribe the listener
176 topic.unsubscribe(listener);
177 }
178
179
180 /* Publishing. */
181
182
183 /**
184 * Publishes a message to the specific topic.
185 *
186 * @param topic The {@link MessageTopic} to publish the message to.
187 * @param message The {@link Message} to publish.
188 *
189 */
190
191 protected synchronized void publish(MessageTopic topic, Message message)
192 {
193 LinkedList nodes = new LinkedList();
194
195 // set the destination of the message
196 message.setDestination(topic);
197
198 // traverse node tree upwards, and collect the nodes
199 OsmNode node = topic.getNode();
200
201 do
202 {
203 nodes.addFirst(node);
204 node = node.getParentNode();
205 }
206 while (node != null);
207
208 // traverse back down tree, sending message to each level
209 Iterator nodeIterator = nodes.iterator();
210
211 // skip the root node
212 nodeIterator.next();
213
214 while (nodeIterator.hasNext())
215 {
216 // get the node at this level
217 node = (OsmNode)nodeIterator.next();
218
219 // get the topic object
220 MessageTopic topicObject = (MessageTopic)node.getNodeValue(MessageTopic.class);
221
222 // set the node (in case a new one was created)
223 topicObject.setNode(node);
224
225 // set the location of the message
226 message.setLocation(topicObject);
227
228 // log message
229 if (logger.isActive())
230 {
231 logger.write(message);
232 }
233
234 // publish the message to the topic
235 topicObject.publish(message);
236
237 // stop sending if message was cancelled
238 if (message.isCancelled())
239 {
240 break;
241 }
242 }
243 }
244 }
245