Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/usecases/DeadLetterTest.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq.usecases;
19  
20  import javax.jms.Connection;
21  import javax.jms.DeliveryMode;
22  import javax.jms.Destination;
23  import javax.jms.Message;
24  import javax.jms.MessageConsumer;
25  import javax.jms.MessageProducer;
26  import javax.jms.Session;
27  import javax.jms.Topic;
28  import org.activemq.TestSupport;
29  import org.activemq.message.ActiveMQDestination;
30  import org.activemq.service.*;
31  
32  
33  /**
34   * @version $Revision: 1.1.1.1 $
35   */
36  public class DeadLetterTest extends TestSupport {
37      
38      private static final int MESSAGE_COUNT = 10;
39      private static final long TIME_TO_LIVE = 250;
40      private Connection connection;
41      private Session session;
42      private MessageConsumer consumer;
43      private MessageProducer producer;
44      private ActiveMQDestination destination;
45      private int deliveryMode = DeliveryMode.PERSISTENT;
46      private boolean durableSubscriber = false;
47     
48      protected void setUp() throws Exception{
49          super.setUp();
50          connection = createConnection();
51          connection.setClientID(toString());
52          
53          session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
54      }
55      
56      protected void tearDown() throws Exception {
57          connection.close();
58      }
59      
60      protected void doTest() throws Exception{
61          destination = (ActiveMQDestination) createDestination(getClass().getName());
62          producer = session.createProducer(destination);
63          producer.setDeliveryMode(deliveryMode);
64          producer.setTimeToLive(TIME_TO_LIVE);
65  
66          DeadLetterPolicy dlq = new DeadLetterPolicy();
67          String dlqName = dlq.getDeadLetterNameFromDestination(destination);
68          Destination dlqDestination = session.createQueue(dlqName);
69          MessageConsumer dlqConsumer = session.createConsumer(dlqDestination); 
70          
71          if (durableSubscriber){
72              consumer = session.createDurableSubscriber((Topic)destination, destination.toString());
73          }else {
74              consumer = session.createConsumer(destination);
75          }
76          
77          for (int i =0; i < MESSAGE_COUNT;i++){
78              Message message = session.createTextMessage("msg: " + i);
79              producer.send(message);
80          }
81          Thread.sleep(TIME_TO_LIVE * 2);
82          connection.start();
83          
84          
85          for (int i =0; i < MESSAGE_COUNT; i++){
86              Message msg = consumer.receive(10);
87              assertNull("Should be null message", msg);
88          }
89          
90          for (int i =0; i < MESSAGE_COUNT; i++){
91              Message msg = dlqConsumer.receive(1000);
92              assertNotNull("Should be  message", msg);
93          }
94          
95          
96      }
97      
98     
99      
100     public void testDurableTopicMessageExpiration() throws Exception{
101         super.topic = true;
102         deliveryMode = DeliveryMode.PERSISTENT;
103         durableSubscriber = true;
104         doTest();
105     }
106     
107     public void testTransientQueueMessageExpiration() throws Exception{
108         super.topic = false;
109         deliveryMode = DeliveryMode.NON_PERSISTENT;
110         durableSubscriber = false;
111         doTest();
112     }
113     
114     public void testDurableQueueMessageExpiration() throws Exception{
115         super.topic = false;
116         deliveryMode = DeliveryMode.PERSISTENT;
117         durableSubscriber = false;
118         doTest();
119     }
120    
121 }