Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/JmsQueueTransactionTest.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq;
19  
20  import java.util.ArrayList;
21  import java.util.Enumeration;
22  
23  import javax.jms.Message;
24  import javax.jms.MessageConsumer;
25  import javax.jms.MessageProducer;
26  import javax.jms.Queue;
27  import javax.jms.QueueBrowser;
28  import javax.jms.Session;
29  import javax.jms.TextMessage;
30  
31  import org.activemq.test.JmsResourceProvider;
32  
33  
34  /**
35   * @version $Revision: 1.1.1.1 $
36   */
37  public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
38    
39    /**
40     * @see org.activemq.JmsTransactionTestSupport#getJmsResourceProvider()
41     */
42    protected JmsResourceProvider getJmsResourceProvider() {
43          JmsResourceProvider p = new JmsResourceProvider();
44          p.setTopic(false); 
45          return p;
46    }    
47  
48      public void testReceiveTwoThenCloseConnection() throws Exception {
49  
50          Message[] outbound = new Message[]{
51              session.createTextMessage("First Message"),
52              session.createTextMessage("Second Message")
53          };
54  
55          // lets consume any outstanding messages from previous test runs
56          while (consumer.receive(1000) != null) {
57          }
58          session.commit();
59  
60          producer.send(outbound[0]);
61          producer.send(outbound[1]);
62          session.commit();
63  
64          System.out.println("Sent 0: " + outbound[0]);
65          System.out.println("Sent 1: " + outbound[1]);
66  
67          ArrayList messages = new ArrayList();
68          Message message = consumer.receive(1000);
69          assertEquals(outbound[0], message);
70  
71          message = consumer.receive(1000);
72          assertNotNull(message);
73          assertEquals(outbound[1], message);
74          
75          // Close and reopen connection.
76          reconnect();        
77          
78          // Consume again.. the previous message should
79          // get redelivered.
80          message = consumer.receive(5000);
81          assertNotNull("Should have re-received the first message again!", message);
82          messages.add(message);
83          assertEquals(outbound[0], message);
84  
85          message = consumer.receive(5000);
86          assertNotNull("Should have re-received the second message again!", message);
87          messages.add(message);
88          assertEquals(outbound[1], message);
89          session.commit();
90  
91          Message inbound[] = new Message[messages.size()];
92          messages.toArray(inbound);
93  
94          assertTextMessagesEqual("Rollback did not work", outbound, inbound);
95      }
96      
97      public void testSendReceiveInSeperateSessionTest() throws Exception {
98  
99          session.close();        
100         int batchCount = 10;
101 
102         for( int i=0; i < batchCount; i++) {
103             {
104                 Session session = resourceProvider.createSession(connection);
105                 MessageProducer producer = resourceProvider.createProducer(session, destination);
106                 //consumer = resourceProvider.createConsumer(session, destination);
107                 producer.send(session.createTextMessage("Test Message: "+i));                   
108                 session.commit();
109                 session.close();
110             }
111             {
112                 Session session = resourceProvider.createSession(connection);
113                 MessageConsumer consumer = resourceProvider.createConsumer(session, destination);
114 
115                 TextMessage message = (TextMessage) consumer.receive(1000*5);
116                 assertNotNull("Received only "+i+" messages in batch ", message);
117                 assertEquals("Test Message: "+i, message.getText());
118 
119                 session.commit();
120                 session.close();
121             }
122         }
123     }
124     
125     public void testReceiveBrowseReceive() throws Exception {
126 
127         Message[] outbound = new Message[] { session.createTextMessage("First Message"),
128                 session.createTextMessage("Second Message"), session.createTextMessage("Third Message") };
129 
130         // lets consume any outstanding messages from previous test runs
131         while (consumer.receive(1000) != null) {
132         }
133         session.commit();
134 
135         producer.send(outbound[0]);
136         producer.send(outbound[1]);
137         producer.send(outbound[2]);
138         session.commit();
139 
140         // Get the first.
141         assertEquals(outbound[0], consumer.receive(1000));
142         consumer.close();
143 
144         QueueBrowser browser = session.createBrowser((Queue) destination);
145         Enumeration enumeration = browser.getEnumeration();
146 
147         // browse the second
148         assertTrue("should have received the second message", enumeration.hasMoreElements());
149         assertEquals(outbound[1], (Message) enumeration.nextElement());
150 
151         // browse the third.
152         assertTrue("Should have received the third message", enumeration.hasMoreElements());
153         assertEquals(outbound[2], (Message) enumeration.nextElement());
154 
155         // There should be no more.
156         boolean tooMany = false;
157         while (enumeration.hasMoreElements()) {
158             System.out.println("Got extra message: " + ((TextMessage) enumeration.nextElement()).getText());
159             tooMany = true;
160         }
161         assertFalse(tooMany);
162         browser.close();
163 
164         // Re-open the consumer.
165         consumer = resourceProvider.createConsumer(session, destination);
166         // Receive the second.
167         assertEquals(outbound[1], consumer.receive(1000));
168         // Receive the third.
169         assertEquals(outbound[2], consumer.receive(1000));
170         consumer.close();
171 
172         session.commit();
173     }
174     
175 }