Source code: org/activemq/JmsRedeliveredTest.java
1 /**
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq;
19
20 import javax.jms.Connection;
21 import javax.jms.DeliveryMode;
22 import javax.jms.Destination;
23 import javax.jms.JMSException;
24 import javax.jms.Message;
25 import javax.jms.MessageConsumer;
26 import javax.jms.MessageProducer;
27 import javax.jms.Queue;
28 import javax.jms.Session;
29 import javax.jms.TextMessage;
30 import javax.jms.Topic;
31
32 import junit.framework.Test;
33 import junit.framework.TestCase;
34 import junit.framework.TestSuite;
35
36 /**
37 * @version $Revision: 1.1.1.1 $
38 */
39 public class JmsRedeliveredTest extends TestCase {
40
41 private Connection connection;
42
43 protected void setUp() throws Exception {
44 connection = createConnection();
45 }
46
47 /**
48 * @see junit.framework.TestCase#tearDown()
49 */
50 protected void tearDown() throws Exception {
51 if (connection != null) {
52 connection.close();
53 connection = null;
54 }
55 }
56
57 protected Connection createConnection() throws JMSException{
58 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
59 return factory.createConnection();
60 }
61
62 /*
63 * TODO: Currently fails.
64 */
65 public void testQueueSessionCloseMarksMessageRedelivered() throws JMSException {
66
67 connection.start();
68
69 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
70 Queue queue = session.createQueue("queue-"+getName());
71 MessageProducer producer = createProducer(session, queue);
72 producer.send(createTextMessage(session));
73
74 // Consume the message...
75 MessageConsumer consumer = session.createConsumer(queue);
76 Message msg = consumer.receive(1000);
77 assertNotNull(msg);
78 assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
79 // Don't ack the message.
80
81 // Reset the session. This should cause the Unacked message to be redelivered.
82 session.close();
83 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
84
85 // Attempt to Consume the message...
86 consumer = session.createConsumer(queue);
87 msg = consumer.receive(2000);
88 assertNotNull(msg);
89 assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
90 msg.acknowledge();
91
92 session.close();
93 }
94
95
96 public void testQueueRecoverMarksMessageRedelivered() throws JMSException {
97
98 connection.start();
99
100 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
101 Queue queue = session.createQueue("queue-"+getName());
102 MessageProducer producer = createProducer(session, queue);
103 producer.send(createTextMessage(session));
104
105 // Consume the message...
106 MessageConsumer consumer = session.createConsumer(queue);
107 Message msg = consumer.receive(1000);
108 assertNotNull(msg);
109 assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
110 // Don't ack the message.
111
112 // Reset the session. This should cause the Unacked message to be redelivered.
113 session.recover();
114
115 // Attempt to Consume the message...
116 msg = consumer.receive(2000);
117 assertNotNull(msg);
118 assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
119 msg.acknowledge();
120
121 session.close();
122 }
123
124 public void testQueueRollbackMarksMessageRedelivered() throws JMSException {
125 connection.start();
126
127 Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
128 Queue queue = session.createQueue("queue-"+getName());
129 MessageProducer producer = createProducer(session, queue);
130 producer.send(createTextMessage(session));
131 session.commit();
132
133 // Get the message... Should not be redelivered.
134 MessageConsumer consumer = session.createConsumer(queue);
135 Message msg = consumer.receive(1000);
136 assertNotNull(msg);
137 assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
138
139 // Rollback.. should cause redelivery.
140 session.rollback();
141
142 // Attempt to Consume the message...
143 msg = consumer.receive(2000);
144 assertNotNull(msg);
145 assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
146
147
148 session.commit();
149 session.close();
150 }
151
152 public void testDurableTopicSessionCloseMarksMessageRedelivered() throws JMSException {
153
154 connection.setClientID(getName());
155 connection.start();
156
157 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
158 Topic topic = session.createTopic("topic-"+getName());
159 MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");
160
161 MessageProducer producer = createProducer(session, topic);
162 producer.send(createTextMessage(session));
163
164 // Consume the message...
165 Message msg = consumer.receive(1000);
166 assertNotNull(msg);
167 assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
168 // Don't ack the message.
169
170 // Reset the session. This should cause the Unacked message to be redelivered.
171 session.close();
172 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
173
174 // Attempt to Consume the message...
175 consumer = session.createDurableSubscriber(topic, "sub1");
176 msg = consumer.receive(2000);
177 assertNotNull(msg);
178 assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
179 msg.acknowledge();
180
181 session.close();
182 }
183
184 public void testDurableTopicRecoverMarksMessageRedelivered() throws JMSException {
185
186 connection.setClientID(getName());
187 connection.start();
188
189 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
190 Topic topic = session.createTopic("topic-"+getName());
191 MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");
192
193 MessageProducer producer = createProducer(session, topic);
194 producer.send(createTextMessage(session));
195
196 // Consume the message...
197 Message msg = consumer.receive(1000);
198 assertNotNull(msg);
199 assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
200 // Don't ack the message.
201
202 // Reset the session. This should cause the Unacked message to be redelivered.
203 session.recover();
204
205 // Attempt to Consume the message...
206 msg = consumer.receive(2000);
207 assertNotNull(msg);
208 assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
209 msg.acknowledge();
210
211 session.close();
212 }
213
214 public void testDurableTopicRollbackMarksMessageRedelivered() throws JMSException {
215 connection.setClientID(getName());
216 connection.start();
217
218 Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
219 Topic topic = session.createTopic("topic-"+getName());
220 MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");
221
222 MessageProducer producer = createProducer(session, topic);
223 producer.send(createTextMessage(session));
224 session.commit();
225
226 // Get the message... Should not be redelivered.
227 Message msg = consumer.receive(1000);
228 assertNotNull(msg);
229 assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
230
231 // Rollback.. should cause redelivery.
232 session.rollback();
233
234 // Attempt to Consume the message...
235 msg = consumer.receive(2000);
236 assertNotNull(msg);
237 assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
238
239
240 session.commit();
241 session.close();
242 }
243
244 public void testTopicRecoverMarksMessageRedelivered() throws JMSException {
245
246 connection.setClientID(getName());
247 connection.start();
248
249 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
250 Topic topic = session.createTopic("topic-"+getName());
251 MessageConsumer consumer = session.createConsumer(topic);
252
253 MessageProducer producer = createProducer(session, topic);
254 producer.send(createTextMessage(session));
255
256 // Consume the message...
257 Message msg = consumer.receive(1000);
258 assertNotNull(msg);
259 assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
260 // Don't ack the message.
261
262 // Reset the session. This should cause the Unacked message to be redelivered.
263 session.recover();
264
265 // Attempt to Consume the message...
266 msg = consumer.receive(2000);
267 assertNotNull(msg);
268 assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
269 msg.acknowledge();
270
271 session.close();
272 }
273
274 public void testTopicRollbackMarksMessageRedelivered() throws JMSException {
275 connection.setClientID(getName());
276 connection.start();
277
278 Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
279 Topic topic = session.createTopic("topic-"+getName());
280 MessageConsumer consumer = session.createConsumer(topic);
281
282 MessageProducer producer = createProducer(session, topic);
283 producer.send(createTextMessage(session));
284 session.commit();
285
286 // Get the message... Should not be redelivered.
287 Message msg = consumer.receive(1000);
288 assertNotNull(msg);
289 assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
290
291 // Rollback.. should cause redelivery.
292 session.rollback();
293
294 // Attempt to Consume the message...
295 msg = consumer.receive(2000);
296 assertNotNull(msg);
297 assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
298
299 session.commit();
300 session.close();
301 }
302
303 private TextMessage createTextMessage(Session session) throws JMSException {
304 return session.createTextMessage("Hello");
305 }
306
307 private MessageProducer createProducer(Session session, Destination queue) throws JMSException {
308 MessageProducer producer = session.createProducer(queue);
309 producer.setDeliveryMode(getDeliveryMode());
310 return producer;
311 }
312
313 protected int getDeliveryMode() {
314 return DeliveryMode.PERSISTENT;
315 }
316
317 static final public class PersistentCase extends JmsRedeliveredTest {
318 protected int getDeliveryMode() {
319 return DeliveryMode.PERSISTENT;
320 }
321 }
322
323 static final public class TransientCase extends JmsRedeliveredTest {
324 protected int getDeliveryMode() {
325 return DeliveryMode.NON_PERSISTENT;
326 }
327 }
328
329 public static Test suite() {
330 TestSuite suite= new TestSuite();
331 suite.addTestSuite(PersistentCase.class);
332 suite.addTestSuite(TransientCase.class);
333 return suite;
334 }
335 }