Source code: org/activemq/JmsTransactionTestSupport.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq;
19
20 import java.util.ArrayList;
21
22 import javax.jms.Connection;
23 import javax.jms.ConnectionFactory;
24 import javax.jms.Destination;
25 import javax.jms.JMSException;
26 import javax.jms.Message;
27 import javax.jms.MessageConsumer;
28 import javax.jms.MessageProducer;
29 import javax.jms.Session;
30 import javax.jms.TextMessage;
31
32 import org.activemq.test.JmsResourceProvider;
33 import org.activemq.test.TestSupport;
34
35 /**
36 * @version $Revision: 1.2 $
37 */
38 abstract public class JmsTransactionTestSupport extends TestSupport {
39
40 protected ConnectionFactory connectionFactory;
41 protected Connection connection;
42 protected Session session;
43 protected MessageConsumer consumer;
44 protected MessageProducer producer;
45 protected JmsResourceProvider resourceProvider;
46 protected Destination destination;
47
48 public JmsTransactionTestSupport() {
49 super();
50 }
51
52 public JmsTransactionTestSupport(String name) {
53 super(name);
54 }
55
56 public void testSendReceiveTransactedBatches() throws Exception {
57
58 int batchCount = 10;
59 int batchSize = 20;
60 TextMessage message = session.createTextMessage("Batch Message");
61
62 for( int j=0; j < batchCount; j++) {
63 System.out.println("Producing bacth "+j+" of "+batchSize+" messages");
64 for( int i=0; i < batchSize; i++) {
65 producer.send(message);
66 }
67 session.commit();
68
69 System.out.println("Consuming bacth "+j+" of "+batchSize+" messages");
70 for( int i=0; i < batchSize; i++) {
71 message = (TextMessage) consumer.receive(1000*5);
72 assertNotNull("Received only "+i+" messages in batch "+j, message);
73 assertEquals("Batch Message", message.getText());
74 }
75 session.commit();
76 }
77 }
78
79 public void testSendRollback() throws Exception {
80
81 Message[] outbound = new Message[]{
82 session.createTextMessage("First Message"),
83 session.createTextMessage("Second Message")
84 };
85
86 producer.send(outbound[0]);
87 session.commit();
88 producer.send(session.createTextMessage("I'm going to get rolled back."));
89 session.rollback();
90 producer.send(outbound[1]);
91 session.commit();
92
93 ArrayList messages = new ArrayList();
94 System.out.println("About to consume message 1");
95 Message message = consumer.receive(1000);
96 messages.add(message);
97 System.out.println("Received: " + message);
98
99 System.out.println("About to consume message 2");
100 message = consumer.receive(4000);
101 messages.add(message);
102 System.out.println("Received: " + message);
103
104 session.commit();
105
106 Message inbound[] = new Message[messages.size()];
107 messages.toArray(inbound);
108
109 assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
110 }
111
112 public void testReceiveRollback() throws Exception {
113
114 Message[] outbound = new Message[]{
115 session.createTextMessage("First Message"),
116 session.createTextMessage("Second Message")
117 };
118
119 // lets consume any outstanding messages from previous test runs
120 while (consumer.receive(1000) != null) {
121 }
122 session.commit();
123
124 producer.send(outbound[0]);
125 producer.send(outbound[1]);
126 session.commit();
127
128 System.out.println("Sent 0: " + outbound[0]);
129 System.out.println("Sent 1: " + outbound[1]);
130
131 ArrayList messages = new ArrayList();
132 Message message = consumer.receive(1000);
133 messages.add(message);
134 assertEquals(outbound[0], message);
135 session.commit();
136
137 // rollback so we can get that last message again.
138 message = consumer.receive(1000);
139 assertNotNull(message);
140 assertEquals(outbound[1], message);
141 session.rollback();
142
143 // Consume again.. the previous message should
144 // get redelivered.
145 message = consumer.receive(5000);
146 assertNotNull("Should have re-received the message again!", message);
147 messages.add(message);
148 session.commit();
149
150 Message inbound[] = new Message[messages.size()];
151 messages.toArray(inbound);
152
153 assertTextMessagesEqual("Rollback did not work", outbound, inbound);
154 }
155
156 public void testReceiveTwoThenRollback() throws Exception {
157
158 Message[] outbound = new Message[]{
159 session.createTextMessage("First Message"),
160 session.createTextMessage("Second Message")
161 };
162
163 // lets consume any outstanding messages from previous test runs
164 while (consumer.receive(1000) != null) {
165 }
166 session.commit();
167
168 producer.send(outbound[0]);
169 producer.send(outbound[1]);
170 session.commit();
171
172 System.out.println("Sent 0: " + outbound[0]);
173 System.out.println("Sent 1: " + outbound[1]);
174
175 ArrayList messages = new ArrayList();
176 Message message = consumer.receive(1000);
177 assertEquals(outbound[0], message);
178
179 message = consumer.receive(1000);
180 assertNotNull(message);
181 assertEquals(outbound[1], message);
182 session.rollback();
183
184 // Consume again.. the previous message should
185 // get redelivered.
186 message = consumer.receive(5000);
187 assertNotNull("Should have re-received the first message again!", message);
188 messages.add(message);
189 assertEquals(outbound[0], message);
190
191 message = consumer.receive(5000);
192 assertNotNull("Should have re-received the second message again!", message);
193 messages.add(message);
194 assertEquals(outbound[1], message);
195 session.commit();
196
197 Message inbound[] = new Message[messages.size()];
198 messages.toArray(inbound);
199
200 assertTextMessagesEqual("Rollback did not work", outbound, inbound);
201 }
202
203 public void testReceiveTwoThenRollbackManyTimes() throws Exception {
204 for( int i=0; i < 5 ; i++ )
205 testReceiveTwoThenRollback();
206 }
207
208 public void testSendRollbackWithPrefetchOfOne() throws Exception {
209 setPrefetchToOne();
210 testSendRollback();
211 }
212
213 public void testReceiveRollbackWithPrefetchOfOne() throws Exception {
214 setPrefetchToOne();
215 testReceiveRollback();
216 }
217
218 /*
219 * see http://jira.codehaus.org/browse/AMQ-143
220 *
221 */
222 public void testCloseConsumerBeforeCommit() throws Exception {
223
224 TextMessage[] outbound = new TextMessage[]{
225 session.createTextMessage("First Message"),
226 session.createTextMessage("Second Message")
227 };
228
229 // lets consume any outstanding messages from previous test runs
230 while (consumer.receive(1000) != null) {
231 }
232 session.commit();
233
234 producer.send(outbound[0]);
235 producer.send(outbound[1]);
236 session.commit();
237
238 System.out.println("Sent 0: " + outbound[0]);
239 System.out.println("Sent 1: " + outbound[1]);
240
241 ArrayList messages = new ArrayList();
242 TextMessage message = (TextMessage) consumer.receive(1000);
243 assertEquals(outbound[0].getText(), message.getText());
244 // Close the consumer before the commit. This should not cause the received message
245 // to rollback.
246 consumer.close();
247 session.commit();
248
249 // Create a new consumer
250 consumer = resourceProvider.createConsumer(session, destination);
251 System.out.println("Created consumer: " + consumer);
252
253 message = (TextMessage) consumer.receive(1000);
254 assertEquals(outbound[1].getText(), message.getText());
255 session.commit();
256 }
257
258 protected abstract JmsResourceProvider getJmsResourceProvider();
259
260 protected void setUp() throws Exception {
261 super.setUp();
262
263 resourceProvider = getJmsResourceProvider();
264 topic = resourceProvider.isTopic();
265
266 // We will be using transacted sessions.
267 resourceProvider.setTransacted(true);
268
269 connectionFactory = resourceProvider.createConnectionFactory();
270 reconnect();
271 }
272
273 /**
274 * @throws JMSException
275 */
276 protected void reconnect() throws JMSException {
277
278 Connection t = resourceProvider.createConnection(connectionFactory);
279 if( connection != null ) {
280 // Close the previous connection.
281 connection.close();
282 }
283 connection = t;
284
285 session = resourceProvider.createSession(connection);
286 destination = resourceProvider.createDestination(session, getSubject());
287 producer = resourceProvider.createProducer(session, destination);
288 consumer = resourceProvider.createConsumer(session, destination);
289 connection.start();
290 }
291
292 protected void tearDown() throws Exception {
293 //System.out.println("Test Done. Stats");
294 //((ActiveMQConnectionFactory) connectionFactory).getFactoryStats().dump(new IndentPrinter());
295 System.out.println("Closing down connection");
296
297 session.close();
298 connection.close();
299 System.out.println("Connection closed.");
300 }
301
302 protected void setPrefetchToOne() {
303 ActiveMQPrefetchPolicy prefetchPolicy = ((ActiveMQConnection) connection).getPrefetchPolicy();
304 prefetchPolicy.setQueuePrefetch(1);
305 prefetchPolicy.setTopicPrefetch(1);
306 prefetchPolicy.setDurableTopicPrefetch(1);
307 }
308 }