Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/broker/impl/BrokerConnectorImpl.java


1   /**
2    *
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq.broker.impl;
19  
20  import org.activemq.broker.BrokerClient;
21  import org.activemq.broker.BrokerConnector;
22  import org.activemq.broker.BrokerContainer;
23  import org.activemq.io.WireFormat;
24  import org.activemq.message.ActiveMQMessage;
25  import org.activemq.message.ActiveMQXid;
26  import org.activemq.message.BrokerInfo;
27  import org.activemq.message.ConnectionInfo;
28  import org.activemq.message.ConsumerInfo;
29  import org.activemq.message.DurableUnsubscribe;
30  import org.activemq.message.MessageAck;
31  import org.activemq.message.ProducerInfo;
32  import org.activemq.message.SessionInfo;
33  import org.activemq.transport.TransportChannel;
34  import org.activemq.transport.TransportChannelListener;
35  import org.activemq.transport.TransportServerChannel;
36  import org.activemq.transport.TransportServerChannelProvider;
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  
40  import javax.jms.JMSException;
41  import javax.jms.JMSSecurityException;
42  import javax.transaction.xa.XAException;
43  import java.net.URI;
44  import java.net.URISyntaxException;
45  import java.util.Collections;
46  import java.util.HashMap;
47  import java.util.Map;
48  
49  /**
50   * An implementation of the broker (the JMS server)
51   *
52   * @version $Revision: 1.1.1.1 $
53   */
54  public class BrokerConnectorImpl implements BrokerConnector, TransportChannelListener {
55      
56      private TransportServerChannel serverChannel;
57      private Log log;
58      private BrokerContainer container;
59      private Map clients = Collections.synchronizedMap(new HashMap());
60  
61      /**
62       * Helper constructor for TCP protocol with the given bind address
63       *
64       * @param container
65       * @param bindAddress
66       * @throws JMSException
67       */
68      public BrokerConnectorImpl(BrokerContainer container, String bindAddress, WireFormat wireFormat) throws JMSException {
69          this(container, createTransportServerChannel(wireFormat, bindAddress));
70      }
71  
72      /**
73       * @param container
74       * @param serverChannel
75       */
76      public BrokerConnectorImpl(BrokerContainer container, TransportServerChannel serverChannel) {
77          this(container);
78          this.serverChannel = serverChannel;
79          serverChannel.setTransportChannelListener(this);
80      }
81      
82      /**
83      * @param container
84      * @param serverChannel
85      */
86     public BrokerConnectorImpl(BrokerContainer container) {
87         assert container != null;
88         this.log = LogFactory.getLog(getClass().getName());
89         this.container = container;
90         this.container.addConnector(this);
91         
92     }
93  
94      /**
95       * @return infomation about the Broker
96       */
97      public BrokerInfo getBrokerInfo() {
98          return container.getBroker().getBrokerInfo();
99      }
100 
101     /**
102      * Get a hint about the broker capacity for more messages
103      *
104      * @return percentage value (0-100) about how much capacity the
105      *         broker has
106      */
107     public int getBrokerCapacity() {
108         return container.getBroker().getRoundedCapacity();
109     }
110 
111     /**
112      * @return Get the server channel
113      */
114     public TransportServerChannel getServerChannel() {
115         return serverChannel;
116     }
117 
118     /**
119      * start the Broker
120      *
121      * @throws JMSException
122      */
123     public void start() throws JMSException {
124         if (this.serverChannel != null){
125             this.serverChannel.start();
126         }
127         log.info("ActiveMQ connector started: " + serverChannel);
128     }
129 
130     /**
131      * Stop the Broker
132      *
133      * @throws JMSException
134      */
135     public void stop() throws JMSException {
136         this.container.removeConnector(this);
137         if (this.serverChannel != null){
138             this.serverChannel.stop();
139         }
140         log.info("ActiveMQ connector stopped: " + serverChannel);
141     }
142 
143     /**
144      * Register a Broker Client
145      *
146      * @param client
147      * @param info   contains infomation about the Connection this Client represents
148      * @throws JMSException
149      * @throws javax.jms.InvalidClientIDException
150      *                              if the JMS client specifies an invalid or duplicate client ID.
151      * @throws JMSSecurityException if client authentication fails due to an invalid user name or password.
152      */
153     public void registerClient(BrokerClient client, ConnectionInfo info) throws JMSException {
154         this.container.registerConnection(client, info);
155     }
156 
157     /**
158      * Deregister a Broker Client
159      *
160      * @param client
161      * @param info
162      * @throws JMSException if some internal error occurs
163      */
164     public void deregisterClient(BrokerClient client, ConnectionInfo info) throws JMSException {
165         this.container.deregisterConnection(client, info);
166     }
167 
168     /**
169      * Registers a MessageConsumer
170      *
171      * @param client
172      * @param info
173      * @throws JMSException
174      * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
175      */
176     public void registerMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
177         if (info.getDestination() == null) {
178             throw new JMSException("No Destination specified on consumerInfo for client: " + client + " info: " + info);
179         }
180         this.container.registerMessageConsumer(client, info);
181 
182     }
183 
184     /**
185      * De-register a MessageConsumer from the Broker
186      *
187      * @param client
188      * @param info
189      * @throws JMSException
190      */
191     public void deregisterMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
192         this.container.deregisterMessageConsumer(client, info);
193     }
194 
195     /**
196      * Registers a MessageProducer
197      *
198      * @param client
199      * @param info
200      * @throws JMSException
201      * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
202      */
203     public void registerMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
204         this.container.registerMessageProducer(client, info);
205     }
206 
207     /**
208      * De-register a MessageProducer from the Broker
209      *
210      * @param client
211      * @param info
212      * @throws JMSException
213      */
214     public void deregisterMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
215         this.container.deregisterMessageProducer(client, info);
216     }
217 
218     /**
219      * Register a client-side Session (used for Monitoring)
220      *
221      * @param client
222      * @param info
223      * @throws JMSException
224      */
225     public void registerSession(BrokerClient client, SessionInfo info) throws JMSException {
226         this.container.registerSession(client, info);
227     }
228 
229     /**
230      * De-register a client-side Session from the Broker (used for monitoring)
231      *
232      * @param client
233      * @param info
234      * @throws JMSException
235      */
236     public void deregisterSession(BrokerClient client, SessionInfo info) throws JMSException {
237         this.container.deregisterSession(client, info);
238     }
239 
240     /**
241      * Start a transaction from the Client session
242      *
243      * @param client
244      * @param transactionId
245      * @throws JMSException
246      */
247     public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
248         this.container.startTransaction(client, transactionId);
249     }
250 
251     /**
252      * Rollback a transacton
253      *
254      * @param client
255      * @param transactionId
256      * @throws JMSException
257      */
258     public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
259         this.container.rollbackTransaction(client, transactionId);
260     }
261 
262     /**
263      * Commit a transaction
264      *
265      * @param client
266      * @param transactionId
267      * @throws JMSException
268      */
269     public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
270         this.container.commitTransaction(client, transactionId);
271     }
272 
273     /**
274      * Send a non-transacted message to the Broker
275      *
276      * @param client
277      * @param message
278      * @throws JMSException
279      */
280     public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
281         this.container.sendMessage(client, message);
282     }
283 
284     /**
285      * Acknowledge reciept of a message
286      *
287      * @param client
288      * @param ack
289      * @throws JMSException
290      */
291     public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
292         this.container.acknowledgeMessage(client, ack);
293     }
294 
295     /**
296      * Command to delete a durable topic subscription
297      *
298      * @param client
299      * @param ds
300      * @throws JMSException
301      */
302     public void durableUnsubscribe(BrokerClient client, DurableUnsubscribe ds) throws JMSException {
303         this.container.durableUnsubscribe(client, ds);
304     }
305 
306 
307     /**
308      * @param channel - client to add
309      */
310     public void addClient(TransportChannel channel) {
311         try {
312             BrokerClient client = new BrokerClientImpl();
313             client.initialize(this, channel);
314             if (log.isDebugEnabled()) {
315                 log.debug("Starting new client: " + client);
316             }
317             channel.setServerSide(true);
318             channel.start();
319             clients.put(channel, client);
320         }
321         catch (JMSException e) {
322             log.error("Failed to add client due to: " + e, e);
323         }
324     }
325 
326     /**
327      * @param channel - client to remove
328      */
329     public void removeClient(TransportChannel channel) {
330         BrokerClient client = (BrokerClient) clients.remove(channel);
331         if (client != null) {
332             if (log.isDebugEnabled()) {
333                 log.debug("Client leaving client: " + client);
334             }
335 
336             // we may have already been closed, if not then lets simulate a normal shutdown
337             client.cleanUp();
338         }
339         else {
340             // might have got a duplicate callback
341             log.warn("No such client for channel: " + channel);
342         }
343     }
344 
345     /**
346      * @return the BrokerContainer for this Connector
347      */
348     public BrokerContainer getBrokerContainer() {
349         return this.container;
350     }
351 
352     /**
353      * Start an XA transaction.
354      *
355      * @see org.activemq.broker.BrokerConnector#startTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
356      */
357     public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
358         this.container.startTransaction(client, xid);
359     }
360 
361     /**
362      * Gets the prepared XA transactions.
363      *
364      * @see org.activemq.broker.BrokerConnector#getPreparedTransactions(org.activemq.broker.BrokerClient)
365      */
366     public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
367         return this.container.getPreparedTransactions(client);
368     }
369 
370     /**
371      * Prepare an XA transaction.
372      *
373      * @see org.activemq.broker.BrokerConnector#prepareTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
374      */
375     public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
376         return this.container.prepareTransaction(client, xid);
377     }
378 
379     /**
380      * Rollback an XA transaction.
381      *
382      * @see org.activemq.broker.BrokerConnector#rollbackTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
383      */
384     public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
385         this.container.rollbackTransaction(client, xid);
386     }
387 
388     /**
389      * Commit an XA transaction.
390      *
391      * @see org.activemq.broker.BrokerConnector#commitTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid, boolean)
392      */
393     public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
394         this.container.commitTransaction(client, xid, onePhase);
395     }
396 
397     /**
398      * @see org.activemq.broker.BrokerConnector#getResourceManagerId(org.activemq.broker.BrokerClient)
399      */
400     public String getResourceManagerId(BrokerClient client) {
401         // TODO: I think we need to return a better (more unique) RM id.
402         return getBrokerInfo().getBrokerName();
403     }
404 
405 
406     // Implementation methods
407     //-------------------------------------------------------------------------
408     /**
409      * Factory method ot create a transport channel
410      *
411      * @param bindAddress
412      * @return @throws JMSException
413      * @throws JMSException
414      */
415     protected static TransportServerChannel createTransportServerChannel(WireFormat wireFormat, String bindAddress) throws JMSException {
416         URI url;
417         try {
418             url = new URI(bindAddress);
419         }
420         catch (URISyntaxException e) {
421             JMSException jmsEx = new JMSException("Badly formated bindAddress: " + e.getMessage());
422             jmsEx.setLinkedException(e);
423             throw jmsEx;
424         }
425         return TransportServerChannelProvider.create(wireFormat, url);
426     }
427 
428 }