Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/JmsRedeliveredTest.java


1   /** 
2    * 
3    * Copyright 2004 Hiram Chirino
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq;
19  
20  import javax.jms.Connection;
21  import javax.jms.DeliveryMode;
22  import javax.jms.Destination;
23  import javax.jms.JMSException;
24  import javax.jms.Message;
25  import javax.jms.MessageConsumer;
26  import javax.jms.MessageProducer;
27  import javax.jms.Queue;
28  import javax.jms.Session;
29  import javax.jms.TextMessage;
30  import javax.jms.Topic;
31  
32  import junit.framework.Test;
33  import junit.framework.TestCase;
34  import junit.framework.TestSuite;
35  
36  /**
37   * @version $Revision: 1.1.1.1 $
38   */
39  public class JmsRedeliveredTest extends TestCase {
40  
41      private Connection connection;
42      
43      protected void setUp() throws Exception {
44          connection = createConnection();
45      }
46  
47      /**
48       * @see junit.framework.TestCase#tearDown()
49       */
50      protected void tearDown() throws Exception {
51          if (connection != null) {
52              connection.close();
53              connection = null;
54          }
55      }
56      
57      protected Connection createConnection() throws JMSException{
58          ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
59          return factory.createConnection();
60      }
61  
62      /*
63       * TODO: Currently fails. 
64       */
65      public void testQueueSessionCloseMarksMessageRedelivered() throws JMSException {
66  
67          connection.start();
68  
69          Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
70          Queue queue = session.createQueue("queue-"+getName());
71          MessageProducer producer = createProducer(session, queue);
72          producer.send(createTextMessage(session));
73  
74          // Consume the message...
75          MessageConsumer consumer = session.createConsumer(queue);
76          Message msg = consumer.receive(1000);
77          assertNotNull(msg);        
78          assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
79          // Don't ack the message.
80          
81          // Reset the session.  This should cause the Unacked message to be redelivered.
82          session.close();
83          session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
84                  
85          // Attempt to Consume the message...
86          consumer = session.createConsumer(queue);
87          msg = consumer.receive(2000);
88          assertNotNull(msg);        
89          assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
90          msg.acknowledge();
91          
92          session.close();
93      }
94  
95      
96      public void testQueueRecoverMarksMessageRedelivered() throws JMSException {
97  
98          connection.start();
99  
100         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
101         Queue queue = session.createQueue("queue-"+getName());
102         MessageProducer producer = createProducer(session, queue);
103         producer.send(createTextMessage(session));
104 
105         // Consume the message...
106         MessageConsumer consumer = session.createConsumer(queue);
107         Message msg = consumer.receive(1000);
108         assertNotNull(msg);        
109         assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
110         // Don't ack the message.
111         
112         // Reset the session.  This should cause the Unacked message to be redelivered.
113         session.recover();
114                 
115         // Attempt to Consume the message...
116         msg = consumer.receive(2000);
117         assertNotNull(msg);        
118         assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
119         msg.acknowledge();
120         
121         session.close();
122     }
123     
124     public void testQueueRollbackMarksMessageRedelivered() throws JMSException {
125         connection.start();
126 
127         Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
128         Queue queue = session.createQueue("queue-"+getName());
129         MessageProducer producer = createProducer(session, queue);
130         producer.send(createTextMessage(session));
131         session.commit();
132         
133         // Get the message... Should not be redelivered.
134         MessageConsumer consumer = session.createConsumer(queue);
135         Message msg = consumer.receive(1000);
136         assertNotNull(msg);        
137         assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
138         
139         // Rollback.. should cause redelivery.
140         session.rollback();
141                         
142         // Attempt to Consume the message...
143         msg = consumer.receive(2000);
144         assertNotNull(msg);        
145         assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
146         
147         
148         session.commit();
149         session.close();
150     }
151     
152     public void testDurableTopicSessionCloseMarksMessageRedelivered() throws JMSException {
153 
154         connection.setClientID(getName());
155         connection.start();
156 
157         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
158         Topic topic = session.createTopic("topic-"+getName());
159         MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");
160 
161         MessageProducer producer = createProducer(session, topic);
162         producer.send(createTextMessage(session));
163 
164         // Consume the message...
165         Message msg = consumer.receive(1000);
166         assertNotNull(msg);        
167         assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
168         // Don't ack the message.
169         
170         // Reset the session.  This should cause the Unacked message to be redelivered.
171         session.close();
172         session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
173                 
174         // Attempt to Consume the message...
175         consumer = session.createDurableSubscriber(topic, "sub1");
176         msg = consumer.receive(2000);
177         assertNotNull(msg);        
178         assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
179         msg.acknowledge();
180         
181         session.close();
182     }
183     
184     public void testDurableTopicRecoverMarksMessageRedelivered() throws JMSException {
185 
186         connection.setClientID(getName());
187         connection.start();
188 
189         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
190         Topic topic = session.createTopic("topic-"+getName());
191         MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");
192 
193         MessageProducer producer = createProducer(session, topic);
194         producer.send(createTextMessage(session));
195 
196         // Consume the message...
197         Message msg = consumer.receive(1000);
198         assertNotNull(msg);        
199         assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
200         // Don't ack the message.
201         
202         // Reset the session.  This should cause the Unacked message to be redelivered.
203         session.recover();
204                 
205         // Attempt to Consume the message...
206         msg = consumer.receive(2000);
207         assertNotNull(msg);        
208         assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
209         msg.acknowledge();
210         
211         session.close();
212     }
213     
214     public void testDurableTopicRollbackMarksMessageRedelivered() throws JMSException {
215         connection.setClientID(getName());
216         connection.start();
217 
218         Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
219         Topic topic = session.createTopic("topic-"+getName());
220         MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");
221 
222         MessageProducer producer = createProducer(session, topic);
223         producer.send(createTextMessage(session));
224         session.commit();
225         
226         // Get the message... Should not be redelivered.
227         Message msg = consumer.receive(1000);
228         assertNotNull(msg);        
229         assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
230         
231         // Rollback.. should cause redelivery.
232         session.rollback();
233                         
234         // Attempt to Consume the message...
235         msg = consumer.receive(2000);
236         assertNotNull(msg);        
237         assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
238         
239         
240         session.commit();
241         session.close();
242     }
243      
244     public void testTopicRecoverMarksMessageRedelivered() throws JMSException {
245 
246         connection.setClientID(getName());
247         connection.start();
248 
249         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
250         Topic topic = session.createTopic("topic-"+getName());
251         MessageConsumer consumer = session.createConsumer(topic);
252 
253         MessageProducer producer = createProducer(session, topic);
254         producer.send(createTextMessage(session));
255 
256         // Consume the message...
257         Message msg = consumer.receive(1000);
258         assertNotNull(msg);        
259         assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
260         // Don't ack the message.
261         
262         // Reset the session.  This should cause the Unacked message to be redelivered.
263         session.recover();
264                 
265         // Attempt to Consume the message...
266         msg = consumer.receive(2000);
267         assertNotNull(msg);        
268         assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
269         msg.acknowledge();
270         
271         session.close();
272     }
273     
274     public void testTopicRollbackMarksMessageRedelivered() throws JMSException {
275         connection.setClientID(getName());
276         connection.start();
277 
278         Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
279         Topic topic = session.createTopic("topic-"+getName());
280         MessageConsumer consumer = session.createConsumer(topic);
281 
282         MessageProducer producer = createProducer(session, topic);
283         producer.send(createTextMessage(session));
284         session.commit();
285         
286         // Get the message... Should not be redelivered.
287         Message msg = consumer.receive(1000);
288         assertNotNull(msg);        
289         assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
290         
291         // Rollback.. should cause redelivery.
292         session.rollback();
293                         
294         // Attempt to Consume the message...
295         msg = consumer.receive(2000);
296         assertNotNull(msg);        
297         assertTrue("Message should be redelivered.", msg.getJMSRedelivered());        
298         
299         session.commit();
300         session.close();
301     }
302 
303     private TextMessage createTextMessage(Session session) throws JMSException {
304         return session.createTextMessage("Hello");
305     }
306    
307     private MessageProducer createProducer(Session session, Destination queue) throws JMSException {
308          MessageProducer producer = session.createProducer(queue);
309          producer.setDeliveryMode(getDeliveryMode());
310          return producer;
311     }
312     
313     protected int getDeliveryMode() {
314         return DeliveryMode.PERSISTENT;
315     }
316 
317     static final public class PersistentCase extends JmsRedeliveredTest {
318         protected int getDeliveryMode() {
319             return DeliveryMode.PERSISTENT;
320         }
321     }
322 
323     static final public class TransientCase extends JmsRedeliveredTest {
324         protected int getDeliveryMode() {
325             return DeliveryMode.NON_PERSISTENT;
326         }
327     }
328     
329     public static Test suite() { 
330         TestSuite suite= new TestSuite(); 
331         suite.addTestSuite(PersistentCase.class); 
332         suite.addTestSuite(TransientCase.class); 
333         return suite;
334     }
335 }