Source code: org/activemq/JmsQueueTransactionTest.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq;
19
20 import java.util.ArrayList;
21 import java.util.Enumeration;
22
23 import javax.jms.Message;
24 import javax.jms.MessageConsumer;
25 import javax.jms.MessageProducer;
26 import javax.jms.Queue;
27 import javax.jms.QueueBrowser;
28 import javax.jms.Session;
29 import javax.jms.TextMessage;
30
31 import org.activemq.test.JmsResourceProvider;
32
33
34 /**
35 * @version $Revision: 1.1.1.1 $
36 */
37 public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
38
39 /**
40 * @see org.activemq.JmsTransactionTestSupport#getJmsResourceProvider()
41 */
42 protected JmsResourceProvider getJmsResourceProvider() {
43 JmsResourceProvider p = new JmsResourceProvider();
44 p.setTopic(false);
45 return p;
46 }
47
48 public void testReceiveTwoThenCloseConnection() throws Exception {
49
50 Message[] outbound = new Message[]{
51 session.createTextMessage("First Message"),
52 session.createTextMessage("Second Message")
53 };
54
55 // lets consume any outstanding messages from previous test runs
56 while (consumer.receive(1000) != null) {
57 }
58 session.commit();
59
60 producer.send(outbound[0]);
61 producer.send(outbound[1]);
62 session.commit();
63
64 System.out.println("Sent 0: " + outbound[0]);
65 System.out.println("Sent 1: " + outbound[1]);
66
67 ArrayList messages = new ArrayList();
68 Message message = consumer.receive(1000);
69 assertEquals(outbound[0], message);
70
71 message = consumer.receive(1000);
72 assertNotNull(message);
73 assertEquals(outbound[1], message);
74
75 // Close and reopen connection.
76 reconnect();
77
78 // Consume again.. the previous message should
79 // get redelivered.
80 message = consumer.receive(5000);
81 assertNotNull("Should have re-received the first message again!", message);
82 messages.add(message);
83 assertEquals(outbound[0], message);
84
85 message = consumer.receive(5000);
86 assertNotNull("Should have re-received the second message again!", message);
87 messages.add(message);
88 assertEquals(outbound[1], message);
89 session.commit();
90
91 Message inbound[] = new Message[messages.size()];
92 messages.toArray(inbound);
93
94 assertTextMessagesEqual("Rollback did not work", outbound, inbound);
95 }
96
97 public void testSendReceiveInSeperateSessionTest() throws Exception {
98
99 session.close();
100 int batchCount = 10;
101
102 for( int i=0; i < batchCount; i++) {
103 {
104 Session session = resourceProvider.createSession(connection);
105 MessageProducer producer = resourceProvider.createProducer(session, destination);
106 //consumer = resourceProvider.createConsumer(session, destination);
107 producer.send(session.createTextMessage("Test Message: "+i));
108 session.commit();
109 session.close();
110 }
111 {
112 Session session = resourceProvider.createSession(connection);
113 MessageConsumer consumer = resourceProvider.createConsumer(session, destination);
114
115 TextMessage message = (TextMessage) consumer.receive(1000*5);
116 assertNotNull("Received only "+i+" messages in batch ", message);
117 assertEquals("Test Message: "+i, message.getText());
118
119 session.commit();
120 session.close();
121 }
122 }
123 }
124
125 public void testReceiveBrowseReceive() throws Exception {
126
127 Message[] outbound = new Message[] { session.createTextMessage("First Message"),
128 session.createTextMessage("Second Message"), session.createTextMessage("Third Message") };
129
130 // lets consume any outstanding messages from previous test runs
131 while (consumer.receive(1000) != null) {
132 }
133 session.commit();
134
135 producer.send(outbound[0]);
136 producer.send(outbound[1]);
137 producer.send(outbound[2]);
138 session.commit();
139
140 // Get the first.
141 assertEquals(outbound[0], consumer.receive(1000));
142 consumer.close();
143
144 QueueBrowser browser = session.createBrowser((Queue) destination);
145 Enumeration enumeration = browser.getEnumeration();
146
147 // browse the second
148 assertTrue("should have received the second message", enumeration.hasMoreElements());
149 assertEquals(outbound[1], (Message) enumeration.nextElement());
150
151 // browse the third.
152 assertTrue("Should have received the third message", enumeration.hasMoreElements());
153 assertEquals(outbound[2], (Message) enumeration.nextElement());
154
155 // There should be no more.
156 boolean tooMany = false;
157 while (enumeration.hasMoreElements()) {
158 System.out.println("Got extra message: " + ((TextMessage) enumeration.nextElement()).getText());
159 tooMany = true;
160 }
161 assertFalse(tooMany);
162 browser.close();
163
164 // Re-open the consumer.
165 consumer = resourceProvider.createConsumer(session, destination);
166 // Receive the second.
167 assertEquals(outbound[1], consumer.receive(1000));
168 // Receive the third.
169 assertEquals(outbound[2], consumer.receive(1000));
170 consumer.close();
171
172 session.commit();
173 }
174
175 }