Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/io/util/MemoryBoundedQueueTest.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq.io.util;
20  import java.util.ArrayList;
21  import java.util.List;
22  
23  import junit.framework.TestCase;
24  
25  import org.activemq.message.ActiveMQMessage;
26  
27  import EDU.oswego.cs.dl.util.concurrent.Semaphore;
28  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
29  
30  /**
31   * MemoryBoundedQueueTest
32   * 
33   * @version $Revision: 1.1.1.1 $
34   */
35  public class MemoryBoundedQueueTest extends TestCase {
36      protected static final int TEST_INSTANCE_SIZE = 2048;
37      private static final int TEST_ENQUEUE_SIZE = TEST_INSTANCE_SIZE / 2;
38      protected static final String QUEUE_NAME = "TestQueue";
39      private final int TOTAL_LOAD = 100000;
40      private final int NUMBER_CONSUMERS = 20;
41      private Semaphore stoppedSemaphore = new Semaphore(1-NUMBER_CONSUMERS);
42      
43      private MemoryBoundedObjectManager memoryManager;
44      private MemoryBoundedQueueManager queueManager;
45      
46      protected boolean supportJMSPriority=false;
47  
48      protected MemoryBoundedObjectManager getMemoryManager() {
49        if (memoryManager == null) {
50          memoryManager = new MemoryBoundedObjectManager("testmanager", 1024 * 1024, supportJMSPriority);
51        }
52        return memoryManager;
53      }
54      
55      protected MemoryBoundedQueueManager getQueueManager() {
56        if (queueManager == null) {
57          queueManager = new MemoryBoundedQueueManager(getMemoryManager());
58        }
59        return queueManager;      
60      }
61      
62      private class Dequeue implements Runnable {
63          private MemoryBoundedQueue queue;
64          private int localCount;
65  
66          Dequeue(MemoryBoundedQueue q, int num, int localCount) {
67              this.queue = q;
68              this.localCount = localCount;
69          }
70  
71          public void run() {
72              try {
73              int internalCount = 0;
74                  while (internalCount < localCount) {
75                      Thread.yield();
76                      MemoryManageable obj = queue.dequeue();
77                      if (obj == null) {
78                          break;
79                      }
80                      internalCount++;
81                  }
82              } catch (InterruptedException ie) {
83                  ie.printStackTrace();
84              } finally {
85                  stoppedSemaphore.release();
86              }
87          }
88      }
89  
90      public void testLoad() throws Exception {
91  
92          final MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
93          getMemoryManager().setValueLimit(TEST_INSTANCE_SIZE * NUMBER_CONSUMERS);
94          final List list = new ArrayList(NUMBER_CONSUMERS);
95          int numberOfMessages = TOTAL_LOAD / NUMBER_CONSUMERS;
96          for (int i = 0;i < NUMBER_CONSUMERS;i++) {
97              Dequeue dq = new Dequeue(queue, i, numberOfMessages);
98              list.add(dq);
99              Thread t = new Thread(dq);
100             t.setPriority(Thread.NORM_PRIORITY - 1);
101             t.start();
102         }
103         for (int i = 0;i < TOTAL_LOAD;i++) {
104             ActiveMQMessage msg = new ActiveMQMessage();
105             msg.setMemoryUsage(TEST_INSTANCE_SIZE);
106             queue.enqueue(msg);
107         }
108         try {
109             // Assert that all the consumers stopped.
110             assertTrue(stoppedSemaphore.attempt(1000*30));
111         }
112         catch (InterruptedException ie) {
113             ie.printStackTrace();
114         }
115         
116         //Thread.sleep(250);
117         assertEquals(0, getMemoryManager().getTotalMemoryUsedSize());
118         queue.close();
119     }
120 
121     public void testClear() {
122         final MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
123         getMemoryManager().setValueLimit(TEST_INSTANCE_SIZE);
124         ActiveMQMessage msg = new ActiveMQMessage();
125         queue.enqueue(msg);
126         queue.clear();
127         assertTrue(queue.size() == 0);
128         queue.close();
129     }
130 
131     public void testDequeue() throws Exception {
132         final MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
133         getMemoryManager().setValueLimit(TEST_INSTANCE_SIZE * 100);
134         ActiveMQMessage msg = new ActiveMQMessage();
135         queue.enqueue(msg);
136         Object result = queue.dequeue();
137         assertTrue(result == msg);
138         queue.close();
139     }
140 
141     public void testClose() {
142         /** @todo: Insert test code here. Use assertEquals(), for example. */
143         final MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
144         getMemoryManager().setValueLimit(TEST_ENQUEUE_SIZE);
145         final SynchronizedBoolean success = new SynchronizedBoolean(false);
146         final MemoryBoundedQueue q1 = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
147         assertTrue(queue == q1);
148         Thread t = new Thread(new Runnable() {
149             public void run() {
150                 try {
151                     Thread.sleep(250);
152                     queue.dequeue();
153                 }
154                 catch (Exception e) {
155                     e.printStackTrace();
156                 }
157                 synchronized (success) {
158                     success.set(true);
159                     success.notify();
160                 }
161             }
162         });
163         t.start();
164         queue.close();
165         try {
166             synchronized (success) {
167                 if (!success.get()) {
168                     success.wait(2000);
169                 }
170             }
171         }
172         catch (Throwable e) {
173             e.printStackTrace();
174         }
175         assertTrue(success.get());
176        
177         MemoryBoundedQueue q2 = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
178         assertTrue(queue != q2);
179     }
180 
181     public void testDequeueNoWait() throws Exception {
182         final MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
183         Object obj = queue.dequeueNoWait();
184         assertTrue(obj == null);
185         queue.close();
186     }
187 
188     public void testEnqueueFirst() throws Exception {
189         final MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
190         assertTrue(getMemoryManager().getTotalMemoryUsedSize() == 0);
191         Object mutex = new Object();
192         getMemoryManager().setValueLimit(TEST_INSTANCE_SIZE * 100);
193         for (int i = 0;i < 10;i++) {
194             queue.enqueue(new ActiveMQMessage());
195         }
196         ActiveMQMessage test = new ActiveMQMessage();
197         test.setJMSMessageID("FIRST");
198         queue.enqueueFirst(test);
199         Object obj = queue.dequeue();
200         assertTrue(obj == test);
201         queue.close();
202     }
203 
204     public void testEnqueueNoBlock() {
205         MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
206         getMemoryManager().setValueLimit(TEST_ENQUEUE_SIZE);
207         ActiveMQMessage msg = new ActiveMQMessage();
208         queue.enqueueNoBlock(msg);
209         assertTrue(true);
210         queue.close();
211     }
212 
213     public void testIsEmpty() {
214         int size = 10;
215         MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
216         for (int i = 0;i < size;i++) {
217             queue.enqueue(new ActiveMQMessage());
218         }
219         queue.clear();
220         assertTrue(queue.isEmpty());
221         queue.close();
222     }
223 
224     public void testRemove() {
225         MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
226         ActiveMQMessage msg = new ActiveMQMessage();
227         queue.enqueue(msg);
228         assertTrue(queue.remove(msg));
229         queue.close();
230     }
231 
232     public void testSize() {
233         int size = 10;
234         MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
235         for (int i = 0;i < size;i++) {
236             queue.enqueue(new ActiveMQMessage());
237         }
238         assertTrue(queue.size() == size);
239         queue.close();
240     }
241     
242     public void testRemovePacket(){
243         int size = 100;
244         MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
245         List list = new ArrayList(size);
246         for (int i = 0;i < size;i++) {
247             ActiveMQMessage msg = new ActiveMQMessage();
248             msg.setJMSMessageID(""+i);
249             list.add(msg);
250             queue.enqueue(msg);
251         }
252         for (int i =0; i < size; i++){
253             queue.remove((ActiveMQMessage)list.get(i));
254         }
255         assertTrue(queue.size() == 0);
256         queue.close();
257     }
258     
259     public void testRemovePacketById(){
260         int size = 100;
261         MemoryBoundedQueue queue = getQueueManager().getMemoryBoundedQueue(QUEUE_NAME);
262         List list = new ArrayList(size);
263         for (int i = 0;i < size;i++) {
264             ActiveMQMessage msg = new ActiveMQMessage();
265             msg.setJMSMessageID(""+i);
266             list.add(msg);
267             queue.enqueue(msg);
268         }
269         for (int i =0; i < size; i++){
270             ActiveMQMessage p = (ActiveMQMessage)list.get(i);
271             MemoryManageable removed = queue.remove(p.getJMSMessageID());
272             assertTrue(removed != null);
273             assertTrue(removed == p);
274         }
275         assertTrue(queue.size() == 0);
276         queue.close();
277     }
278 }