Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/JmsTransactionTestSupport.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq;
19  
20  import java.util.ArrayList;
21  
22  import javax.jms.Connection;
23  import javax.jms.ConnectionFactory;
24  import javax.jms.Destination;
25  import javax.jms.JMSException;
26  import javax.jms.Message;
27  import javax.jms.MessageConsumer;
28  import javax.jms.MessageProducer;
29  import javax.jms.Session;
30  import javax.jms.TextMessage;
31  
32  import org.activemq.test.JmsResourceProvider;
33  import org.activemq.test.TestSupport;
34  
35  /**
36   * @version $Revision: 1.2 $
37   */
38  abstract public class JmsTransactionTestSupport extends TestSupport {
39  
40      protected ConnectionFactory connectionFactory;
41      protected Connection connection;
42      protected Session session;
43      protected MessageConsumer consumer;
44      protected MessageProducer producer;
45      protected JmsResourceProvider resourceProvider;
46      protected Destination destination;
47  
48      public JmsTransactionTestSupport() {
49          super();
50      }
51  
52      public JmsTransactionTestSupport(String name) {
53          super(name);
54      }
55  
56      public void testSendReceiveTransactedBatches() throws Exception {
57  
58          int batchCount = 10;
59          int batchSize = 20;
60          TextMessage message = session.createTextMessage("Batch Message");
61  
62          for( int j=0; j < batchCount; j++) {
63              System.out.println("Producing bacth "+j+" of "+batchSize+" messages");
64              for( int i=0; i < batchSize; i++) {
65                  producer.send(message);
66              }        
67              session.commit();
68  
69              System.out.println("Consuming bacth "+j+" of "+batchSize+" messages");
70              for( int i=0; i < batchSize; i++) {
71                  message = (TextMessage) consumer.receive(1000*5);
72                  assertNotNull("Received only "+i+" messages in batch "+j, message);
73                  assertEquals("Batch Message", message.getText());
74              }
75              session.commit();
76          }
77      }
78  
79      public void testSendRollback() throws Exception {
80  
81          Message[] outbound = new Message[]{
82              session.createTextMessage("First Message"),
83              session.createTextMessage("Second Message")
84          };
85  
86          producer.send(outbound[0]);
87          session.commit();
88          producer.send(session.createTextMessage("I'm going to get rolled back."));
89          session.rollback();
90          producer.send(outbound[1]);
91          session.commit();
92  
93          ArrayList messages = new ArrayList();
94          System.out.println("About to consume message 1");
95          Message message = consumer.receive(1000);
96          messages.add(message);
97          System.out.println("Received: " + message);
98  
99          System.out.println("About to consume message 2");
100         message = consumer.receive(4000);
101         messages.add(message);
102         System.out.println("Received: " + message);
103 
104         session.commit();
105 
106         Message inbound[] = new Message[messages.size()];
107         messages.toArray(inbound);
108 
109         assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
110     }
111 
112     public void testReceiveRollback() throws Exception {
113 
114         Message[] outbound = new Message[]{
115             session.createTextMessage("First Message"),
116             session.createTextMessage("Second Message")
117         };
118 
119         // lets consume any outstanding messages from previous test runs
120         while (consumer.receive(1000) != null) {
121         }
122         session.commit();
123 
124         producer.send(outbound[0]);
125         producer.send(outbound[1]);
126         session.commit();
127 
128         System.out.println("Sent 0: " + outbound[0]);
129         System.out.println("Sent 1: " + outbound[1]);
130 
131         ArrayList messages = new ArrayList();
132         Message message = consumer.receive(1000);
133         messages.add(message);
134         assertEquals(outbound[0], message);
135         session.commit();
136 
137         // rollback so we can get that last message again.
138         message = consumer.receive(1000);
139         assertNotNull(message);
140         assertEquals(outbound[1], message);
141         session.rollback();
142 
143         // Consume again.. the previous message should
144         // get redelivered.
145         message = consumer.receive(5000);
146         assertNotNull("Should have re-received the message again!", message);
147         messages.add(message);
148         session.commit();
149 
150         Message inbound[] = new Message[messages.size()];
151         messages.toArray(inbound);
152 
153         assertTextMessagesEqual("Rollback did not work", outbound, inbound);
154     }
155 
156     public void testReceiveTwoThenRollback() throws Exception {
157 
158         Message[] outbound = new Message[]{
159             session.createTextMessage("First Message"),
160             session.createTextMessage("Second Message")
161         };
162 
163         // lets consume any outstanding messages from previous test runs
164         while (consumer.receive(1000) != null) {
165         }
166         session.commit();
167 
168         producer.send(outbound[0]);
169         producer.send(outbound[1]);
170         session.commit();
171 
172         System.out.println("Sent 0: " + outbound[0]);
173         System.out.println("Sent 1: " + outbound[1]);
174 
175         ArrayList messages = new ArrayList();
176         Message message = consumer.receive(1000);
177         assertEquals(outbound[0], message);
178 
179         message = consumer.receive(1000);
180         assertNotNull(message);
181         assertEquals(outbound[1], message);
182         session.rollback();
183 
184         // Consume again.. the previous message should
185         // get redelivered.
186         message = consumer.receive(5000);
187         assertNotNull("Should have re-received the first message again!", message);
188         messages.add(message);
189         assertEquals(outbound[0], message);
190 
191         message = consumer.receive(5000);
192         assertNotNull("Should have re-received the second message again!", message);
193         messages.add(message);
194         assertEquals(outbound[1], message);
195         session.commit();
196 
197         Message inbound[] = new Message[messages.size()];
198         messages.toArray(inbound);
199 
200         assertTextMessagesEqual("Rollback did not work", outbound, inbound);
201     }
202     
203     public void testReceiveTwoThenRollbackManyTimes() throws Exception {
204         for( int i=0; i < 5 ; i++ )
205             testReceiveTwoThenRollback();
206     }            
207 
208     public void testSendRollbackWithPrefetchOfOne() throws Exception {
209         setPrefetchToOne();
210         testSendRollback();
211     }
212 
213     public void testReceiveRollbackWithPrefetchOfOne() throws Exception {
214         setPrefetchToOne();
215         testReceiveRollback();
216     }
217     
218     /*
219      * see http://jira.codehaus.org/browse/AMQ-143 
220      * 
221      */ 
222     public void testCloseConsumerBeforeCommit() throws Exception {
223 
224         TextMessage[] outbound = new TextMessage[]{
225             session.createTextMessage("First Message"),
226             session.createTextMessage("Second Message")
227         };
228 
229         // lets consume any outstanding messages from previous test runs
230         while (consumer.receive(1000) != null) {
231         }
232         session.commit();
233 
234         producer.send(outbound[0]);
235         producer.send(outbound[1]);
236         session.commit();
237 
238         System.out.println("Sent 0: " + outbound[0]);
239         System.out.println("Sent 1: " + outbound[1]);
240 
241         ArrayList messages = new ArrayList();
242         TextMessage message = (TextMessage) consumer.receive(1000);
243         assertEquals(outbound[0].getText(), message.getText());        
244         // Close the consumer before the commit.  This should not cause the received message
245         // to rollback.
246         consumer.close();
247         session.commit();
248 
249         // Create a new consumer
250         consumer = resourceProvider.createConsumer(session, destination);
251         System.out.println("Created consumer: " + consumer);
252 
253         message = (TextMessage) consumer.receive(1000);
254         assertEquals(outbound[1].getText(), message.getText());
255         session.commit();
256     }
257     
258     protected abstract JmsResourceProvider getJmsResourceProvider();
259 
260     protected void setUp() throws Exception {
261         super.setUp();
262 
263         resourceProvider = getJmsResourceProvider();
264         topic = resourceProvider.isTopic();
265         
266         // We will be using transacted sessions.
267         resourceProvider.setTransacted(true);
268 
269         connectionFactory = resourceProvider.createConnectionFactory();
270         reconnect();
271     }
272 
273     /**
274      * @throws JMSException
275      */
276     protected void reconnect() throws JMSException {
277         
278         Connection t = resourceProvider.createConnection(connectionFactory);
279         if( connection != null ) {
280             // Close the previous connection.
281             connection.close();
282         }
283         connection = t;
284         
285         session = resourceProvider.createSession(connection);
286         destination = resourceProvider.createDestination(session, getSubject());
287         producer = resourceProvider.createProducer(session, destination);
288         consumer = resourceProvider.createConsumer(session, destination);
289         connection.start();
290     }
291 
292     protected void tearDown() throws Exception {
293         //System.out.println("Test Done.  Stats");
294         //((ActiveMQConnectionFactory) connectionFactory).getFactoryStats().dump(new IndentPrinter());
295         System.out.println("Closing down connection");
296 
297         session.close();
298         connection.close();
299         System.out.println("Connection closed.");
300     }
301 
302     protected void setPrefetchToOne() {
303         ActiveMQPrefetchPolicy prefetchPolicy = ((ActiveMQConnection) connection).getPrefetchPolicy();
304         prefetchPolicy.setQueuePrefetch(1);
305         prefetchPolicy.setTopicPrefetch(1);
306         prefetchPolicy.setDurableTopicPrefetch(1);
307     }
308 }