Source code: org/activemq/io/util/MemoryBoundedQueueTest.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq.io.util;
20 import java.util.ArrayList;
21 import java.util.List;
22
23 import junit.framework.TestCase;
24
25 import org.activemq.message.ActiveMQMessage;
26
27 import EDU.oswego.cs.dl.util.concurrent.Semaphore;
28 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
29
30 /**
31 * MemoryBoundedQueueTest
32 *
33 * @version $Revision: 1.1.1.1 $
34 */
35 public class MemoryBoundedQueueTest extends TestCase {
36 protected static final int TEST_INSTANCE_SIZE = 2048;
37 private static final int TEST_ENQUEUE_SIZE = TEST_INSTANCE_SIZE / 2;
38 protected static final String QUEUE_NAME = "TestQueue";
39 private final int TOTAL_LOAD = 100000;
40 private final int NUMBER_CONSUMERS = 20;
41 private Semaphore stoppedSemaphore = new Semaphore(1-NUMBER_CONSUMERS);
42
43 private MemoryBoundedObjectManager memoryManager;
44 private MemoryBoundedQueueManager queueManager;
45
46 protected boolean supportJMSPriority=false;
47
48 protected MemoryBoundedObjectManager getMemoryManager() {
49 if (memoryManager == null) {
50 memoryManager = new MemoryBoundedObjectManager("testmanager", 1024 * 1024, supportJMSPriority);
51 }
52 return memoryManager;
53 }
54
55 protected MemoryBoundedQueueManager getQueueManager() {
56 if (queueManager == null) {
57 queueManager = new MemoryBoundedQueueManager(getMemoryManager());
58 }
59 return queueManager;
60 }
61
62 private class Dequeue implements Runnable {
63 private MemoryBoundedQueue queue;
64 private int localCount;
65
66 Dequeue(MemoryBoundedQueue q, int num, int localCount) {
67 this.queue = q;
68 this.localCount = localCount;
69 }
70
71 public void run() {
72 try {
73 int internalCount = 0;
74 while (internalCount < localCount) {
75 Thread.yield();
76 MemoryManageable obj = queue.dequeue();
77 if (obj == null) {
78 break;
79 }
80 internalCount++;
81 }
82 } catch (InterruptedException ie) {
83 ie.printStackTrace();
84 } finally {
85 stoppedSemaphore.release();
86 }
87 }
88 }
89
90 public void testLoad() throws Exception {
91
92 final MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
93 getMemoryManager().setValueLimit(TEST_INSTANCE_SIZE * NUMBER_CONSUMERS);
94 final List list = new ArrayList(NUMBER_CONSUMERS);
95 int numberOfMessages = TOTAL_LOAD / NUMBER_CONSUMERS;
96 for (int i = 0;i < NUMBER_CONSUMERS;i++) {
97 Dequeue dq = new Dequeue(queue, i, numberOfMessages);
98 list.add(dq);
99 Thread t = new Thread(dq);
100 t.setPriority(Thread.NORM_PRIORITY - 1);
101 t.start();
102 }
103 for (int i = 0;i < TOTAL_LOAD;i++) {
104 ActiveMQMessage msg = new ActiveMQMessage();
105 msg.setMemoryUsage(TEST_INSTANCE_SIZE);
106 queue.enqueue(msg);
107 }
108 try {
109 // Assert that all the consumers stopped.
110 assertTrue(stoppedSemaphore.attempt(1000*30));
111 }
112 catch (InterruptedException ie) {
113 ie.printStackTrace();
114 }
115
116 //Thread.sleep(250);
117 assertEquals(0, getMemoryManager().getTotalMemoryUsedSize());
118 queue.close();
119 }
120
121 public void testClear() {
122 final MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
123 getMemoryManager().setValueLimit(TEST_INSTANCE_SIZE);
124 ActiveMQMessage msg = new ActiveMQMessage();
125 queue.enqueue(msg);
126 queue.clear();
127 assertTrue(queue.size() == 0);
128 queue.close();
129 }
130
131 public void testDequeue() throws Exception {
132 final MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
133 getMemoryManager().setValueLimit(TEST_INSTANCE_SIZE * 100);
134 ActiveMQMessage msg = new ActiveMQMessage();
135 queue.enqueue(msg);
136 Object result = queue.dequeue();
137 assertTrue(result == msg);
138 queue.close();
139 }
140
141 public void testClose() {
142 /** @todo: Insert test code here. Use assertEquals(), for example. */
143 final MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
144 getMemoryManager().setValueLimit(TEST_ENQUEUE_SIZE);
145 final SynchronizedBoolean success = new SynchronizedBoolean(false);
146 final MemoryBoundedQueue q1 = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
147 assertTrue(queue == q1);
148 Thread t = new Thread(new Runnable() {
149 public void run() {
150 try {
151 Thread.sleep(250);
152 queue.dequeue();
153 }
154 catch (Exception e) {
155 e.printStackTrace();
156 }
157 synchronized (success) {
158 success.set(true);
159 success.notify();
160 }
161 }
162 });
163 t.start();
164 queue.close();
165 try {
166 synchronized (success) {
167 if (!success.get()) {
168 success.wait(2000);
169 }
170 }
171 }
172 catch (Throwable e) {
173 e.printStackTrace();
174 }
175 assertTrue(success.get());
176
177 MemoryBoundedQueue q2 = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
178 assertTrue(queue != q2);
179 }
180
181 public void testDequeueNoWait() throws Exception {
182 final MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
183 Object obj = queue.dequeueNoWait();
184 assertTrue(obj == null);
185 queue.close();
186 }
187
188 public void testEnqueueFirst() throws Exception {
189 final MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
190 assertTrue(getMemoryManager().getTotalMemoryUsedSize() == 0);
191 Object mutex = new Object();
192 getMemoryManager().setValueLimit(TEST_INSTANCE_SIZE * 100);
193 for (int i = 0;i < 10;i++) {
194 queue.enqueue(new ActiveMQMessage());
195 }
196 ActiveMQMessage test = new ActiveMQMessage();
197 test.setJMSMessageID("FIRST");
198 queue.enqueueFirst(test);
199 Object obj = queue.dequeue();
200 assertTrue(obj == test);
201 queue.close();
202 }
203
204 public void testEnqueueNoBlock() {
205 MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
206 getMemoryManager().setValueLimit(TEST_ENQUEUE_SIZE);
207 ActiveMQMessage msg = new ActiveMQMessage();
208 queue.enqueueNoBlock(msg);
209 assertTrue(true);
210 queue.close();
211 }
212
213 public void testIsEmpty() {
214 int size = 10;
215 MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
216 for (int i = 0;i < size;i++) {
217 queue.enqueue(new ActiveMQMessage());
218 }
219 queue.clear();
220 assertTrue(queue.isEmpty());
221 queue.close();
222 }
223
224 public void testRemove() {
225 MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
226 ActiveMQMessage msg = new ActiveMQMessage();
227 queue.enqueue(msg);
228 assertTrue(queue.remove(msg));
229 queue.close();
230 }
231
232 public void testSize() {
233 int size = 10;
234 MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
235 for (int i = 0;i < size;i++) {
236 queue.enqueue(new ActiveMQMessage());
237 }
238 assertTrue(queue.size() == size);
239 queue.close();
240 }
241
242 public void testRemovePacket(){
243 int size = 100;
244 MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
245 List list = new ArrayList(size);
246 for (int i = 0;i < size;i++) {
247 ActiveMQMessage msg = new ActiveMQMessage();
248 msg.setJMSMessageID(""+i);
249 list.add(msg);
250 queue.enqueue(msg);
251 }
252 for (int i =0; i < size; i++){
253 queue.remove((ActiveMQMessage)list.get(i));
254 }
255 assertTrue(queue.size() == 0);
256 queue.close();
257 }
258
259 public void testRemovePacketById(){
260 int size = 100;
261 MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
262 List list = new ArrayList(size);
263 for (int i = 0;i < size;i++) {
264 ActiveMQMessage msg = new ActiveMQMessage();
265 msg.setJMSMessageID(""+i);
266 list.add(msg);
267 queue.enqueue(msg);
268 }
269 for (int i =0; i < size; i++){
270 ActiveMQMessage p = (ActiveMQMessage)list.get(i);
271 MemoryManageable removed = queue.remove(p.getJMSMessageID());
272 assertTrue(removed != null);
273 assertTrue(removed == p);
274 }
275 assertTrue(queue.size() == 0);
276 queue.close();
277 }
278 }