Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/transport/NetworkChannel.java


1   /**
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq.transport;
20  import java.util.Iterator;
21  import java.util.Map;
22  import javax.jms.JMSException;
23  import javax.jms.Session;
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.activemq.ActiveMQConnection;
27  import org.activemq.ActiveMQConnectionFactory;
28  import org.activemq.ActiveMQPrefetchPolicy;
29  import org.activemq.advisories.ConnectionAdvisor;
30  import org.activemq.advisories.ConnectionAdvisoryEvent;
31  import org.activemq.advisories.ConnectionAdvisoryEventListener;
32  import org.activemq.broker.BrokerClient;
33  import org.activemq.broker.BrokerContainer;
34  import org.activemq.broker.ConsumerInfoListener;
35  import org.activemq.message.ActiveMQDestination;
36  import org.activemq.message.BrokerInfo;
37  import org.activemq.message.ConsumerInfo;
38  import org.activemq.message.Receipt;
39  import org.activemq.service.MessageContainerManager;
40  import org.activemq.service.Service;
41  import org.activemq.transport.composite.CompositeTransportChannel;
42  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
43  import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
44  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
45  
46  /**
47   * Represents a broker's connection with a single remote broker which bridges the two brokers to form a network. <p/>
48   * The NetworkChannel contains a JMS connection with the remote broker. <p/>New subscriptions on the local broker are
49   * multiplexed into the JMS connection so that messages published on the remote broker can be replayed onto the local
50   * broker.
51   * 
52   * @version $Revision: 1.1.1.1 $
53   */
54  public class NetworkChannel
55          implements
56              Service,
57              ConsumerInfoListener,
58              ConnectionAdvisoryEventListener,
59              TransportStatusEventListener {
60      private static final Log log = LogFactory.getLog(NetworkChannel.class);
61      protected String uri;
62      protected BrokerContainer brokerContainer;
63      protected ActiveMQConnection localConnection;
64      protected ActiveMQConnection remoteConnection;
65      protected ConcurrentHashMap topicConsumerMap;
66      protected ConcurrentHashMap queueConsumerMap;
67      protected String remoteUserName;
68      protected String remotePassword;
69      protected String remoteBrokerName;
70      protected String remoteClusterName;
71      protected int maximumRetries = 0;
72      protected long reconnectSleepTime = 2000L;
73      protected PooledExecutor threadPool;
74      private boolean remote = false;
75      private SynchronizedBoolean started = new SynchronizedBoolean(false);
76      private SynchronizedBoolean connected = new SynchronizedBoolean(false);
77      private SynchronizedBoolean stopped = new SynchronizedBoolean(false);
78      private ConnectionAdvisor connectionAdvisor;
79      private ActiveMQPrefetchPolicy localPrefetchPolicy;
80      private ActiveMQPrefetchPolicy remotePrefetchPolicy;
81      private boolean demandBasedForwarding = true;
82  
83      /**
84       * Default constructor
85       */
86      public NetworkChannel() {
87          this.topicConsumerMap = new ConcurrentHashMap();
88          this.queueConsumerMap = new ConcurrentHashMap();
89      }
90  
91      /**
92       * Default Constructor
93       * 
94       * @param tp
95       */
96      public NetworkChannel(PooledExecutor tp) {
97          this();
98          this.threadPool = tp;
99      }
100 
101     /**
102      * Constructor
103      * 
104      * @param connector
105      * @param brokerContainer
106      * @param uri
107      */
108     public NetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, String uri) {
109         this(connector.threadPool);
110         this.brokerContainer = brokerContainer;
111         this.uri = uri;
112     }
113 
114     /**
115      * Create a NetworkConnector from a TransportChannel
116      * 
117      * @param connector
118      * @param brokerContainer
119      * @param channel
120      * @param remoteBrokerName
121      * @param remoteclusterName
122      * @throws JMSException
123      */
124     public NetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, TransportChannel channel,
125             String remoteBrokerName, String remoteclusterName) throws JMSException {
126         this(connector.threadPool);
127         this.brokerContainer = brokerContainer;
128         this.uri = "";
129         this.remoteBrokerName = remoteBrokerName;
130         this.remoteClusterName = remoteclusterName;
131         ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory();
132         fac.setJ2EEcompliant(false);
133         fac.setTurboBoost(true);
134         remoteConnection = new ActiveMQConnection(fac, remoteUserName, remotePassword, channel);
135         remoteConnection.setClientID("Boondocks:" + remoteClusterName + ":" + remoteBrokerName);
136         remoteConnection.setQuickClose(true);
137         remoteConnection.start();
138         BrokerInfo info = new BrokerInfo();
139         info.setBrokerName(brokerContainer.getBroker().getBrokerName());
140         info.setClusterName(brokerContainer.getBroker().getBrokerClusterName());
141         channel.asyncSend(info);
142         remote = true;
143     }
144 
145     /**
146      * @see org.activemq.transport.TransportStatusEventListener#statusChanged(org.activemq.transport.TransportStatusEvent)
147      */
148     public void statusChanged(TransportStatusEvent event) {
149        if (event != null
150            && (event.getChannelStatus() == TransportStatusEvent.CONNECTED
151                || event.getChannelStatus() == TransportStatusEvent.RECONNECTED)) {
152            connected.set(true);
153        }else {
154            connected.set(false);
155        }
156     }
157 
158     private void doSetConnected() {
159         synchronized (connected) {
160             connected.set(true);
161             connected.notifyAll();
162         }
163     }
164 
165     /**
166      * @return text info on this
167      */
168     public String toString() {
169         return "NetworkChannel{ " + ", uri = '" + uri + "' " + ", remoteBrokerName = '" + remoteBrokerName + "' "
170                 + " }";
171     }
172 
173     /**
174      * Start the channel
175      */
176     public void start() {
177         if (started.commit(false, true)) {
178             try {
179                 stopped.set(false);
180                 threadPool.execute(new Runnable() {
181                     public void run() {
182                         String originalName = Thread.currentThread().getName();
183                         try {
184                             Thread.currentThread().setName("NetworkChannel Initiator to " + uri);
185                             initialize();
186                             startSubscriptions();
187                             log.info("Started NetworkChannel to " + uri);
188                         }
189                         catch (JMSException jmsEx) {
190                             log.error("Failed to start NetworkChannel: " + uri, jmsEx);
191                         }
192                         finally {
193                             Thread.currentThread().setName(originalName);
194                         }
195                     }
196                 });
197             }
198             catch (InterruptedException e) {
199                 log.warn("Failed to start - interuppted", e);
200             }
201         }
202     }
203 
204     /**
205      * stop the channel
206      * 
207      * @throws JMSException on error
208      */
209     public void stop() throws JMSException {
210         if (started.commit(true, false)) {
211             stopped.set(true);
212             topicConsumerMap.clear();
213             if (remoteConnection != null) {
214                 remoteConnection.close();
215                 remoteConnection = null;
216             }
217             if (localConnection != null) {
218                 localConnection.close();
219                 localConnection = null;
220             }
221             for (Iterator i = topicConsumerMap.values().iterator();i.hasNext();) {
222                 NetworkMessageBridge consumer = (NetworkMessageBridge) i.next();
223                 consumer.stop();
224             }
225         }
226     }
227 
228     /**
229      * Listen for new Consumer events at this broker
230      * 
231      * @param client
232      * @param info
233      */
234     public void onConsumerInfo(final BrokerClient client, final ConsumerInfo info) {
235         String brokerName = client.getBrokerConnector().getBrokerInfo().getBrokerName();
236         if (!client.isClusteredConnection()) {
237             if (connected.get()) {
238                 if (!info.hasVisited(remoteBrokerName)) {
239                     if (info.isStarted()) {
240                         addConsumerInfo(info);
241                     }
242                     else {
243                         removeConsumerInfo(info);
244                     }
245                 }
246             }
247             else {
248                 try {
249                     threadPool.execute(new Runnable() {
250                         public void run() {
251                             if (!client.isClusteredConnection()) {
252                                 if (!info.hasVisited(remoteBrokerName)) {
253                                     synchronized (connected) {
254                                         while (!connected.get() && !stopped.get()) {
255                                             try {
256                                                 connected.wait(500);
257                                             }
258                                             catch (InterruptedException e) {
259                                                 log.debug("interuppted", e);
260                                             }
261                                         }
262                                         if (info.isStarted()) {
263                                             addConsumerInfo(info);
264                                         }
265                                         else {
266                                             removeConsumerInfo(info);
267                                         }
268                                     }
269                                 }
270                             }
271                         }
272                     });
273                 }
274                 catch (InterruptedException e) {
275                     log.warn("Failed to process ConsumerInfo: " + info, e);
276                 }
277             }
278         }
279     }
280 
281     /**
282      * @return the uri of the broker(s) this channel is connected to
283      */
284     public String getUri() {
285         return uri;
286     }
287 
288     /**
289      * set the uri of the broker(s) this channel is connected to
290      * 
291      * @param uri
292      */
293     public void setUri(String uri) {
294         this.uri = uri;
295     }
296 
297     /**
298      * @return Returns the remotePassword.
299      */
300     public String getRemotePassword() {
301         return remotePassword;
302     }
303 
304     /**
305      * @param remotePassword The remotePassword to set.
306      */
307     public void setRemotePassword(String remotePassword) {
308         this.remotePassword = remotePassword;
309     }
310 
311     /**
312      * @return Returns the remoteUserName.
313      */
314     public String getRemoteUserName() {
315         return remoteUserName;
316     }
317 
318     /**
319      * @param remoteUserName The remoteUserName to set.
320      */
321     public void setRemoteUserName(String remoteUserName) {
322         this.remoteUserName = remoteUserName;
323     }
324 
325     /**
326      * @return Returns the brokerContainer.
327      */
328     public BrokerContainer getBrokerContainer() {
329         return brokerContainer;
330     }
331 
332     /**
333      * @param brokerContainer The brokerContainer to set.
334      */
335     public void setBrokerContainer(BrokerContainer brokerContainer) {
336         this.brokerContainer = brokerContainer;
337     }
338 
339     public int getMaximumRetries() {
340         return maximumRetries;
341     }
342 
343     public void setMaximumRetries(int maximumRetries) {
344         this.maximumRetries = maximumRetries;
345     }
346 
347     public long getReconnectSleepTime() {
348         return reconnectSleepTime;
349     }
350 
351     public void setReconnectSleepTime(long reconnectSleepTime) {
352         this.reconnectSleepTime = reconnectSleepTime;
353     }
354 
355     public String getRemoteBrokerName() {
356         return remoteBrokerName;
357     }
358 
359     public void setRemoteBrokerName(String remoteBrokerName) {
360         this.remoteBrokerName = remoteBrokerName;
361     }
362 
363     /**
364      * @return Returns the threadPool.
365      */
366     protected PooledExecutor getThreadPool() {
367         return threadPool;
368     }
369 
370     /**
371      * @param threadPool The threadPool to set.
372      */
373     protected void setThreadPool(PooledExecutor threadPool) {
374         this.threadPool = threadPool;
375     }
376 
377     private synchronized ActiveMQConnection getLocalConnection() throws JMSException {
378         if (localConnection == null) {
379             initializeLocal();
380         }
381         return localConnection;
382     }
383 
384     private synchronized ActiveMQConnection getRemoteConnection() throws JMSException {
385         if (remoteConnection == null) {
386             initializeRemote();
387         }
388         return remoteConnection;
389     }
390 
391     /**
392      * @return Returns the localPrefetchPolicy.
393      */
394     public ActiveMQPrefetchPolicy getLocalPrefetchPolicy() {
395         return localPrefetchPolicy;
396     }
397 
398     /**
399      * @param localPrefetchPolicy The localPrefetchPolicy to set.
400      */
401     public void setLocalPrefetchPolicy(ActiveMQPrefetchPolicy localPrefetchPolicy) {
402         this.localPrefetchPolicy = localPrefetchPolicy;
403     }
404 
405     /**
406      * @return Returns the remotePrefetchPolicy.
407      */
408     public ActiveMQPrefetchPolicy getRemotePrefetchPolicy() {
409         return remotePrefetchPolicy;
410     }
411 
412     /**
413      * @param remotePrefetchPolicy The remotePrefetchPolicy to set.
414      */
415     public void setRemotePrefetchPolicy(ActiveMQPrefetchPolicy remotePrefetchPolicy) {
416         this.remotePrefetchPolicy = remotePrefetchPolicy;
417     }
418 
419     /**
420      * @return Returns the demandBasedForwarding.
421      */
422     public boolean isDemandBasedForwarding() {
423         return demandBasedForwarding;
424     }
425 
426     /**
427      * @param demandBasedForwarding The demandBasedForwarding to set.
428      */
429     public void setDemandBasedForwarding(boolean demandBasedForwarding) {
430         this.demandBasedForwarding = demandBasedForwarding;
431     }
432 
433     // Implementation methods
434     //-------------------------------------------------------------------------
435     /**
436      * Implementation of ConnectionAdvisoryEventListener
437      * 
438      * @param event
439      */
440     public void onEvent(ConnectionAdvisoryEvent event) {
441         String localBrokerName = brokerContainer.getBroker().getBrokerName();
442         if (!event.getInfo().isClosed()) {
443             brokerContainer.registerRemoteClientID(event.getInfo().getClientId());
444         }
445         else {
446             brokerContainer.deregisterRemoteClientID(event.getInfo().getClientId());
447         }
448     }
449 
450     private void addConsumerInfo(ConsumerInfo info) {
451         addConsumerInfo(info.getDestination(), info.getDestination().isTopic(), info.isDurableTopic());
452     }
453 
454     private void addConsumerInfo(ActiveMQDestination destination, boolean topic, boolean durableTopic) {
455         Map map = topic ? topicConsumerMap : queueConsumerMap;
456         NetworkMessageBridge bridge = (NetworkMessageBridge) map.get(destination.getPhysicalName());
457         if (bridge == null) {
458             bridge = createBridge(map, destination, durableTopic);
459         }
460         else if (durableTopic && !bridge.isDurableTopic() && !demandBasedForwarding) {
461             //upgrade our subscription
462             bridge.decrementReferenceCount();
463             upgradeBridge(bridge);
464         }
465         bridge.incrementReferenceCount();
466     }
467 
468     private void upgradeBridge(NetworkMessageBridge bridge) {
469         try {
470             remoteConnection.stop();
471             bridge.upgrade();
472         }
473         catch (JMSException e) {
474             log.warn("Could not upgrade the NetworkMessageBridge to a durable subscription for destination: "
475                     + bridge.getDestination(), e);
476         }
477         try {
478             remoteConnection.start();
479         }
480         catch (JMSException e) {
481             log.error("Failed to restart the NetworkMessageBridge", e);
482         }
483     }
484 
485     private NetworkMessageBridge createBridge(Map map, ActiveMQDestination destination, boolean durableTopic) {
486         NetworkMessageBridge bridge = new NetworkMessageBridge();
487         try {
488             bridge.setDestination(destination);
489             bridge.setDurableTopic(durableTopic);
490             bridge.setLocalBrokerName(brokerContainer.getBroker().getBrokerName());
491             bridge.setLocalSession(getLocalConnection().createSession(false, Session.CLIENT_ACKNOWLEDGE));
492             bridge.setRemoteSession(getRemoteConnection().createSession(false, Session.CLIENT_ACKNOWLEDGE));
493             map.put(destination.getPhysicalName(), bridge);
494             bridge.start();
495             log.info("started NetworkMessageBridge for destination: " + destination + " -- NetworkChannel: "
496                     + this.toString());
497         }
498         catch (JMSException jmsEx) {
499             log.error("Failed to start NetworkMessageBridge for destination: " + destination, jmsEx);
500         }
501         return bridge;
502     }
503 
504     private void removeConsumerInfo(final ConsumerInfo info) {
505         final String physicalName = info.getDestination().getPhysicalName();
506         Map map = (demandBasedForwarding || info.getDestination().isTopic()) ? topicConsumerMap : queueConsumerMap;
507         final NetworkMessageBridge bridge = (NetworkMessageBridge) map.get(physicalName);
508         if (bridge != null) {
509             if (bridge.decrementReferenceCount() <= 0) {
510                 try {
511                     threadPool.execute(new Runnable() {
512                         public void run() {
513                             bridge.stop();
514                             topicConsumerMap.remove(physicalName);
515                             log.info("stopped MetworkMessageBridge for destination: " + info.getDestination());
516                         }
517                     });
518                 }
519                 catch (InterruptedException e) {
520                     log.warn("got interrupted stoping NetworkBridge", e);
521                 }
522             }
523         }
524     }
525 
526     private void startSubscriptions() {
527         if (!demandBasedForwarding) {
528             if (!remote) {
529                 MessageContainerManager mcm = brokerContainer.getBroker().getPersistentTopicContainerManager();
530                 if (mcm != null) {
531                     Map map = mcm.getLocalDestinations();
532                     startSubscriptions(map, true, true);
533                 }
534                 mcm = brokerContainer.getBroker().getTransientTopicContainerManager();
535                 if (mcm != null) {
536                     Map map = mcm.getLocalDestinations();
537                     startSubscriptions(map, true, false);
538                 }
539                 mcm = brokerContainer.getBroker().getTransientQueueContainerManager();
540                 if (mcm != null) {
541                     Map map = mcm.getLocalDestinations();
542                     startSubscriptions(map, false, false);
543                 }
544                 mcm = brokerContainer.getBroker().getPersistentQueueContainerManager();
545                 if (mcm != null) {
546                     Map map = mcm.getLocalDestinations();
547                     startSubscriptions(map, false, false);
548                 }
549             }
550         }
551     }
552 
553     private void startSubscriptions(Map destinations, boolean topic, boolean durableTopic) {
554         if (destinations != null) {
555             for (Iterator i = destinations.values().iterator();i.hasNext();) {
556                 ActiveMQDestination dest = (ActiveMQDestination) i.next();
557                 addConsumerInfo(dest, topic, durableTopic);
558             }
559         }
560     }
561 
562     protected void initialize() throws JMSException {
563         // force lazy construction
564         initializeLocal();
565         initializeRemote();
566         brokerContainer.getBroker().addConsumerInfoListener(NetworkChannel.this);
567     }
568 
569     private synchronized void initializeRemote() throws JMSException {
570         if (remoteConnection == null) {
571             ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(remoteUserName, remotePassword, uri);
572             //factory.setTurboBoost(true);
573             factory.setJ2EEcompliant(false);
574             factory.setQuickClose(true);
575             factory.setInternalConnection(true);
576             remoteConnection = (ActiveMQConnection) factory.createConnection();
577             TransportChannel transportChannel = remoteConnection.getTransportChannel();
578             if (transportChannel instanceof CompositeTransportChannel) {
579                 CompositeTransportChannel composite = (CompositeTransportChannel) transportChannel;
580                 composite.setMaximumRetries(maximumRetries);
581                 composite.setFailureSleepTime(reconnectSleepTime);
582                 composite.setIncrementTimeout(false);
583             }
584             transportChannel.addTransportStatusEventListener(this);
585             remoteConnection.setClientID(brokerContainer.getBroker().getBrokerName() + "_NetworkChannel");
586             remoteConnection.start();
587             BrokerInfo info = new BrokerInfo();
588             info.setBrokerName(brokerContainer.getBroker().getBrokerName());
589             info.setClusterName(brokerContainer.getBroker().getBrokerClusterName());
590             Receipt receipt = remoteConnection.syncSendRequest(info);
591             if (receipt != null) {
592                 remoteBrokerName = receipt.getBrokerName();
593                 remoteClusterName = receipt.getClusterName();
594             }
595             connectionAdvisor = new ConnectionAdvisor(remoteConnection);
596             connectionAdvisor.addListener(this);
597             connectionAdvisor.start();
598             if (remotePrefetchPolicy != null) {
599                 remoteConnection.setPrefetchPolicy(remotePrefetchPolicy);
600             }
601         }
602         doSetConnected();
603     }
604 
605     private void initializeLocal() throws JMSException {
606         String brokerName = brokerContainer.getBroker().getBrokerName();
607         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://" + brokerName);
608         factory.setTurboBoost(true);
609         factory.setJ2EEcompliant(false);
610         factory.setBrokerName(brokerName);
611         factory.setQuickClose(true);
612         factory.setInternalConnection(true);
613         localConnection = (ActiveMQConnection) factory.createConnection();
614         localConnection.start();
615         BrokerInfo info = new BrokerInfo();
616         info.setBrokerName(remoteBrokerName);
617         info.setClusterName(remoteClusterName);
618         localConnection.asyncSendPacket(info);
619         if (localPrefetchPolicy != null) {
620             localConnection.setPrefetchPolicy(localPrefetchPolicy);
621         }
622     }
623     
624     /*private synchronized void releaseRemote() throws JMSException {
625       if (remoteConnection != null) {
626         TransportChannel transportChannel = remoteConnection.getTransportChannel();
627         transportChannel.stop();
628             if (connectionAdvisor != null) {
629             connectionAdvisor.stop();
630             }
631         try {
632           remoteConnection.stop();
633         } catch (JMSException e) {
634           // ignore this exception, since the remote broker is most likely down 
635         }
636         remoteConnection = null;
637       }
638     }*/
639    
640 }