Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/transport/RemoteNetworkChannel.java


1   /**
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq.transport;
20  import java.net.URI;
21  import java.net.URISyntaxException;
22  import javax.jms.JMSException;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.activemq.broker.BrokerContainer;
26  import org.activemq.broker.impl.BrokerConnectorImpl;
27  import org.activemq.io.impl.DefaultWireFormat;
28  import org.activemq.message.BrokerInfo;
29  import org.activemq.transport.composite.CompositeTransportChannel;
30  import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
31  
32  /**
33   * Represents a Boondocks broker's connection with a single remote broker which bridges the two brokers to form a network. <p/>
34   * The NetworkChannel contains a JMS connection with the remote broker. <p/>New subscriptions on the local broker are
35   * multiplexed into the JMS connection so that messages published on the remote broker can be replayed onto the local
36   * broker.
37   * 
38   * @version $Revision: 1.1.1.1 $
39   */
40  public class RemoteNetworkChannel extends NetworkChannel implements TransportStatusEventListener  {
41      private static final Log log = LogFactory.getLog(RemoteNetworkChannel.class);
42      private TransportChannel boondocksChannel;
43     
44      /**
45       * Default Constructor
46       * 
47       * @param tp
48       */
49      public RemoteNetworkChannel(PooledExecutor tp) {
50          super(tp);
51      }
52  
53      /**
54       * Constructor
55       * 
56       * @param connector
57       * @param brokerContainer
58       * @param uri
59       */
60      public RemoteNetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, String uri) {
61          super(connector,brokerContainer,uri);
62      }
63  
64      
65      /**
66       * @see org.activemq.transport.TransportStatusEventListener#statusChanged(org.activemq.transport.TransportStatusEvent)
67       */
68      public void statusChanged(TransportStatusEvent event) {
69          if (event.getTransportChannel() == boondocksChannel) {
70              if (event.getChannelStatus() == TransportStatusEvent.RECONNECTED) {
71                  try {
72                      sendBrokerInfo();
73                  }
74                  catch (JMSException e) {
75                      log.error("Failed to send Broker Info", e);
76                  }
77              }
78          }
79          else {
80              super.statusChanged(event);
81          }
82      }
83  
84      
85      /**
86       * remote:// can only make outgoing connections - we assume we can't
87       * accept incomming (duck!). So we initialize the transport channel
88       * from this side and create the broker client as well
89       * @throws JMSException
90       */
91      
92      protected void initialize() throws JMSException {
93          super.initialize();
94          try {
95              boondocksChannel = TransportChannelProvider.create(new DefaultWireFormat(), new URI(uri));
96              boondocksChannel.addTransportStatusEventListener(this);
97              if (boondocksChannel instanceof CompositeTransportChannel) {
98                  CompositeTransportChannel composite = (CompositeTransportChannel)boondocksChannel;
99                  composite.setMaximumRetries(maximumRetries);
100                 composite.setFailureSleepTime(reconnectSleepTime);
101                 composite.setIncrementTimeout(false);
102             }
103             boondocksChannel.start();
104             //create our own broker connector ...
105             BrokerConnectorImpl connector = new BrokerConnectorImpl(getBrokerContainer(),"vm://uri",new DefaultWireFormat());
106             connector.start();
107             connector.addClient(boondocksChannel);
108             sendBrokerInfo();
109         }
110         catch (URISyntaxException e) {
111          log.error("Could not parse uri: " + uri + " to make remote connector",e);
112         }
113     }
114     
115     private void sendBrokerInfo() throws JMSException{
116         //inform the other side we are a remote channel
117         if (boondocksChannel != null) {
118             BrokerInfo info = new BrokerInfo();
119             info.setBrokerName(brokerContainer.getBroker().getBrokerName());
120             info.setClusterName(brokerContainer.getBroker().getBrokerClusterName());
121             info.setRemote(true);
122             boondocksChannel.asyncSend(info);
123         }
124     }
125 
126     
127 }