Source code: org/activemq/usecases/DeadLetterTest.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.usecases;
19
20 import javax.jms.Connection;
21 import javax.jms.DeliveryMode;
22 import javax.jms.Destination;
23 import javax.jms.Message;
24 import javax.jms.MessageConsumer;
25 import javax.jms.MessageProducer;
26 import javax.jms.Session;
27 import javax.jms.Topic;
28 import org.activemq.TestSupport;
29 import org.activemq.message.ActiveMQDestination;
30 import org.activemq.service.*;
31
32
33 /**
34 * @version $Revision: 1.1.1.1 $
35 */
36 public class DeadLetterTest extends TestSupport {
37
38 private static final int MESSAGE_COUNT = 10;
39 private static final long TIME_TO_LIVE = 250;
40 private Connection connection;
41 private Session session;
42 private MessageConsumer consumer;
43 private MessageProducer producer;
44 private ActiveMQDestination destination;
45 private int deliveryMode = DeliveryMode.PERSISTENT;
46 private boolean durableSubscriber = false;
47
48 protected void setUp() throws Exception{
49 super.setUp();
50 connection = createConnection();
51 connection.setClientID(toString());
52
53 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
54 }
55
56 protected void tearDown() throws Exception {
57 connection.close();
58 }
59
60 protected void doTest() throws Exception{
61 destination = (ActiveMQDestination) createDestination(getClass().getName());
62 producer = session.createProducer(destination);
63 producer.setDeliveryMode(deliveryMode);
64 producer.setTimeToLive(TIME_TO_LIVE);
65
66 DeadLetterPolicy dlq = new DeadLetterPolicy();
67 String dlqName = dlq.getDeadLetterNameFromDestination(destination);
68 Destination dlqDestination = session.createQueue(dlqName);
69 MessageConsumer dlqConsumer = session.createConsumer(dlqDestination);
70
71 if (durableSubscriber){
72 consumer = session.createDurableSubscriber((Topic)destination, destination.toString());
73 }else {
74 consumer = session.createConsumer(destination);
75 }
76
77 for (int i =0; i < MESSAGE_COUNT;i++){
78 Message message = session.createTextMessage("msg: " + i);
79 producer.send(message);
80 }
81 Thread.sleep(TIME_TO_LIVE * 2);
82 connection.start();
83
84
85 for (int i =0; i < MESSAGE_COUNT; i++){
86 Message msg = consumer.receive(10);
87 assertNull("Should be null message", msg);
88 }
89
90 for (int i =0; i < MESSAGE_COUNT; i++){
91 Message msg = dlqConsumer.receive(1000);
92 assertNotNull("Should be message", msg);
93 }
94
95
96 }
97
98
99
100 public void testDurableTopicMessageExpiration() throws Exception{
101 super.topic = true;
102 deliveryMode = DeliveryMode.PERSISTENT;
103 durableSubscriber = true;
104 doTest();
105 }
106
107 public void testTransientQueueMessageExpiration() throws Exception{
108 super.topic = false;
109 deliveryMode = DeliveryMode.NON_PERSISTENT;
110 durableSubscriber = false;
111 doTest();
112 }
113
114 public void testDurableQueueMessageExpiration() throws Exception{
115 super.topic = false;
116 deliveryMode = DeliveryMode.PERSISTENT;
117 durableSubscriber = false;
118 doTest();
119 }
120
121 }