Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/test/JmsXATransactionTestSupport.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq.test;
19  
20  import org.activemq.ActiveMQConnectionFactory;
21  import org.activemq.TestSupport;
22  import org.activemq.util.IndentPrinter;
23  
24  import javax.jms.Connection;
25  import javax.jms.ConnectionFactory;
26  import javax.jms.Destination;
27  import javax.jms.JMSException;
28  import javax.jms.Message;
29  import javax.jms.MessageConsumer;
30  import javax.jms.MessageProducer;
31  import javax.jms.Session;
32  import javax.jms.XASession;
33  import javax.transaction.xa.XAResource;
34  import java.util.ArrayList;
35  
36  /**
37   * @version $Revision: 1.1.1.1 $
38   */
39  public abstract class JmsXATransactionTestSupport extends TestSupport {
40  
41      static boolean firstRun;
42      
43      protected ConnectionFactory connectionFactory;
44      protected Connection connection;
45      protected Session session;
46      protected MessageConsumer consumer;
47      protected MessageProducer producer;
48      private JmsResourceProvider provider;
49      private MessageProducer producer2;
50      private Destination destination;
51  
52      public JmsXATransactionTestSupport() {
53          super();
54      }
55  
56      public JmsXATransactionTestSupport(String name) {
57          super(name);
58      }
59  
60      public void testSendOutsideXATransaction() throws Exception {
61          try {
62              producer.send(session.createTextMessage("First Message"));
63              fail("Using an XA session outside of a transaction should throw an exception.");
64          }
65          catch (JMSException e) {
66              // normal
67          }
68      }
69  
70  
71      public void testSendRollback() throws Exception {
72  
73          XAResource resource = ((XASession) session).getXAResource();
74          Message[] outbound = new Message[]{
75              session.createTextMessage("First Message"),
76              session.createTextMessage("Second Message")
77          };
78  
79  
80          XidStub xid1 = new XidStub(new byte[]{1, 2, 3, 4, 5});
81          resource.start(xid1, XAResource.TMNOFLAGS);
82          producer.send(outbound[0]);
83          resource.end(xid1, XAResource.TMSUCCESS);
84          resource.commit(xid1, true);
85  
86          XidStub xid2 = new XidStub(new byte[]{2, 2, 3, 4, 5});
87          resource.start(xid2, XAResource.TMNOFLAGS);
88          producer.send(session.createTextMessage("I'm going to get rolled back."));
89          resource.end(xid2, XAResource.TMSUCCESS);
90          resource.rollback(xid2);
91  
92          XidStub xid3 = new XidStub(new byte[]{3, 2, 3, 4, 5});
93          resource.start(xid3, XAResource.TMNOFLAGS);
94          producer.send(outbound[1]);
95          resource.end(xid3, XAResource.TMSUCCESS);
96          if (resource.prepare(xid3) == XAResource.XA_OK) {
97              resource.commit(xid3, false);
98          }
99  
100         ArrayList messages = new ArrayList();
101         XidStub xid4 = new XidStub(new byte[]{4, 2, 3, 4, 5});
102         resource.start(xid4, XAResource.TMNOFLAGS);
103         messages.add(consumer.receive(1000));
104         messages.add(consumer.receive(1000));
105         resource.end(xid4, XAResource.TMSUCCESS);
106         resource.commit(xid4, true);
107 
108         Message inbound[] = new Message[messages.size()];
109         messages.toArray(inbound);
110 
111         assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
112     }
113 
114     public void testSendPrepareRecoverRollback() throws Exception {
115 
116         XAResource resource = ((XASession) session).getXAResource();
117         Message[] outbound = new Message[]{
118             session.createTextMessage("First Message"),
119             session.createTextMessage("Second Message")
120         };
121 
122         XidStub xid2 = new XidStub(new byte[]{1, 2, 3, 4, 6});
123         resource.start(xid2, XAResource.TMNOFLAGS);
124         producer.send(session.createTextMessage("I'm going to get rolled back."));
125         resource.end(xid2, XAResource.TMSUCCESS);
126         assertEquals(XAResource.XA_OK, resource.prepare(xid2));
127 
128 
129         XidStub xid3 = new XidStub(new byte[]{2, 2, 3, 4, 6});
130         resource.start(xid3, XAResource.TMNOFLAGS);
131         producer.send(outbound[1]);
132         resource.end(xid3, XAResource.TMSUCCESS);
133 
134         assertEquals(XAResource.XA_OK, resource.prepare(xid3));
135 
136         restart();
137 
138         resource = ((XASession) session).getXAResource();
139 
140         // lets commit a transaction
141         resource.commit(xid3, false);
142 
143         // lets try rollback a transaction
144         resource.rollback(xid2);
145 
146     }
147 
148     public void testXAWith2Sessions() throws Exception {
149 
150         Session session2 = provider.createSession(connection);
151         System.out.println("Created session: " + session2);
152         producer2 = provider.createProducer(session2, destination);
153         System.out.println("Created producer: " + producer2);
154 
155         XAResource resource1 = ((XASession) session).getXAResource();
156         XAResource resource2 = ((XASession) session2).getXAResource();
157         Message[] outbound = new Message[]{
158             session.createTextMessage("First Message!"),
159             session.createTextMessage("Second Message!")
160         };
161 
162         XidStub xid1 = new XidStub(new byte[]{1, 2, 3, 4, 5});
163         resource1.start(xid1, XAResource.TMNOFLAGS);
164         resource2.start(xid1, XAResource.TMNOFLAGS);
165         producer.send(outbound[0]);
166         producer2.send(outbound[1]);
167         resource1.end(xid1, XAResource.TMSUCCESS);
168         resource2.end(xid1, XAResource.TMSUCCESS);
169         
170         resource1.commit(xid1, true);
171 
172 
173         ArrayList messages = new ArrayList();
174         XidStub xid4 = new XidStub(new byte[]{4, 2, 3, 4, 5});
175         resource1.start(xid4, XAResource.TMNOFLAGS);
176         messages.add(consumer.receive(1000));
177         messages.add(consumer.receive(1000));
178         resource1.end(xid4, XAResource.TMSUCCESS);
179         resource1.commit(xid4, true);
180 
181         Message inbound[] = new Message[messages.size()];
182         messages.toArray(inbound);
183 
184         assertTextMessagesEqual("Failed.", outbound, inbound);
185     }
186 
187     abstract protected JmsResourceProvider getJmsResourceProvider();
188 
189     protected void setUp() throws Exception {
190         if( !firstRun ) {
191             TestSupport.removeMessageStore();
192             firstRun=true;
193         }
194         super.setUp();
195         openConnection();
196     }
197 
198     protected void restart() throws JMSException {
199         try {
200             closeConnection();
201             openConnection();
202         } catch (JMSException e) {
203             e.printStackTrace();
204             throw e;
205         }
206     }
207 
208     protected void openConnection() throws JMSException {
209         provider = getJmsResourceProvider();
210 
211         connectionFactory = provider.createConnectionFactory();
212         connection = provider.createConnection(connectionFactory);
213         System.out.println("Created connection: " + connection);
214         session = provider.createSession(connection);
215         System.out.println("Created session: " + session);
216         destination = provider.createDestination(session, "QUEUE."+getName()+"."+System.currentTimeMillis());
217         System.out.println("Created destination: " + destination + " of type: " + destination.getClass());
218         producer = provider.createProducer(session, destination);
219         System.out.println("Created producer: " + producer);
220         consumer = provider.createConsumer(session, destination);
221         System.out.println("Created consumer: " + consumer);
222         connection.start();
223     }
224 
225     protected void tearDown() throws Exception {
226         System.out.println("Test Done.  Stats");
227         ((ActiveMQConnectionFactory) connectionFactory).getFactoryStats().dump(new IndentPrinter());
228         closeConnection();
229     }
230 
231     private void closeConnection() throws JMSException {
232         System.out.println("Closing down connection");
233 
234         connection.stop();
235         connection.close();
236         System.out.println("Connection closed.");
237     }
238 
239 }