Source code: org/activemq/ActiveMQConnectionConsumer.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq;
20
21 import javax.jms.ConnectionConsumer;
22 import javax.jms.IllegalStateException;
23 import javax.jms.JMSException;
24 import javax.jms.ServerSession;
25 import javax.jms.ServerSessionPool;
26 import javax.jms.Session;
27
28 import org.activemq.io.util.MemoryBoundedQueue;
29 import org.activemq.message.ActiveMQMessage;
30 import org.activemq.message.ConsumerInfo;
31
32 /**
33 * For application servers, <CODE>Connection</CODE> objects provide a special
34 * facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The
35 * messages it is to consume are specified by a <CODE>Destination</CODE> and a
36 * message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be
37 * given a <CODE>ServerSessionPool</CODE> to use for processing its messages.
38 * <p/>
39 * <P>
40 * Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a
41 * <CODE>ServerSession</CODE> from its pool, loads it with a single message,
42 * and starts it. As traffic picks up, messages can back up. If this happens, a
43 * <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE>
44 * with more than one message. This reduces the thread context switches and
45 * minimizes resource use at the expense of some serialization of message
46 * processing.
47 *
48 * @see javax.jms.Connection#createConnectionConsumer
49 * @see javax.jms.Connection#createDurableConnectionConsumer
50 * @see javax.jms.QueueConnection#createConnectionConsumer
51 * @see javax.jms.TopicConnection#createConnectionConsumer
52 * @see javax.jms.TopicConnection#createDurableConnectionConsumer
53 */
54
55 public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQMessageDispatcher {
56
57 private ActiveMQConnection connection;
58
59 private ServerSessionPool sessionPool;
60
61 private ConsumerInfo consumerInfo;
62
63 private boolean closed;
64
65 protected MemoryBoundedQueue messageQueue;
66
67 /**
68 * Create a ConnectionConsumer
69 *
70 * @param theConnection
71 * @param theSessionPool
72 * @param theConsumerInfo
73 * @param theMaximumMessages
74 * @throws JMSException
75 */
76 protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, ServerSessionPool theSessionPool,
77 ConsumerInfo theConsumerInfo, int theMaximumMessages) throws JMSException {
78 this.connection = theConnection;
79 this.sessionPool = theSessionPool;
80 this.consumerInfo = theConsumerInfo;
81 this.connection.addConnectionConsumer(this);
82 this.consumerInfo.setStarted(true);
83 this.consumerInfo.setPrefetchNumber(theMaximumMessages);
84 this.connection.syncSendPacket(this.consumerInfo);
85
86 String queueName = connection.clientID + ":" + theConsumerInfo.getConsumerName() + ":"
87 + theConsumerInfo.getConsumerNo();
88 this.messageQueue = connection.getMemoryBoundedQueue(queueName);
89 }
90
91 /**
92 * Tests to see if the Message Dispatcher is a target for this message
93 *
94 * @param message
95 * the message to test
96 * @return true if the Message Dispatcher can dispatch the message
97 */
98 public boolean isTarget(ActiveMQMessage message) {
99 return message.isConsumerTarget(this.consumerInfo.getConsumerNo());
100 }
101
102 /**
103 * Dispatch an ActiveMQMessage
104 *
105 * @param message
106 */
107 public void dispatch(ActiveMQMessage message) {
108 if (message.isConsumerTarget(this.consumerInfo.getConsumerNo())) {
109 message.setConsumerIdentifer(this.consumerInfo.getConsumerId());
110 message.setTransientConsumed(!this.consumerInfo.isDurableTopic()
111 && !this.consumerInfo.getDestination().isQueue());
112 try {
113 if (sessionPool != null)
114 dispatchToSession(message);
115 else
116 dispatchToQueue(message);
117 } catch (JMSException jmsEx) {
118 this.connection.handleAsyncException(jmsEx);
119 }
120 }
121 }
122
123 /**
124 * @param message
125 * @throws JMSException
126 */
127 private void dispatchToQueue(ActiveMQMessage message) throws JMSException {
128 messageQueue.enqueue(message);
129 }
130
131 /**
132 * Receives the next message that arrives within the specified timeout
133 * interval.
134 *
135 * @throws JMSException
136 */
137 public ActiveMQMessage receive(long timeout) throws JMSException {
138 try {
139 ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue(timeout);
140 return message;
141 } catch (InterruptedException ioe) {
142 return null;
143 }
144 }
145
146 /**
147 * @param message
148 * @throws JMSException
149 */
150 private void dispatchToSession(ActiveMQMessage message) throws JMSException {
151
152 ServerSession serverSession = sessionPool.getServerSession();
153 Session nestedSession = serverSession.getSession();
154 ActiveMQSession session = null;
155 if (nestedSession instanceof ActiveMQSession) {
156 session = (ActiveMQSession) nestedSession;
157 } else if (nestedSession instanceof ActiveMQTopicSession) {
158 ActiveMQTopicSession topicSession = (ActiveMQTopicSession) nestedSession;
159 session = (ActiveMQSession) topicSession.getNext();
160 } else if (nestedSession instanceof ActiveMQQueueSession) {
161 ActiveMQQueueSession queueSession = (ActiveMQQueueSession) nestedSession;
162 session = (ActiveMQSession) queueSession.getNext();
163 } else {
164 throw new JMSException("Invalid instance of session obtained from server session." +
165 "The instance should be one of the following: ActiveMQSession, ActiveMQTopicSession, ActiveMQQueueSession. " +
166 "Found instance of " + nestedSession.getClass().getName());
167 }
168 session.dispatch(message);
169 serverSession.start();
170 }
171
172 /**
173 * Gets the server session pool associated with this connection consumer.
174 *
175 * @return the server session pool used by this connection consumer
176 * @throws JMSException
177 * if the JMS provider fails to get the server session pool
178 * associated with this consumer due to some internal error.
179 */
180
181 public ServerSessionPool getServerSessionPool() throws JMSException {
182 if (closed) {
183 throw new IllegalStateException("The Connection Consumer is closed");
184 }
185 return this.sessionPool;
186 }
187
188 /**
189 * Closes the connection consumer. <p/>
190 * <P>
191 * Since a provider may allocate some resources on behalf of a connection
192 * consumer outside the Java virtual machine, clients should close these
193 * resources when they are not needed. Relying on garbage collection to
194 * eventually reclaim these resources may not be timely enough.
195 *
196 * @throws JMSException
197 */
198
199 public void close() throws JMSException {
200 if (!closed) {
201 closed = true;
202 this.consumerInfo.setStarted(false);
203 this.connection.asyncSendPacket(this.consumerInfo);
204 this.connection.removeConnectionConsumer(this);
205 }
206
207 }
208 }