Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: com/aendvari/hermes/broker/MessageBroker.java


1   /*
2    * MessageBroker.java
3    *
4    * Copyright (c) 2001, 2002 Aendvari, Ltd. All Rights Reserved.
5    *
6    * See the file LICENSE for terms of use.
7    *
8    */
9   
10  package com.aendvari.hermes.broker;
11  
12  import java.util.Iterator;
13  import java.util.LinkedList;
14  
15  import com.aendvari.common.osm.Osm;
16  import com.aendvari.common.osm.OsmNode;
17  import com.aendvari.common.osm.SimpleOsmPath;
18  
19  /**
20   * <p>Handles the subscription, publishing, and dispatching of messages.</p>
21   *
22   * <p>Clients access the broker through {@link MessageBrokerConnection MessageBrokerConnections}.
23   * The connection provides the facilities for the client to subscribe and publish to the broker.</p>
24   *
25   * @author  Trevor Milne
26   *
27   */
28  
29  public class MessageBroker
30  {
31    /**
32     * Contains the topic hierarchy. Subscribers are stored in collections within
33     * nodes of the tree.
34     */
35  
36    protected Osm topics;
37  
38    /**
39     * When enabled, logs any messages published through this broker.
40     *
41     */
42  
43    protected MessageLogger logger;
44  
45  
46    /* Constructors. */
47  
48  
49    /**
50     * Constructs a <code>MessageBroker</code>.
51     *
52     */
53  
54    public MessageBroker()
55    {
56      // create topic hierarchy
57      topics = new Osm();
58  
59      // create logger
60      logger = new MessageLogger();
61    }
62  
63  
64    /* Accessors. */
65  
66  
67    /**
68     * Returns the {@link MessageLogger} of this broker.
69     *
70     * @return                  The {@link MessageLogger} of this broker.
71     *
72     */
73  
74    public MessageLogger getLogger()
75    {
76      return logger;
77    }
78  
79  
80    /* Connection management. */
81  
82  
83    /**
84     * Creates a {@link MessageBrokerConnection} for this broker.
85     *
86     * @return                  The {@link MessageBrokerConnection} for this broker.
87     *
88     */
89  
90    public MessageBrokerConnection createConnection()
91    {
92      // create a connection with a default context
93      return new MessageBrokerConnection(this);
94    }
95  
96  
97    /* Topic management. */
98  
99  
100   /**
101    * Returns a {@link MessageTopic} of the supplied path. The <code>MessageTopic</code>
102    * returned may only be used with this <code>MessageBroker</code>. If the topic already
103    * exists, a reference to that topic is returned.
104    *
105    * @param    topic            The topic path.
106    *
107    */
108 
109   protected synchronized MessageTopic getTopic(String topic)
110   {
111     // get the node for the topic
112     OsmNode topicNode = SimpleOsmPath.getNode(topics, topic, true);
113 
114     // get the topic object
115     MessageTopic topicObject = (MessageTopic)topicNode.getNodeValue(MessageTopic.class);
116 
117     // set the node (in case a new one was created)
118     topicObject.setNode(topicNode);
119 
120     return topicObject;
121   }
122 
123   /**
124    * Returns a {@link MessageTopic} of the supplied relative path. The path parameter
125    * is relative to the parent topic supplied. If the topic already exists, a reference
126    * to that topic is returned.
127    *
128    * @param    parentTopic          The parent {@link MessageTopic}.
129    * @param    topic            The topic path, relative to the parent topic.
130    *
131    */
132 
133   protected synchronized MessageTopic getTopic(MessageTopic parentTopic, String topic)
134   {
135     // get the node for the topic
136     OsmNode topicNode = SimpleOsmPath.getNode(parentTopic.getNode(), topic, true);
137 
138     // get the topic object
139     MessageTopic topicObject = (MessageTopic)topicNode.getNodeValue(MessageTopic.class);
140 
141     // set the node (in case a new one was created)
142     topicObject.setNode(topicNode);
143 
144     return topicObject;
145   }
146 
147 
148   /* Subscriptions. */
149 
150 
151   /**
152    * Subscribes the listener to the specific topic.
153    *
154    * @param    topic            The {@link MessageTopic} to subscribe to.
155    * @param    listener          The {@link MessageListener} to add to the topic.
156    *
157    */
158 
159   protected synchronized void subscribe(MessageTopic topic, MessageListener listener)
160   {
161     // subscribe the listener
162     topic.subscribe(listener);
163   }
164 
165   /**
166    * Unsubscribes the listener from the specific topic.
167    *
168    * @param    topic            The {@link MessageTopic} to unsubscribe from.
169    * @param    listener          The {@link MessageListener} to remove from the topic.
170    *
171    */
172 
173   protected synchronized void unsubscribe(MessageTopic topic, MessageListener listener)
174   {
175     // subscribe the listener
176     topic.unsubscribe(listener);
177   }
178 
179 
180   /* Publishing. */
181 
182 
183   /**
184    * Publishes a message to the specific topic.
185    *
186    * @param    topic            The {@link MessageTopic} to publish the message to.
187    * @param    message            The {@link Message} to publish.
188    *
189    */
190 
191   protected synchronized void publish(MessageTopic topic, Message message)
192   {
193     LinkedList nodes = new LinkedList();
194 
195     // set the destination of the message
196     message.setDestination(topic);
197 
198     // traverse node tree upwards, and collect the nodes
199     OsmNode node = topic.getNode();
200 
201     do
202     {
203       nodes.addFirst(node);
204       node = node.getParentNode();
205     }
206     while (node != null);
207 
208     // traverse back down tree, sending message to each level
209     Iterator nodeIterator = nodes.iterator();
210 
211     // skip the root node
212     nodeIterator.next();
213 
214     while (nodeIterator.hasNext())
215     {
216       // get the node at this level
217       node = (OsmNode)nodeIterator.next();
218 
219       // get the topic object
220       MessageTopic topicObject = (MessageTopic)node.getNodeValue(MessageTopic.class);
221 
222       // set the node (in case a new one was created)
223       topicObject.setNode(node);
224 
225       // set the location of the message
226       message.setLocation(topicObject);
227 
228       // log message
229       if (logger.isActive())
230       {
231         logger.write(message);
232       }
233 
234       // publish the message to the topic
235       topicObject.publish(message);
236 
237       // stop sending if message was cancelled
238       if (message.isCancelled())
239       {
240         break;
241       }
242     }
243   }
244 }
245