Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/store/journal/JournalBrokerBenchmark.java


1   /** 
2    * 
3    * Copyright 2004 Hiram Chirino
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq.store.journal;
19  
20  import javax.jms.BytesMessage;
21  import javax.jms.Connection;
22  import javax.jms.DeliveryMode;
23  import javax.jms.JMSException;
24  import javax.jms.Message;
25  import javax.jms.MessageConsumer;
26  import javax.jms.MessageListener;
27  import javax.jms.MessageProducer;
28  import javax.jms.Session;
29  
30  import junit.framework.TestCase;
31  
32  import org.activemq.ActiveMQConnectionFactory;
33  import org.activemq.broker.impl.BrokerContainerImpl;
34  import org.activemq.message.ActiveMQQueue;
35  
36  import EDU.oswego.cs.dl.util.concurrent.Callable;
37  import EDU.oswego.cs.dl.util.concurrent.Latch;
38  import EDU.oswego.cs.dl.util.concurrent.Semaphore;
39  
40  /**
41   * Benchmarks the Journal Store by using an embeded broker to add and remove messages from 
42   * the store.
43   * 
44   * Make sure you run with jvm option -server (makes a big difference).
45   * The tests simulate storing 1000 1k jms messages to see the rate of 
46   * processing msg/sec.
47   * 
48   * @version $Revision: 1.1 $
49   */
50  public class JournalBrokerBenchmark extends TestCase {
51  
52      private static final int MESSAGE_COUNT = Integer.parseInt(System.getProperty("MESSAGE_COUNT","10000"));
53      private BrokerContainerImpl broker;
54      private ActiveMQQueue dest;
55      private ActiveMQConnectionFactory connectionFactory;
56  
57      public static void main(String[] args) {
58          junit.textui.TestRunner.run(JournalBrokerBenchmark.class);
59      }
60      
61      protected void setUp() throws Exception {
62          
63          broker = new BrokerContainerImpl("localhost");
64          broker.addConnector("tcp://localhost:61616");
65          broker.start();
66          connectionFactory = new ActiveMQConnectionFactory(broker, "tcp://localhost:61616");
67  //        connectionFactory.setTurboBoost(true);
68  //        connectionFactory.setCachingEnabled(false);
69  //        connectionFactory.setCopyMessageOnSend(true);
70          dest = new ActiveMQQueue("TEST");
71      }
72  
73      protected void tearDown() throws Exception {
74          broker.stop();
75      }
76  
77      static class ProgressPrinter {
78          private final int total;
79          private final int interval;
80          int percentDone=0;
81          int counter=0;
82          
83          public ProgressPrinter(int total, int interval) {
84              this.total=total;
85              this.interval = interval;
86          }
87          
88          synchronized public void increment() {
89            update(++counter);
90          }
91          
92          synchronized public void update(int current) {
93              int at = 100*current/total;
94              if( (percentDone/interval) != (at/interval) ) {
95                  percentDone=at;
96                  System.out.println("Completed: "+percentDone+"%");
97              }            
98          }
99      }
100     
101     /**
102      * Moves about 6000 msg/sec on a Win XP 3.2 ghz Intel Machine.
103      * 
104      * @throws Throwable  
105      */
106     public void testConcurrentSendReceive() throws Throwable {
107         
108         final int PRODUCER_COUNT=Integer.parseInt(System.getProperty("PRODUCER_COUNT","10"));
109         final int CONSUMER_COUNT=Integer.parseInt(System.getProperty("CONSUMER_COUNT","10"));
110         
111         final ProgressPrinter pp = new ProgressPrinter(MESSAGE_COUNT*2, 5);
112 
113         final Semaphore connectionsEstablished = new Semaphore(1-(CONSUMER_COUNT+PRODUCER_COUNT)); 
114         final Latch startTest = new Latch(); 
115         final Semaphore testsFinished = new Semaphore(1-(CONSUMER_COUNT+PRODUCER_COUNT)); 
116         
117         final Callable producer = new Callable() {
118             public Object call() throws JMSException, InterruptedException {
119                 Connection connection = connectionFactory.createConnection();
120                 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
121                 MessageProducer producer = session.createProducer(dest);
122                 producer.setDeliveryMode(DeliveryMode.PERSISTENT);
123                 BytesMessage message = session.createBytesMessage();
124                 message.writeBytes(new byte[1024]);
125                 connection.start();
126                 
127                 connectionsEstablished.release();
128                 
129                 startTest.acquire();                
130                 final int msgs = (MESSAGE_COUNT/PRODUCER_COUNT)+1;
131                 for (int i = 0; i < msgs; i++) {
132                     pp.increment();
133                     producer.send(message);
134                 }                
135                 
136                 testsFinished.release();                
137                 connection.close();
138                 return null;
139             }
140         };
141         
142         final Callable consumer = new Callable() {
143             public Object call() throws JMSException, InterruptedException {
144                 final Latch doneLatch = new Latch();                
145                 Connection connection = connectionFactory.createConnection();                
146                 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
147                 MessageConsumer consumer = session.createConsumer(dest);
148 
149                 connectionsEstablished.release();
150                 startTest.acquire();
151 
152                 final int msgs = (MESSAGE_COUNT/CONSUMER_COUNT)-1;
153                 consumer.setMessageListener(new MessageListener(){
154                     int counter=0;
155                     public void onMessage(Message msg) {                
156                         pp.increment();
157                         counter++;
158                         if( counter >= msgs ) {
159                             doneLatch.release();
160                         }
161                     }
162                 });
163                 connection.start();
164                 doneLatch.acquire();
165 
166                 testsFinished.release();                
167                 connection.close();
168                 return null;
169             }
170         };
171         
172         final Throwable workerError[] = new Throwable[1];
173         for( int i=0; i < PRODUCER_COUNT; i++ ) {
174             new Thread("Producer:"+i) {
175                 public void run() {
176                     try {
177                         producer.call();
178                     } catch (Throwable e) {
179                         e.printStackTrace();
180                         workerError[0] = e;
181                     }
182                 }
183             }.start();
184         }
185 
186         for( int i=0; i < CONSUMER_COUNT; i++ ) {
187             new Thread("Consumer:"+i) {
188                 public void run() {
189                     try {
190                         consumer.call();
191                     } catch (Throwable e) {
192                         workerError[0] = e;
193                     }
194                 }
195             }.start();
196         }
197 
198         connectionsEstablished.acquire();
199 
200 //        System.out.println("ready.");
201 //        System.in.read();System.in.read();
202         
203         startTest.release();
204         long start = System.currentTimeMillis();
205         testsFinished.acquire();
206         long end = System.currentTimeMillis();
207         System.out.println(getName() + ": test duration: " + (end - start) + " ms, published+acked msg/s: "
208                 + (MESSAGE_COUNT * 1000f / (end - start)));
209         
210 //        System.out.println("ready.");
211 //        System.in.read();System.in.read();
212 
213         if( workerError[0] != null )
214             throw workerError[0];
215 
216     }
217 
218 }