Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: er/changenotification/ERCNNotificationCoordinator.java


1   //
2   // ERCNNotificationCoordinator.java
3   // Project ERChangeNotificationJMS
4   //
5   // Created by tatsuya on Sat Aug 31 2002
6   //
7   package er.changenotification;
8   
9   import com.webobjects.foundation.*;
10  import com.webobjects.appserver.*;
11  import com.webobjects.eocontrol.*;
12  import com.webobjects.eoaccess.*;
13  import java.util.Properties;
14  import er.extensions.ERXLogger;
15  
16  import javax.jms.*;
17  import javax.naming.InitialContext;
18  import javax.naming.Context;
19  import org.exolab.jms.client.JmsTopicConnectionFactory;
20  import org.exolab.jms.jndi.JndiConstants;
21  import org.exolab.jms.jndi.rmi.RmiJndiInitialContextFactory;
22  
23  /**
24   * ERCNNotificationCoordinator is the primary controller of the change 
25   * notification for enterprise objects. It manages the single connection  
26   * to the JMS server, and creates ERCNPublisher and ERCNSubscriber objects  
27   * and registers them as the observers to handle change notifications. 
28   * <p>
29   * When the application finishes launching, it checks properties and 
30   * initializes the framework. It can also perform necessary clean-up 
31   * operations when the application is about to terminate. 
32   * <p>
33   * The framework works transparently and you will not have to call any 
34   * methods on the framework. Just add this framework to application project as 
35   * an external framework and put necessary properties to your property file. 
36   * <p>
37   * The current implementation supports concurrent request handling (multi 
38   * threaded operations.)  It only supports the changes in the default 
39   * EOObjectStoreCoordinator. 
40   * <p>
41   * Properties: <br>
42   * Put the following properties into WebObjects.properties file under 
43   * your home directory, or into Properties file and register it under 
44   * the applications project's Resouces group. 
45   * <p>
46   * <pre>
47   * 
48   * # entities *not* to synchronize
49   * #er.changenotification.entitiesNotToSynchronize = (TalentPhoto)
50   * er.changenotification.entitiesNotToSynchronize = ()
51   * 
52   * # change types to track; Can contain inserted, updated and deleted. 
53   * er.changenotification.changeTypesToTrack = (inserted, updated, deleted)
54   * 
55   * # JMS topic name (Destination object) to pass the notifications. 
56   * # Specify one and register it from the OpenJMS administration tool or 
57   * # configuration file. 
58   * er.changenotification.jms.topicName = business logic group 1
59   * 
60   * # whether or not the JMS subscriber is durable; 
61   * # prevents to miss change notifications by temporaly 
62   * # network disruptions. 
63   * # 
64   * # false - suggested for development 
65   * # true  - suggested for deployment
66   * # 
67   * # If it's set to true, you need properly to shut down the applications 
68   * # (e.g. shut down it from JavaMonitor or calling application's 
69   * # terminate() method), otherwise JMS provider will try to keep  
70   * # all changes even after application is shut down. 
71   * er.changenotification.jms.durableSubscribers = false
72   * 
73   * </pre>
74   */
75  public class ERCNNotificationCoordinator implements ExceptionListener {
76  
77      /** logging support */
78      public static final ERXLogger log = ERXLogger.getERXLogger(ERCNNotificationCoordinator.class);
79  
80      private static final ERCNNotificationCoordinator _coordinator = new ERCNNotificationCoordinator();
81  
82      private ERCNPublisher _publisher;
83      private ERCNSubscriber _subscriber;
84  
85      private NSArray _entitiesNotToSynchronize;
86      private NSArray _changeTypesToTrack;
87  
88      private Context _context;
89      private TopicConnection _connection;
90      private Topic _topic;
91  
92      private String _topicName;
93      private boolean _isSubscriberDurable;
94  
95      private boolean _isInitialized = false;
96      private boolean _isTerminated = false;
97  
98      static {
99          log.debug("Registering the observer to initialize ERChangeNotification Framework");
100         
101         NSNotificationCenter.defaultCenter().addObserver(
102             _coordinator, 
103             new NSSelector("initialize", new Class[] { NSNotification.class } ), 
104             WOApplication.ApplicationDidFinishLaunchingNotification, 
105             null);
106     }
107 
108     private ERCNNotificationCoordinator() {
109         super();
110     }
111     
112     public static ERCNNotificationCoordinator coordinator() {
113         return _coordinator;
114     }
115 
116     public NSArray entitiesNotToSynchronize() {
117         return _entitiesNotToSynchronize;
118     }
119     
120     public void setEntitiesNotToSynchronize(NSArray newEntitiesNotToSynchronize) {
121         if (newEntitiesNotToSynchronize != null) 
122             _entitiesNotToSynchronize = newEntitiesNotToSynchronize;
123         else 
124             _entitiesNotToSynchronize = NSArray.EmptyArray;
125     } 
126     
127     public NSArray changeTypesToTrack() {
128         return _changeTypesToTrack;
129     }
130 
131     public void setChageTypesToTrack(NSArray newChageTypesToTrack) {
132         if (newChageTypesToTrack != null) 
133             _changeTypesToTrack = newChageTypesToTrack;
134         else 
135             _changeTypesToTrack = NSArray.EmptyArray;
136     } 
137     
138     protected Topic topic() {
139         return _topic;
140     }
141 
142     protected TopicConnection connection() {
143         return _connection;
144     }
145 
146     protected String id() {
147         WOApplication app = WOApplication.application();
148         String host = app.host();
149         Number port = app.port(); // Don't forget to apply Max's change
150         String appName = app.name();
151         return host + ":" + port + "/" + appName;
152     }
153 
154     public synchronized void initialize(NSNotification notification) {
155         if (_isInitialized)  return;
156 
157         log.info("Initializing ERChangeNotification Framework");
158 
159         setEntitiesNotToSynchronize((NSArray)NSPropertyListSerialization.propertyListFromString(
160                                         System.getProperty("er.changenotification.entitiesNotToSynchronize")));
161         setChageTypesToTrack((NSArray)NSPropertyListSerialization.propertyListFromString(
162                                         System.getProperty("er.changenotification.changeTypesToTrack")));
163         _topicName = System.getProperty("er.changenotification.jms.topicName");
164         _isSubscriberDurable = "true".equals(System.getProperty("er.changenotification.jms.durableSubscribers"));
165         
166         // Uses OpenJMS embedded JNDI server to locate the TopicConnectionFactory 
167         // and Destination objects. Of course, you can use your own JNDI servers 
168         // if you want. 
169         // Set RMI as the protocol. Note that OpenJMS also supports TCP, HTTP and SSL 
170         // protocols as well. 
171         String host = "localhost";
172         String port = "1099";
173         String jndiName = "JndiServer";
174         String protocol = "rmi";
175         String protocolType = RmiJndiInitialContextFactory.class.getName();
176 
177         Properties props = new Properties();
178         props.put(Context.PROVIDER_URL, protocol + "://" + host + ":" + port + "/" + jndiName);
179         props.put(Context.INITIAL_CONTEXT_FACTORY, protocolType);
180         
181         log.debug("props: " + props);
182         
183         try {
184             // Open the connection to the JMS server. 
185             _context = new InitialContext(props);
186             TopicConnectionFactory factory = (TopicConnectionFactory)_context.lookup("JmsTopicConnectionFactory");
187             if (factory == null) 
188                 throw new RuntimeException("Failed to locate connection factory");
189             
190             _topic = (Topic)_context.lookup(_topicName);
191             if (_topic == null) 
192                 throw new RuntimeException("Failed to locate topic \"" + _topic +"\"");
193 
194             _connection = factory.createTopicConnection();
195             
196             // Set itself as the exception listener. 
197             _connection.setExceptionListener(this);
198 
199         } catch (Exception ex) {
200             log.error("An exception occured: " + ex.getClass().getName() + " - " + ex.getMessage());
201             ex.printStackTrace();
202             return;
203         }
204     
205         // Create the notification publisher object and register it as the observer for 
206         // the EOObjectStoreCoordinator changes. 
207         _publisher = new ERCNPublisher(this);
208         NSNotificationCenter.defaultCenter().addObserver(
209             _publisher, 
210             new NSSelector("publishChange", new Class[] { NSNotification.class } ), 
211             EOObjectStoreCoordinator.ObjectsChangedInStoreNotification, 
212             EOObjectStoreCoordinator.defaultCoordinator());
213 
214         // Create the notification subscriber object and register it as the observer for 
215         // the distributed change notifications.  
216         _subscriber = new ERCNSubscriber(this, _isSubscriberDurable);
217         try {
218             _subscriber.topicSubscriber().setMessageListener(_subscriber);
219         } catch (JMSException ex) {
220             log.error("An exception occured: " + ex.getClass().getName() + " - " + ex.getMessage());
221             ex.printStackTrace();
222             return;
223         }
224 
225         // Start to receive notifications from the connection. 
226         try {
227             _connection.start();
228         } catch (JMSException ex) {
229             log.error("An exception occured: " + ex.getClass().getName() + " - " + ex.getMessage());
230             ex.printStackTrace();
231             return;
232         }
233 
234         _isInitialized = true;
235     }
236 
237     /** 
238      * releases JMS resouces, including closing the connection. 
239      * <p>
240      * This method is supposed to be called by the applicaiton's 
241      * terminate method. 
242      */ 
243      // ENHANCEME: Should remove observers as well. 
244     public synchronized void terminate() {
245         if (_isTerminated)   return;
246         
247         try {
248             _connection.stop();
249          } catch (JMSException ex) {
250             log.error("An exception occured: " + ex.getClass().getName() + " - " + ex.getMessage());
251             ex.printStackTrace();
252         }
253        
254         _publisher.terminate();
255      _subscriber.terminate();
256         
257         log.debug("Closing the JMS connection.");
258         try {
259             _connection.close();
260         } catch (JMSException ex) {
261             log.error("An exception occured: " + ex.getClass().getName() + " - " + ex.getMessage());
262             ex.printStackTrace();
263         }
264 
265         _isTerminated = true;
266     }
267 
268     public void finalize() throws Throwable {
269         if (! _isTerminated)   terminate(); 
270         super.finalize();
271     }
272 
273     // ENHANCEME: Should handle connection errors; try to reconnect when the 
274     //            connection is interrupted. 
275     public void onException(JMSException exception) {
276         log.error("An exception occured: " + exception.getClass().getName() + " - " + exception.getMessage());
277         exception.printStackTrace();
278     }
279 
280     public static EODatabaseContext databaseContextForEntityNamed(String entityName, EOEditingContext editingContext) {
281         return EOUtilities.databaseContextForModelNamed(editingContext, 
282                         EOModelGroup.defaultGroup().entityNamed(entityName).model().name());
283     }
284 
285 }