Source code: org/activemq/test/JmsXATransactionTestSupport.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.test;
19
20 import org.activemq.ActiveMQConnectionFactory;
21 import org.activemq.TestSupport;
22 import org.activemq.util.IndentPrinter;
23
24 import javax.jms.Connection;
25 import javax.jms.ConnectionFactory;
26 import javax.jms.Destination;
27 import javax.jms.JMSException;
28 import javax.jms.Message;
29 import javax.jms.MessageConsumer;
30 import javax.jms.MessageProducer;
31 import javax.jms.Session;
32 import javax.jms.XASession;
33 import javax.transaction.xa.XAResource;
34 import java.util.ArrayList;
35
36 /**
37 * @version $Revision: 1.1.1.1 $
38 */
39 public abstract class JmsXATransactionTestSupport extends TestSupport {
40
41 static boolean firstRun;
42
43 protected ConnectionFactory connectionFactory;
44 protected Connection connection;
45 protected Session session;
46 protected MessageConsumer consumer;
47 protected MessageProducer producer;
48 private JmsResourceProvider provider;
49 private MessageProducer producer2;
50 private Destination destination;
51
52 public JmsXATransactionTestSupport() {
53 super();
54 }
55
56 public JmsXATransactionTestSupport(String name) {
57 super(name);
58 }
59
60 public void testSendOutsideXATransaction() throws Exception {
61 try {
62 producer.send(session.createTextMessage("First Message"));
63 fail("Using an XA session outside of a transaction should throw an exception.");
64 }
65 catch (JMSException e) {
66 // normal
67 }
68 }
69
70
71 public void testSendRollback() throws Exception {
72
73 XAResource resource = ((XASession) session).getXAResource();
74 Message[] outbound = new Message[]{
75 session.createTextMessage("First Message"),
76 session.createTextMessage("Second Message")
77 };
78
79
80 XidStub xid1 = new XidStub(new byte[]{1, 2, 3, 4, 5});
81 resource.start(xid1, XAResource.TMNOFLAGS);
82 producer.send(outbound[0]);
83 resource.end(xid1, XAResource.TMSUCCESS);
84 resource.commit(xid1, true);
85
86 XidStub xid2 = new XidStub(new byte[]{2, 2, 3, 4, 5});
87 resource.start(xid2, XAResource.TMNOFLAGS);
88 producer.send(session.createTextMessage("I'm going to get rolled back."));
89 resource.end(xid2, XAResource.TMSUCCESS);
90 resource.rollback(xid2);
91
92 XidStub xid3 = new XidStub(new byte[]{3, 2, 3, 4, 5});
93 resource.start(xid3, XAResource.TMNOFLAGS);
94 producer.send(outbound[1]);
95 resource.end(xid3, XAResource.TMSUCCESS);
96 if (resource.prepare(xid3) == XAResource.XA_OK) {
97 resource.commit(xid3, false);
98 }
99
100 ArrayList messages = new ArrayList();
101 XidStub xid4 = new XidStub(new byte[]{4, 2, 3, 4, 5});
102 resource.start(xid4, XAResource.TMNOFLAGS);
103 messages.add(consumer.receive(1000));
104 messages.add(consumer.receive(1000));
105 resource.end(xid4, XAResource.TMSUCCESS);
106 resource.commit(xid4, true);
107
108 Message inbound[] = new Message[messages.size()];
109 messages.toArray(inbound);
110
111 assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
112 }
113
114 public void testSendPrepareRecoverRollback() throws Exception {
115
116 XAResource resource = ((XASession) session).getXAResource();
117 Message[] outbound = new Message[]{
118 session.createTextMessage("First Message"),
119 session.createTextMessage("Second Message")
120 };
121
122 XidStub xid2 = new XidStub(new byte[]{1, 2, 3, 4, 6});
123 resource.start(xid2, XAResource.TMNOFLAGS);
124 producer.send(session.createTextMessage("I'm going to get rolled back."));
125 resource.end(xid2, XAResource.TMSUCCESS);
126 assertEquals(XAResource.XA_OK, resource.prepare(xid2));
127
128
129 XidStub xid3 = new XidStub(new byte[]{2, 2, 3, 4, 6});
130 resource.start(xid3, XAResource.TMNOFLAGS);
131 producer.send(outbound[1]);
132 resource.end(xid3, XAResource.TMSUCCESS);
133
134 assertEquals(XAResource.XA_OK, resource.prepare(xid3));
135
136 restart();
137
138 resource = ((XASession) session).getXAResource();
139
140 // lets commit a transaction
141 resource.commit(xid3, false);
142
143 // lets try rollback a transaction
144 resource.rollback(xid2);
145
146 }
147
148 public void testXAWith2Sessions() throws Exception {
149
150 Session session2 = provider.createSession(connection);
151 System.out.println("Created session: " + session2);
152 producer2 = provider.createProducer(session2, destination);
153 System.out.println("Created producer: " + producer2);
154
155 XAResource resource1 = ((XASession) session).getXAResource();
156 XAResource resource2 = ((XASession) session2).getXAResource();
157 Message[] outbound = new Message[]{
158 session.createTextMessage("First Message!"),
159 session.createTextMessage("Second Message!")
160 };
161
162 XidStub xid1 = new XidStub(new byte[]{1, 2, 3, 4, 5});
163 resource1.start(xid1, XAResource.TMNOFLAGS);
164 resource2.start(xid1, XAResource.TMNOFLAGS);
165 producer.send(outbound[0]);
166 producer2.send(outbound[1]);
167 resource1.end(xid1, XAResource.TMSUCCESS);
168 resource2.end(xid1, XAResource.TMSUCCESS);
169
170 resource1.commit(xid1, true);
171
172
173 ArrayList messages = new ArrayList();
174 XidStub xid4 = new XidStub(new byte[]{4, 2, 3, 4, 5});
175 resource1.start(xid4, XAResource.TMNOFLAGS);
176 messages.add(consumer.receive(1000));
177 messages.add(consumer.receive(1000));
178 resource1.end(xid4, XAResource.TMSUCCESS);
179 resource1.commit(xid4, true);
180
181 Message inbound[] = new Message[messages.size()];
182 messages.toArray(inbound);
183
184 assertTextMessagesEqual("Failed.", outbound, inbound);
185 }
186
187 abstract protected JmsResourceProvider getJmsResourceProvider();
188
189 protected void setUp() throws Exception {
190 if( !firstRun ) {
191 TestSupport.removeMessageStore();
192 firstRun=true;
193 }
194 super.setUp();
195 openConnection();
196 }
197
198 protected void restart() throws JMSException {
199 try {
200 closeConnection();
201 openConnection();
202 } catch (JMSException e) {
203 e.printStackTrace();
204 throw e;
205 }
206 }
207
208 protected void openConnection() throws JMSException {
209 provider = getJmsResourceProvider();
210
211 connectionFactory = provider.createConnectionFactory();
212 connection = provider.createConnection(connectionFactory);
213 System.out.println("Created connection: " + connection);
214 session = provider.createSession(connection);
215 System.out.println("Created session: " + session);
216 destination = provider.createDestination(session, "QUEUE."+getName()+"."+System.currentTimeMillis());
217 System.out.println("Created destination: " + destination + " of type: " + destination.getClass());
218 producer = provider.createProducer(session, destination);
219 System.out.println("Created producer: " + producer);
220 consumer = provider.createConsumer(session, destination);
221 System.out.println("Created consumer: " + consumer);
222 connection.start();
223 }
224
225 protected void tearDown() throws Exception {
226 System.out.println("Test Done. Stats");
227 ((ActiveMQConnectionFactory) connectionFactory).getFactoryStats().dump(new IndentPrinter());
228 closeConnection();
229 }
230
231 private void closeConnection() throws JMSException {
232 System.out.println("Closing down connection");
233
234 connection.stop();
235 connection.close();
236 System.out.println("Connection closed.");
237 }
238
239 }