Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/ActiveMQConnectionConsumer.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq;
20  
21  import javax.jms.ConnectionConsumer;
22  import javax.jms.IllegalStateException;
23  import javax.jms.JMSException;
24  import javax.jms.ServerSession;
25  import javax.jms.ServerSessionPool;
26  import javax.jms.Session;
27  
28  import org.activemq.io.util.MemoryBoundedQueue;
29  import org.activemq.message.ActiveMQMessage;
30  import org.activemq.message.ConsumerInfo;
31  
32  /**
33   * For application servers, <CODE>Connection</CODE> objects provide a special
34   * facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The
35   * messages it is to consume are specified by a <CODE>Destination</CODE> and a
36   * message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be
37   * given a <CODE>ServerSessionPool</CODE> to use for processing its messages.
38   * <p/>
39   * <P>
40   * Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a
41   * <CODE>ServerSession</CODE> from its pool, loads it with a single message,
42   * and starts it. As traffic picks up, messages can back up. If this happens, a
43   * <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE>
44   * with more than one message. This reduces the thread context switches and
45   * minimizes resource use at the expense of some serialization of message
46   * processing.
47   * 
48   * @see javax.jms.Connection#createConnectionConsumer
49   * @see javax.jms.Connection#createDurableConnectionConsumer
50   * @see javax.jms.QueueConnection#createConnectionConsumer
51   * @see javax.jms.TopicConnection#createConnectionConsumer
52   * @see javax.jms.TopicConnection#createDurableConnectionConsumer
53   */
54  
55  public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQMessageDispatcher {
56  
57      private ActiveMQConnection connection;
58  
59      private ServerSessionPool sessionPool;
60  
61      private ConsumerInfo consumerInfo;
62  
63      private boolean closed;
64  
65      protected MemoryBoundedQueue messageQueue;
66  
67      /**
68       * Create a ConnectionConsumer
69       * 
70       * @param theConnection
71       * @param theSessionPool
72       * @param theConsumerInfo
73       * @param theMaximumMessages
74       * @throws JMSException
75       */
76      protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, ServerSessionPool theSessionPool,
77              ConsumerInfo theConsumerInfo, int theMaximumMessages) throws JMSException {
78          this.connection = theConnection;
79          this.sessionPool = theSessionPool;
80          this.consumerInfo = theConsumerInfo;
81          this.connection.addConnectionConsumer(this);
82          this.consumerInfo.setStarted(true);
83          this.consumerInfo.setPrefetchNumber(theMaximumMessages);
84          this.connection.syncSendPacket(this.consumerInfo);
85  
86          String queueName = connection.clientID + ":" + theConsumerInfo.getConsumerName() + ":"
87                  + theConsumerInfo.getConsumerNo();
88          this.messageQueue = connection.getMemoryBoundedQueue(queueName);
89      }
90  
91      /**
92       * Tests to see if the Message Dispatcher is a target for this message
93       * 
94       * @param message
95       *            the message to test
96       * @return true if the Message Dispatcher can dispatch the message
97       */
98      public boolean isTarget(ActiveMQMessage message) {
99          return message.isConsumerTarget(this.consumerInfo.getConsumerNo());
100     }
101 
102     /**
103      * Dispatch an ActiveMQMessage
104      * 
105      * @param message
106      */
107     public void dispatch(ActiveMQMessage message) {
108         if (message.isConsumerTarget(this.consumerInfo.getConsumerNo())) {
109             message.setConsumerIdentifer(this.consumerInfo.getConsumerId());
110             message.setTransientConsumed(!this.consumerInfo.isDurableTopic()
111                     && !this.consumerInfo.getDestination().isQueue());
112             try {
113                 if (sessionPool != null)
114                     dispatchToSession(message);
115                 else
116                     dispatchToQueue(message);
117             } catch (JMSException jmsEx) {
118                 this.connection.handleAsyncException(jmsEx);
119             }
120         }
121     }
122 
123     /**
124      * @param message
125      * @throws JMSException
126      */
127     private void dispatchToQueue(ActiveMQMessage message) throws JMSException {
128         messageQueue.enqueue(message);
129     }
130 
131     /**
132      * Receives the next message that arrives within the specified timeout
133      * interval.
134      * 
135      * @throws JMSException
136      */
137     public ActiveMQMessage receive(long timeout) throws JMSException {
138         try {
139             ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue(timeout);
140             return message;
141         } catch (InterruptedException ioe) {
142             return null;
143         }
144     }
145 
146     /**
147      * @param message
148      * @throws JMSException
149      */
150     private void dispatchToSession(ActiveMQMessage message) throws JMSException {
151 
152         ServerSession serverSession = sessionPool.getServerSession();
153         Session nestedSession = serverSession.getSession();
154         ActiveMQSession session = null;
155         if (nestedSession instanceof ActiveMQSession) {
156             session = (ActiveMQSession) nestedSession;
157         } else if (nestedSession instanceof ActiveMQTopicSession) {
158             ActiveMQTopicSession topicSession = (ActiveMQTopicSession) nestedSession;
159             session = (ActiveMQSession) topicSession.getNext();
160         } else if (nestedSession instanceof ActiveMQQueueSession) {
161             ActiveMQQueueSession queueSession = (ActiveMQQueueSession) nestedSession;
162             session = (ActiveMQSession) queueSession.getNext();
163         } else {
164             throw new JMSException("Invalid instance of session obtained from server session." +
165             "The instance should be one of the following: ActiveMQSession, ActiveMQTopicSession, ActiveMQQueueSession. " +
166             "Found instance of " + nestedSession.getClass().getName());
167         }
168         session.dispatch(message);
169         serverSession.start();
170     }
171 
172     /**
173      * Gets the server session pool associated with this connection consumer.
174      * 
175      * @return the server session pool used by this connection consumer
176      * @throws JMSException
177      *             if the JMS provider fails to get the server session pool
178      *             associated with this consumer due to some internal error.
179      */
180 
181     public ServerSessionPool getServerSessionPool() throws JMSException {
182         if (closed) {
183             throw new IllegalStateException("The Connection Consumer is closed");
184         }
185         return this.sessionPool;
186     }
187 
188     /**
189      * Closes the connection consumer. <p/>
190      * <P>
191      * Since a provider may allocate some resources on behalf of a connection
192      * consumer outside the Java virtual machine, clients should close these
193      * resources when they are not needed. Relying on garbage collection to
194      * eventually reclaim these resources may not be timely enough.
195      * 
196      * @throws JMSException
197      */
198 
199     public void close() throws JMSException {
200         if (!closed) {
201             closed = true;
202             this.consumerInfo.setStarted(false);
203             this.connection.asyncSendPacket(this.consumerInfo);
204             this.connection.removeConnectionConsumer(this);
205         }
206 
207     }
208 }