Source code: org/activemq/store/journal/JournalBrokerBenchmark.java
1 /**
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.store.journal;
19
20 import javax.jms.BytesMessage;
21 import javax.jms.Connection;
22 import javax.jms.DeliveryMode;
23 import javax.jms.JMSException;
24 import javax.jms.Message;
25 import javax.jms.MessageConsumer;
26 import javax.jms.MessageListener;
27 import javax.jms.MessageProducer;
28 import javax.jms.Session;
29
30 import junit.framework.TestCase;
31
32 import org.activemq.ActiveMQConnectionFactory;
33 import org.activemq.broker.impl.BrokerContainerImpl;
34 import org.activemq.message.ActiveMQQueue;
35
36 import EDU.oswego.cs.dl.util.concurrent.Callable;
37 import EDU.oswego.cs.dl.util.concurrent.Latch;
38 import EDU.oswego.cs.dl.util.concurrent.Semaphore;
39
40 /**
 * Benchmarks the Journal Store by using an embedded broker to add and remove messages from
 * the store.
 *
 * Make sure you run with the JVM option -server (it makes a big difference).
 * The tests send and receive MESSAGE_COUNT (default 10000) 1k JMS messages to measure the
 * processing rate in msg/sec.
47 *
48 * @version $Revision: 1.1 $
49 */
50 public class JournalBrokerBenchmark extends TestCase {
51
/** Number of messages the benchmark is sized for; read from the MESSAGE_COUNT system property, default 10000. */
private static final int MESSAGE_COUNT = Integer.parseInt(System.getProperty("MESSAGE_COUNT","10000"));
/** Broker container created and started in setUp() and stopped in tearDown(). */
private BrokerContainerImpl broker;
/** Queue the benchmark workers produce to and consume from; assigned in setUp(). */
private ActiveMQQueue dest;
/** Factory the workers use to open their JMS connections; assigned in setUp(). */
private ActiveMQConnectionFactory connectionFactory;
56
57 public static void main(String[] args) {
58 junit.textui.TestRunner.run(JournalBrokerBenchmark.class);
59 }
60
61 protected void setUp() throws Exception {
62
63 broker = new BrokerContainerImpl("localhost");
64 broker.addConnector("tcp://localhost:61616");
65 broker.start();
66 connectionFactory = new ActiveMQConnectionFactory(broker, "tcp://localhost:61616");
67 // connectionFactory.setTurboBoost(true);
68 // connectionFactory.setCachingEnabled(false);
69 // connectionFactory.setCopyMessageOnSend(true);
70 dest = new ActiveMQQueue("TEST");
71 }
72
73 protected void tearDown() throws Exception {
74 broker.stop();
75 }
76
77 static class ProgressPrinter {
78 private final int total;
79 private final int interval;
80 int percentDone=0;
81 int counter=0;
82
83 public ProgressPrinter(int total, int interval) {
84 this.total=total;
85 this.interval = interval;
86 }
87
88 synchronized public void increment() {
89 update(++counter);
90 }
91
92 synchronized public void update(int current) {
93 int at = 100*current/total;
94 if( (percentDone/interval) != (at/interval) ) {
95 percentDone=at;
96 System.out.println("Completed: "+percentDone+"%");
97 }
98 }
99 }
100
101 /**
102 * Moves about 6000 msg/sec on a Win XP 3.2 ghz Intel Machine.
103 *
104 * @throws Throwable
105 */
106 public void testConcurrentSendReceive() throws Throwable {
107
108 final int PRODUCER_COUNT=Integer.parseInt(System.getProperty("PRODUCER_COUNT","10"));
109 final int CONSUMER_COUNT=Integer.parseInt(System.getProperty("CONSUMER_COUNT","10"));
110
111 final ProgressPrinter pp = new ProgressPrinter(MESSAGE_COUNT*2, 5);
112
113 final Semaphore connectionsEstablished = new Semaphore(1-(CONSUMER_COUNT+PRODUCER_COUNT));
114 final Latch startTest = new Latch();
115 final Semaphore testsFinished = new Semaphore(1-(CONSUMER_COUNT+PRODUCER_COUNT));
116
117 final Callable producer = new Callable() {
118 public Object call() throws JMSException, InterruptedException {
119 Connection connection = connectionFactory.createConnection();
120 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
121 MessageProducer producer = session.createProducer(dest);
122 producer.setDeliveryMode(DeliveryMode.PERSISTENT);
123 BytesMessage message = session.createBytesMessage();
124 message.writeBytes(new byte[1024]);
125 connection.start();
126
127 connectionsEstablished.release();
128
129 startTest.acquire();
130 final int msgs = (MESSAGE_COUNT/PRODUCER_COUNT)+1;
131 for (int i = 0; i < msgs; i++) {
132 pp.increment();
133 producer.send(message);
134 }
135
136 testsFinished.release();
137 connection.close();
138 return null;
139 }
140 };
141
142 final Callable consumer = new Callable() {
143 public Object call() throws JMSException, InterruptedException {
144 final Latch doneLatch = new Latch();
145 Connection connection = connectionFactory.createConnection();
146 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
147 MessageConsumer consumer = session.createConsumer(dest);
148
149 connectionsEstablished.release();
150 startTest.acquire();
151
152 final int msgs = (MESSAGE_COUNT/CONSUMER_COUNT)-1;
153 consumer.setMessageListener(new MessageListener(){
154 int counter=0;
155 public void onMessage(Message msg) {
156 pp.increment();
157 counter++;
158 if( counter >= msgs ) {
159 doneLatch.release();
160 }
161 }
162 });
163 connection.start();
164 doneLatch.acquire();
165
166 testsFinished.release();
167 connection.close();
168 return null;
169 }
170 };
171
172 final Throwable workerError[] = new Throwable[1];
173 for( int i=0; i < PRODUCER_COUNT; i++ ) {
174 new Thread("Producer:"+i) {
175 public void run() {
176 try {
177 producer.call();
178 } catch (Throwable e) {
179 e.printStackTrace();
180 workerError[0] = e;
181 }
182 }
183 }.start();
184 }
185
186 for( int i=0; i < CONSUMER_COUNT; i++ ) {
187 new Thread("Consumer:"+i) {
188 public void run() {
189 try {
190 consumer.call();
191 } catch (Throwable e) {
192 workerError[0] = e;
193 }
194 }
195 }.start();
196 }
197
198 connectionsEstablished.acquire();
199
200 // System.out.println("ready.");
201 // System.in.read();System.in.read();
202
203 startTest.release();
204 long start = System.currentTimeMillis();
205 testsFinished.acquire();
206 long end = System.currentTimeMillis();
207 System.out.println(getName() + ": test duration: " + (end - start) + " ms, published+acked msg/s: "
208 + (MESSAGE_COUNT * 1000f / (end - start)));
209
210 // System.out.println("ready.");
211 // System.in.read();System.in.read();
212
213 if( workerError[0] != null )
214 throw workerError[0];
215
216 }
217
218 }