Source code: org/activemq/transport/RemoteNetworkChannel.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq.transport;
20 import java.net.URI;
21 import java.net.URISyntaxException;
22 import javax.jms.JMSException;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.activemq.broker.BrokerContainer;
26 import org.activemq.broker.impl.BrokerConnectorImpl;
27 import org.activemq.io.impl.DefaultWireFormat;
28 import org.activemq.message.BrokerInfo;
29 import org.activemq.transport.composite.CompositeTransportChannel;
30 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
31
32 /**
33 * Represents a Boondocks broker's connection with a single remote broker which bridges the two brokers to form a network. <p/>
34 * The NetworkChannel contains a JMS connection with the remote broker. <p/>New subscriptions on the local broker are
35 * multiplexed into the JMS connection so that messages published on the remote broker can be replayed onto the local
36 * broker.
37 *
38 * @version $Revision: 1.1.1.1 $
39 */
40 public class RemoteNetworkChannel extends NetworkChannel implements TransportStatusEventListener {
41 private static final Log log = LogFactory.getLog(RemoteNetworkChannel.class);
42 private TransportChannel boondocksChannel;
43
44 /**
45 * Default Constructor
46 *
47 * @param tp
48 */
49 public RemoteNetworkChannel(PooledExecutor tp) {
50 super(tp);
51 }
52
53 /**
54 * Constructor
55 *
56 * @param connector
57 * @param brokerContainer
58 * @param uri
59 */
60 public RemoteNetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, String uri) {
61 super(connector,brokerContainer,uri);
62 }
63
64
65 /**
66 * @see org.activemq.transport.TransportStatusEventListener#statusChanged(org.activemq.transport.TransportStatusEvent)
67 */
68 public void statusChanged(TransportStatusEvent event) {
69 if (event.getTransportChannel() == boondocksChannel) {
70 if (event.getChannelStatus() == TransportStatusEvent.RECONNECTED) {
71 try {
72 sendBrokerInfo();
73 }
74 catch (JMSException e) {
75 log.error("Failed to send Broker Info", e);
76 }
77 }
78 }
79 else {
80 super.statusChanged(event);
81 }
82 }
83
84
85 /**
86 * remote:// can only make outgoing connections - we assume we can't
87 * accept incomming (duck!). So we initialize the transport channel
88 * from this side and create the broker client as well
89 * @throws JMSException
90 */
91
92 protected void initialize() throws JMSException {
93 super.initialize();
94 try {
95 boondocksChannel = TransportChannelProvider.create(new DefaultWireFormat(), new URI(uri));
96 boondocksChannel.addTransportStatusEventListener(this);
97 if (boondocksChannel instanceof CompositeTransportChannel) {
98 CompositeTransportChannel composite = (CompositeTransportChannel)boondocksChannel;
99 composite.setMaximumRetries(maximumRetries);
100 composite.setFailureSleepTime(reconnectSleepTime);
101 composite.setIncrementTimeout(false);
102 }
103 boondocksChannel.start();
104 //create our own broker connector ...
105 BrokerConnectorImpl connector = new BrokerConnectorImpl(getBrokerContainer(),"vm://uri",new DefaultWireFormat());
106 connector.start();
107 connector.addClient(boondocksChannel);
108 sendBrokerInfo();
109 }
110 catch (URISyntaxException e) {
111 log.error("Could not parse uri: " + uri + " to make remote connector",e);
112 }
113 }
114
115 private void sendBrokerInfo() throws JMSException{
116 //inform the other side we are a remote channel
117 if (boondocksChannel != null) {
118 BrokerInfo info = new BrokerInfo();
119 info.setBrokerName(brokerContainer.getBroker().getBrokerName());
120 info.setClusterName(brokerContainer.getBroker().getBrokerClusterName());
121 info.setRemote(true);
122 boondocksChannel.asyncSend(info);
123 }
124 }
125
126
127 }