Source code: org/activemq/gbean/ActiveMQConnectorGBean.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.gbean;
19
20 import java.net.InetSocketAddress;
21 import java.net.URI;
22 import java.net.URISyntaxException;
23 import javax.jms.JMSException;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.geronimo.gbean.GBeanInfo;
28 import org.apache.geronimo.gbean.GBeanInfoBuilder;
29 import org.apache.geronimo.gbean.GBeanLifecycle;
30 import org.apache.geronimo.gbean.GConstructorInfo;
31 import org.apache.geronimo.kernel.Kernel;
32 import org.activemq.ActiveMQConnectionFactory;
33 import org.activemq.broker.BrokerConnector;
34 import org.activemq.broker.impl.BrokerConnectorImpl;
35 import org.activemq.io.WireFormat;
36 import org.activemq.io.impl.DefaultWireFormat;
37
38 /**
39 * Default implementation of the ActiveMQ connector
40 *
41 * @version $Revision: 1.1.1.1 $
42 */
43 public class ActiveMQConnectorGBean implements GBeanLifecycle, ActiveMQConnector {
44 private Log log = LogFactory.getLog(getClass().getName());
45
46 private BrokerConnector brokerConnector;
47 private ActiveMQContainer container;
48 private WireFormat wireFormat = new DefaultWireFormat();
49 private String protocol;
50 private String host;
51 private int port;
52 private String path;
53 private String query;
54 private String urlAsStarted;
55
56 public ActiveMQConnectorGBean(ActiveMQContainer container, String protocol, String host, int port) {
57 this.container = container;
58 this.protocol = protocol;
59 this.host = host;
60 this.port = port;
61 }
62
63 public String getProtocol() {
64 return protocol;
65 }
66
67 public void setProtocol(String protocol) {
68 this.protocol = protocol;
69 }
70
71 public String getHost() {
72 return host;
73 }
74
75 public void setHost(String host) {
76 this.host = host;
77 }
78
79 public int getPort() {
80 return port;
81 }
82
83 public void setPort(int port) {
84 this.port = port;
85 }
86
87 public String getPath() {
88 return path;
89 }
90
91 public void setPath(String path) {
92 this.path = path;
93 }
94
95 public String getQuery() {
96 return query;
97 }
98
99 public void setQuery(String query) {
100 this.query = query;
101 }
102
103 public String getUrl() {
104 try {
105 return new URI(protocol, null, host, port, path, query, null).toString();
106 } catch (URISyntaxException e) {
107 throw new IllegalStateException("Attributes don't form a valid URI: "+protocol+"://"+host+":"+port+"/"+path+"?"+query);
108 }
109 }
110
111 public WireFormat getWireFormat() {
112 return wireFormat;
113 }
114
115 public void setWireFormat(WireFormat wireFormat) {
116 this.wireFormat = wireFormat;
117 }
118
119 public InetSocketAddress getListenAddress() {
120 return brokerConnector == null ? null : brokerConnector.getServerChannel().getSocketAddress();
121 }
122
123 public synchronized void doStart() throws Exception {
124 ClassLoader old = Thread.currentThread().getContextClassLoader();
125 Thread.currentThread().setContextClassLoader(ActiveMQContainerGBean.class.getClassLoader());
126 try {
127 if (brokerConnector == null) {
128 urlAsStarted = getUrl();
129 brokerConnector = createBrokerConnector(urlAsStarted);
130 brokerConnector.start();
131 ActiveMQConnectionFactory.registerBroker(urlAsStarted, brokerConnector);
132 }
133 } finally {
134 Thread.currentThread().setContextClassLoader(old);
135 }
136 }
137
138 public synchronized void doStop() throws Exception {
139 if (brokerConnector != null) {
140 ActiveMQConnectionFactory.unregisterBroker(urlAsStarted);
141 BrokerConnector temp = brokerConnector;
142 brokerConnector = null;
143 temp.stop();
144 }
145 }
146
147 public synchronized void doFail() {
148 if (brokerConnector != null) {
149 BrokerConnector temp = brokerConnector;
150 brokerConnector = null;
151 try {
152 temp.stop();
153 }
154 catch (JMSException e) {
155 log.info("Caught while closing due to failure: " + e, e);
156 }
157 }
158 }
159
160 protected BrokerConnector createBrokerConnector(String url) throws Exception {
161 return new BrokerConnectorImpl(container.getBrokerContainer(), url, wireFormat);
162 }
163
164 public static final GBeanInfo GBEAN_INFO;
165
166 static {
167 GBeanInfoBuilder infoFactory = new GBeanInfoBuilder("ActiveMQ Message Broker Connector", ActiveMQConnectorGBean.class, CONNECTOR_J2EE_TYPE);
168 infoFactory.addAttribute("url", String.class.getName(), false);
169 infoFactory.addAttribute("wireFormat", WireFormat.class.getName(), false);
170 infoFactory.addReference("activeMQContainer", ActiveMQContainer.class);
171 infoFactory.addInterface(ActiveMQConnector.class, new String[]{"host","port","protocol","path","query"});
172 infoFactory.setConstructor(new GConstructorInfo(new String[]{"activeMQContainer", "protocol", "host", "port"}));
173 GBEAN_INFO = infoFactory.getBeanInfo();
174 }
175
176 public static GBeanInfo getGBeanInfo() {
177 return GBEAN_INFO;
178 }
179 }