Source code: er/changenotification/ERCNNotificationCoordinator.java
1 //
2 // ERCNNotificationCoordinator.java
3 // Project ERChangeNotificationJMS
4 //
5 // Created by tatsuya on Sat Aug 31 2002
6 //
7 package er.changenotification;
8
9 import com.webobjects.foundation.*;
10 import com.webobjects.appserver.*;
11 import com.webobjects.eocontrol.*;
12 import com.webobjects.eoaccess.*;
13 import java.util.Properties;
14 import er.extensions.ERXLogger;
15
16 import javax.jms.*;
17 import javax.naming.InitialContext;
18 import javax.naming.Context;
19 import org.exolab.jms.client.JmsTopicConnectionFactory;
20 import org.exolab.jms.jndi.JndiConstants;
21 import org.exolab.jms.jndi.rmi.RmiJndiInitialContextFactory;
22
23 /**
24 * ERCNNotificationCoordinator is the primary controller of the change
25 * notification for enterprise objects. It manages the single connection
26 * to the JMS server, and creates ERCNPublisher and ERCNSubscriber objects
27 * and registers them as the observers to handle change notifications.
28 * <p>
29 * When the application finishes launching, it checks properties and
30 * initializes the framework. It can also perform necessary clean-up
31 * operations when the application is about to terminate.
32 * <p>
33 * The framework works transparently and you will not have to call any
34 * methods on the framework. Just add this framework to application project as
35 * an external framework and put necessary properties to your property file.
36 * <p>
37 * The current implementation supports concurrent request handling (multi
38 * threaded operations.) It only supports the changes in the default
39 * EOObjectStoreCoordinator.
40 * <p>
41 * Properties: <br>
42 * Put the following properties into WebObjects.properties file under
43 * your home directory, or into Properties file and register it under
44 * the applications project's Resouces group.
45 * <p>
46 * <pre>
47 *
48 * # entities *not* to synchronize
49 * #er.changenotification.entitiesNotToSynchronize = (TalentPhoto)
50 * er.changenotification.entitiesNotToSynchronize = ()
51 *
52 * # change types to track; Can contain inserted, updated and deleted.
53 * er.changenotification.changeTypesToTrack = (inserted, updated, deleted)
54 *
55 * # JMS topic name (Destination object) to pass the notifications.
56 * # Specify one and register it from the OpenJMS administration tool or
57 * # configuration file.
58 * er.changenotification.jms.topicName = business logic group 1
59 *
60 * # whether or not the JMS subscriber is durable;
61 * # prevents to miss change notifications by temporaly
62 * # network disruptions.
63 * #
64 * # false - suggested for development
65 * # true - suggested for deployment
66 * #
67 * # If it's set to true, you need properly to shut down the applications
68 * # (e.g. shut down it from JavaMonitor or calling application's
69 * # terminate() method), otherwise JMS provider will try to keep
70 * # all changes even after application is shut down.
71 * er.changenotification.jms.durableSubscribers = false
72 *
73 * </pre>
74 */
75 public class ERCNNotificationCoordinator implements ExceptionListener {
76
77 /** logging support */
78 public static final ERXLogger log = ERXLogger.getERXLogger(ERCNNotificationCoordinator.class);
79
80 private static final ERCNNotificationCoordinator _coordinator = new ERCNNotificationCoordinator();
81
82 private ERCNPublisher _publisher;
83 private ERCNSubscriber _subscriber;
84
85 private NSArray _entitiesNotToSynchronize;
86 private NSArray _changeTypesToTrack;
87
88 private Context _context;
89 private TopicConnection _connection;
90 private Topic _topic;
91
92 private String _topicName;
93 private boolean _isSubscriberDurable;
94
95 private boolean _isInitialized = false;
96 private boolean _isTerminated = false;
97
98 static {
99 log.debug("Registering the observer to initialize ERChangeNotification Framework");
100
101 NSNotificationCenter.defaultCenter().addObserver(
102 _coordinator,
103 new NSSelector("initialize", new Class[] { NSNotification.class } ),
104 WOApplication.ApplicationDidFinishLaunchingNotification,
105 null);
106 }
107
108 private ERCNNotificationCoordinator() {
109 super();
110 }
111
112 public static ERCNNotificationCoordinator coordinator() {
113 return _coordinator;
114 }
115
116 public NSArray entitiesNotToSynchronize() {
117 return _entitiesNotToSynchronize;
118 }
119
120 public void setEntitiesNotToSynchronize(NSArray newEntitiesNotToSynchronize) {
121 if (newEntitiesNotToSynchronize != null)
122 _entitiesNotToSynchronize = newEntitiesNotToSynchronize;
123 else
124 _entitiesNotToSynchronize = NSArray.EmptyArray;
125 }
126
127 public NSArray changeTypesToTrack() {
128 return _changeTypesToTrack;
129 }
130
131 public void setChageTypesToTrack(NSArray newChageTypesToTrack) {
132 if (newChageTypesToTrack != null)
133 _changeTypesToTrack = newChageTypesToTrack;
134 else
135 _changeTypesToTrack = NSArray.EmptyArray;
136 }
137
138 protected Topic topic() {
139 return _topic;
140 }
141
142 protected TopicConnection connection() {
143 return _connection;
144 }
145
146 protected String id() {
147 WOApplication app = WOApplication.application();
148 String host = app.host();
149 Number port = app.port(); // Don't forget to apply Max's change
150 String appName = app.name();
151 return host + ":" + port + "/" + appName;
152 }
153
154 public synchronized void initialize(NSNotification notification) {
155 if (_isInitialized) return;
156
157 log.info("Initializing ERChangeNotification Framework");
158
159 setEntitiesNotToSynchronize((NSArray)NSPropertyListSerialization.propertyListFromString(
160 System.getProperty("er.changenotification.entitiesNotToSynchronize")));
161 setChageTypesToTrack((NSArray)NSPropertyListSerialization.propertyListFromString(
162 System.getProperty("er.changenotification.changeTypesToTrack")));
163 _topicName = System.getProperty("er.changenotification.jms.topicName");
164 _isSubscriberDurable = "true".equals(System.getProperty("er.changenotification.jms.durableSubscribers"));
165
166 // Uses OpenJMS embedded JNDI server to locate the TopicConnectionFactory
167 // and Destination objects. Of course, you can use your own JNDI servers
168 // if you want.
169 // Set RMI as the protocol. Note that OpenJMS also supports TCP, HTTP and SSL
170 // protocols as well.
171 String host = "localhost";
172 String port = "1099";
173 String jndiName = "JndiServer";
174 String protocol = "rmi";
175 String protocolType = RmiJndiInitialContextFactory.class.getName();
176
177 Properties props = new Properties();
178 props.put(Context.PROVIDER_URL, protocol + "://" + host + ":" + port + "/" + jndiName);
179 props.put(Context.INITIAL_CONTEXT_FACTORY, protocolType);
180
181 log.debug("props: " + props);
182
183 try {
184 // Open the connection to the JMS server.
185 _context = new InitialContext(props);
186 TopicConnectionFactory factory = (TopicConnectionFactory)_context.lookup("JmsTopicConnectionFactory");
187 if (factory == null)
188 throw new RuntimeException("Failed to locate connection factory");
189
190 _topic = (Topic)_context.lookup(_topicName);
191 if (_topic == null)
192 throw new RuntimeException("Failed to locate topic \"" + _topic +"\"");
193
194 _connection = factory.createTopicConnection();
195
196 // Set itself as the exception listener.
197 _connection.setExceptionListener(this);
198
199 } catch (Exception ex) {
200 log.error("An exception occured: " + ex.getClass().getName() + " - " + ex.getMessage());
201 ex.printStackTrace();
202 return;
203 }
204
205 // Create the notification publisher object and register it as the observer for
206 // the EOObjectStoreCoordinator changes.
207 _publisher = new ERCNPublisher(this);
208 NSNotificationCenter.defaultCenter().addObserver(
209 _publisher,
210 new NSSelector("publishChange", new Class[] { NSNotification.class } ),
211 EOObjectStoreCoordinator.ObjectsChangedInStoreNotification,
212 EOObjectStoreCoordinator.defaultCoordinator());
213
214 // Create the notification subscriber object and register it as the observer for
215 // the distributed change notifications.
216 _subscriber = new ERCNSubscriber(this, _isSubscriberDurable);
217 try {
218 _subscriber.topicSubscriber().setMessageListener(_subscriber);
219 } catch (JMSException ex) {
220 log.error("An exception occured: " + ex.getClass().getName() + " - " + ex.getMessage());
221 ex.printStackTrace();
222 return;
223 }
224
225 // Start to receive notifications from the connection.
226 try {
227 _connection.start();
228 } catch (JMSException ex) {
229 log.error("An exception occured: " + ex.getClass().getName() + " - " + ex.getMessage());
230 ex.printStackTrace();
231 return;
232 }
233
234 _isInitialized = true;
235 }
236
237 /**
238 * releases JMS resouces, including closing the connection.
239 * <p>
240 * This method is supposed to be called by the applicaiton's
241 * terminate method.
242 */
243 // ENHANCEME: Should remove observers as well.
244 public synchronized void terminate() {
245 if (_isTerminated) return;
246
247 try {
248 _connection.stop();
249 } catch (JMSException ex) {
250 log.error("An exception occured: " + ex.getClass().getName() + " - " + ex.getMessage());
251 ex.printStackTrace();
252 }
253
254 _publisher.terminate();
255 _subscriber.terminate();
256
257 log.debug("Closing the JMS connection.");
258 try {
259 _connection.close();
260 } catch (JMSException ex) {
261 log.error("An exception occured: " + ex.getClass().getName() + " - " + ex.getMessage());
262 ex.printStackTrace();
263 }
264
265 _isTerminated = true;
266 }
267
268 public void finalize() throws Throwable {
269 if (! _isTerminated) terminate();
270 super.finalize();
271 }
272
273 // ENHANCEME: Should handle connection errors; try to reconnect when the
274 // connection is interrupted.
275 public void onException(JMSException exception) {
276 log.error("An exception occured: " + exception.getClass().getName() + " - " + exception.getMessage());
277 exception.printStackTrace();
278 }
279
280 public static EODatabaseContext databaseContextForEntityNamed(String entityName, EOEditingContext editingContext) {
281 return EOUtilities.databaseContextForModelNamed(editingContext,
282 EOModelGroup.defaultGroup().entityNamed(entityName).model().name());
283 }
284
285 }