Source code: org/activemq/sampler/Producer.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.sampler;
19
20 import org.apache.jmeter.engine.event.LoopIterationEvent;
21 import org.apache.jmeter.samplers.Entry;
22 import org.apache.jmeter.samplers.SampleResult;
23 import org.apache.jmeter.testelement.TestListener;
24 import org.apache.jmeter.util.JMeterUtils;
25 import org.apache.jorphan.logging.LoggingManager;
26 import org.apache.log.Logger;
27 import org.activemq.util.connection.ServerConnectionFactory;
28 import org.activemq.util.IdGenerator;
29
30 import javax.jms.Connection;
31 import javax.jms.Session;
32 import javax.jms.JMSException;
33 import javax.jms.Topic;
34 import javax.jms.Queue;
35 import javax.jms.TopicSession;
36 import javax.jms.QueueSession;
37 import javax.jms.Destination;
38 import javax.jms.Message;
39 import javax.jms.TopicPublisher;
40 import javax.jms.QueueSender;
41 import javax.jms.MessageProducer;
42 import javax.jms.DeliveryMode;
43
44 import java.io.File;
45 import java.io.FileInputStream;
46 import java.io.FileNotFoundException;
47 import java.io.IOException;
48 import java.util.Properties;
49 import java.util.Timer;
50 import java.util.TimerTask;
51
52 /**
53 * A sampler which understands Tcp requests.
54 */
55 public class Producer extends Sampler implements TestListener {
56
57 public static int counter;
58
59 private static final Logger log = LoggingManager.getLoggerForClass();
60
61 // Otherwise, the response is scanned for these strings
62 private static final String STATUS_PREFIX = JMeterUtils.getPropDefault("tcp.status.prefix", "");
63 private static final String STATUS_SUFFIX = JMeterUtils.getPropDefault("tcp.status.suffix", "");
64 private static final String STATUS_PROPERTIES = JMeterUtils.getPropDefault("tcp.status.properties", "");
65
66 private static final Properties statusProps = new Properties();
67 private static final long INSECONDS = 60;
68 private static final long MSGINTERVALINSECS = 60;
69
70 private Timer timerPublish;
71 private Timer timerPublishLoop;
72
73 private int batchCounter = 0;
74
75 static {
76 log.info("Protocol Handler name=" + getClassname());
77 log.info("Status prefix=" + STATUS_PREFIX);
78 log.info("Status suffix=" + STATUS_SUFFIX);
79 log.info("Status properties=" + STATUS_PROPERTIES);
80
81 if (STATUS_PROPERTIES.length() > 0) {
82 File f = new File(STATUS_PROPERTIES);
83
84 try {
85 statusProps.load(new FileInputStream(f));
86 log.info("Successfully loaded properties");
87 //haveStatusProps = true;
88 } catch (FileNotFoundException e) {
89 log.info("Property file not found");
90 } catch (IOException e) {
91 log.info("Property file error " + e.toString());
92 }
93 }
94 }
95
96 /**
97 * Constructor for ProducerSampler object.
98 */
99 public Producer() {
100 log.debug("Created " + this);
101 protocolHandler = this.getProtocol(); //from superclass sampler.
102 log.debug("Using Protocol Handler: " + protocolHandler.getClass().getName());
103 }
104
105 /**
106 * Increments the int variable.
107 *
108 * @param count - variable incremented.
109 */
110 protected synchronized void count(int count) {
111 counter += count;
112 }
113
114 /**
115 * @return Returns the message.
116 */
117 protected String getMessage() {
118 StringBuffer buffer = new StringBuffer();
119
120 for (int i = 0; i < getMsgSize(); i++) {
121 char ch = 'X';
122 buffer.append(ch);
123 }
124
125 return buffer.toString();
126 }
127
128 /**
129 * Retrieves the message then sends it via tcp.
130 *
131 * @throws Exception
132 */
133 protected void publish() throws Exception {
134 long threadRampUp = 0;
135
136 if (getNoProducer() > 0) {
137 threadRampUp = (long) ((double) (getRampUp() * INSECONDS) / ((double) getNoProducer()) * 1000);
138 }
139
140 timerPublish = new Timer();
141 timerPublish.scheduleAtFixedRate(new newThread(), 0, threadRampUp);
142 }
143
144 /**
145 * Sends the information from the client via tcp.
146 *
147 * @param text - message that is sent.
148 * @param subject - subject of the message to be sent.
149 * @throws JMSException
150 */
151 protected void publish(String text, String subject) throws JMSException {
152 Destination destination = null;
153 Session session = null;
154 MessageProducer publisher = null;
155 Connection connection = null;
156
157 connection = ServerConnectionFactory.createConnectionFactory(this.getURL(),
158 this.getMQServer(),
159 this.getTopic(),
160 this.getEmbeddedBroker());
161
162 if (this.getDurable()) {
163 if ((ServerConnectionFactory.JORAM_SERVER.equals(this.getMQServer()))||
164 (ServerConnectionFactory.MANTARAY_SERVER.equals(this.getMQServer()))) {
165 //Id set be server
166
167 } else {
168 IdGenerator idGenerator = new IdGenerator();
169 connection.setClientID(idGenerator.generateId());
170 }
171 }
172
173 session = ServerConnectionFactory.createSession(connection,
174 this.getTransacted(),
175 this.getMQServer(),
176 this.getTopic());
177
178 destination = ServerConnectionFactory.createDestination(session,
179 subject,
180 this.getURL(),
181 this.getMQServer(),
182 this.getTopic());
183
184 if ((ServerConnectionFactory.OPENJMS_SERVER.equals(this.getMQServer()))||
185 (ServerConnectionFactory.MANTARAY_SERVER.equals(this.getMQServer()))) {
186 if (this.getTopic()){
187 connection.start();
188 TopicPublisher topicPublisher = ((TopicSession)session).createPublisher((Topic)destination);
189 publisher = topicPublisher;
190
191 } else {
192 connection.start();
193 QueueSender queuePublisher = ((QueueSession)session).createSender((Queue)destination);
194 publisher = queuePublisher;
195
196 }
197
198 } else {
199 publisher = session.createProducer(destination);
200 }
201
202 long msgIntervalInMins = this.getMsgInterval();
203 long msgIntervalInSecs = msgIntervalInMins * INSECONDS;
204
205 if (msgIntervalInSecs < 0) {
206 msgIntervalInSecs = MSGINTERVALINSECS;
207 }
208
209 if (getDurable()) {
210 publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
211 } else {
212 publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
213 }
214
215 if (this.getDefMsgInterval()) {
216 while (!stopThread) {
217 publishLoop(session, publisher, text);
218 }
219 ServerConnectionFactory.close(connection, session);
220
221 } else {
222 // set the session, publisher and connection.
223 this.setSession(session);
224 this.setPublisher(publisher);
225 this.setConnection(connection);
226
227 timerPublishLoop = new Timer();
228 timerPublishLoop.scheduleAtFixedRate(new publish(), 0, msgIntervalInSecs * 1000);
229
230 }
231 }
232
233 /**
234 * Sends a message through MessageProducer object.
235 *
236 * @param session - Session oject.
237 * @param publisher - MessageProducer object.
238 * @param text - text that is used to create Message object.
239 * @throws JMSException
240 */
241 protected void publishLoop(Session session, MessageProducer publisher, String text) throws JMSException {
242 if (ServerConnectionFactory.OPENJMS_SERVER.equals(this.getMQServer())) {
243 if (publisher instanceof TopicPublisher) {
244 Message message = ((TopicSession)session).createTextMessage(text);
245 ((TopicPublisher)publisher).publish(message);
246 } else if (publisher instanceof QueueSender){
247 Message message = ((QueueSession)session).createTextMessage(text);
248 ((QueueSender)publisher).send(message);
249 }
250 } else {
251 Message message = session.createTextMessage(text);
252 publisher.send(message);
253 }
254
255 if (this.getTransacted()) {
256 batchCounter++;
257
258 if (batchCounter == this.getBatchSize()) {
259 batchCounter = 0;
260 session.commit();
261 }
262 }
263
264 count(1);
265 }
266
267 /**
268 * @return the current number of messages sent.
269 */
270 public static synchronized int resetCount() {
271 int answer = counter;
272 counter = 0;
273 return answer;
274 }
275
276 /**
277 * Runs and publish the message.
278 *
279 * @throws Exception
280 */
281 public void run() throws Exception {
282
283 start();
284 publish();
285 }
286
287 /**
288 * Retrieves the sample as SampleResult object. There are times that this
289 * is ignored.
290 *
291 * @param e - Entry object.
292 * @return Returns the sample result.
293 */
294 public SampleResult sample(Entry e) { // Entry tends to be ignored ...
295 SampleResult res = new SampleResult();
296 res.setSampleLabel(getName());
297 res.setSamplerData(getURL());
298 res.sampleStart();
299
300 try {
301 //run the benchmark tool code
302 this.run();
303 } catch (Exception ex) {
304 log.debug("Error running producer ", ex);
305 res.setResponseCode("500");
306 res.setResponseMessage(ex.toString());
307 }
308
309 //Calculate response time
310 res.sampleEnd();
311
312 // Set if we were successful or not
313 res.setSuccessful(true);
314
315 return res;
316 }
317
318 /**
319 * Logs the end of the test. This is called only once per
320 * class.
321 */
322 public void testEnded() {
323 log.debug(this + " test ended");
324 }
325
326 /**
327 * Logs the host at the end of the test.
328 *
329 * @param host - the host to be logged.
330 */
331 public void testEnded(String host) {
332 log.debug(this + " test ended on " + host);
333 }
334
335 /**
336 * Logs the start of the test. This is called only once
337 * per class.
338 */
339 public void testStarted() {
340 log.debug(this + " test started");
341 }
342
343 /**
344 * Logs the host at the start of the test. *
345 * @param host - the host to be logged.
346 */
347 public void testStarted(String host) {
348 log.debug(this + " test started on " + host);
349 }
350
351 /**
352 * Logs the iteration event.
353 *
354 * @param event
355 */
356 public void testIterationStart(LoopIterationEvent event) {
357 log.debug(this + " test iteration start on " + event.getIteration());
358 }
359
360 /**
361 * Creates thread for publishing messages.
362 */
363 class newThread extends TimerTask {
364 final String text = getMessage();
365 int numberOfProducer = getNoProducer();
366 int counter = 0;
367
368 public void run() {
369 if (counter < numberOfProducer) {
370 final String subject = subjects[counter % getNoSubject()];
371
372 counter++;
373
374 Thread thread = new Thread() {
375 public void run() {
376 try {
377 if (stopThread) {
378 return;
379 } else {
380 publish(text, subject);
381 }
382 } catch (JMSException e) {
383 log.error("Error publishing message ", e);
384 }
385 }
386 };
387
388 thread.start();
389
390 } else {
391 timerPublish.cancel();
392 }
393 }
394 }
395
396 /**
397 * Starts the publish loop timer.
398 */
399 class publish extends TimerTask {
400 public void run() {
401 try {
402 if (!stopThread) {
403 publishLoop(getSession(), getPublisher(), getMessage());
404 } else {
405 ServerConnectionFactory.close(getConnection(), getSession());
406 timerPublishLoop.cancel();
407 }
408 } catch(JMSException e) {
409 log.error("Could not publish "+e);
410 }
411 }
412 }
413
414 /**
415 * Starts an instance of the Producer tool.
416 */
417 public static void main(String[] args) {
418 System.out.println("##########################################");
419 System.out.println(" Producer * start *");
420 System.out.println("##########################################");
421
422 Producer prod = new Producer();
423
424 if (args.length == 0 ){
425 displayToolParameters();
426 }
427
428 if (args.length > 0){
429 String mqServer = args[0];
430
431 if (mqServer.equalsIgnoreCase("SONICMQ")){
432 prod.setMQServer(ServerConnectionFactory.SONICMQ_SERVER);
433 } else if (mqServer.equalsIgnoreCase("TIBCOMQ")) {
434 prod.setMQServer(ServerConnectionFactory.TIBCOMQ_SERVER);
435 } else if (mqServer.equalsIgnoreCase("JBOSSMQ")) {
436 prod.setMQServer(ServerConnectionFactory.JBOSSMQ_SERVER);
437 } else if (mqServer.equalsIgnoreCase("OPENJMS")) {
438 prod.setMQServer(ServerConnectionFactory.OPENJMS_SERVER);
439 } else if (mqServer.equalsIgnoreCase("JORAM")) {
440 prod.setMQServer(ServerConnectionFactory.JORAM_SERVER);
441 } else if (mqServer.equalsIgnoreCase("MANTARAY")) {
442 prod.setMQServer(ServerConnectionFactory.MANTARAY_SERVER);
443 } else if (mqServer.equalsIgnoreCase("ACTIVEMQ")) {
444 //Run with the default broker
445 } else {
446 System.out.print("Please enter a valid server: [ ");
447 System.out.print("SONICMQ | " );
448 System.out.print("TIBCOMQ | " );
449 System.out.print("JBOSSMQ | " );
450 System.out.print("OPENJMS | " );
451 System.out.print("JORAM |");
452 System.out.print("MANTARAY |");
453 System.out.println("ACTIVEMQ ]");
454 }
455
456 }
457
458 if (args.length > 1) {
459 prod.setURL(args[1]);
460 } else {
461 System.out.println("Please specify the URL.");
462 }
463
464 if (args.length > 2) {
465 prod.setDuration(args[2]);
466 }
467
468 if (args.length > 3) {
469 prod.setRampUp(args[3]);
470 }
471
472 if (args.length > 4) {
473 prod.setNoProducer(args[4]);
474 }
475
476 if (args.length > 5) {
477 prod.setNoSubject(args[5]);
478 }
479
480 if (args.length > 6) {
481 prod.setMsgSize(args[6]);
482 }
483
484 if (args.length > 7) {
485 prod.setDurable(args[7]);
486 }
487
488 if (args.length > 8) {
489 prod.setTopic(args[8]);
490 }
491
492 if (args.length > 9) {
493 prod.setTransacted(args[9]);
494
495 if (args.length > 10) {
496 prod.setBatchSize(args[10]);
497 } else {
498 displayToolParameters();
499 System.out.println("Please specify the batch size.");
500 return;
501 }
502 }
503
504 if (args.length > 11) {
505 prod.setDefMsgInterval(args[11]);
506 if (!prod.getDefMsgInterval()) {
507 if (args.length > 12) {
508 prod.setMsgInterval(args[12]);
509 } else {
510 displayToolParameters();
511 System.out.println("Please specify the message interval.");
512 return;
513 }
514 }
515 }
516
517 prod.setDefMsgInterval("true");
518
519 System.out.println("Runnning Consumer tool with the following parameters:");
520 System.out.println("Server=" + prod.getMQServer());
521 System.out.println("URL="+prod.getURL());
522 System.out.println("Duration="+prod.getDuration());
523 System.out.println("Ramp up="+prod.getRampUp());
524 System.out.println("No. Producer="+prod.getNoProducer());
525 System.out.println("No. Subject="+prod.getNoSubject());
526 System.out.println("Msg Size="+prod.getMsgSize());
527 System.out.println("Is Durable="+prod.getDurable());
528 System.out.println("Is Topic="+prod.getTopic());
529 System.out.println("Is Transacted="+prod.getTransacted());
530
531 try {
532 prod.run();
533 } catch (Exception e){
534 System.out.println("Excception e="+e);
535
536 }
537 System.out.println("##########################################");
538 System.out.println("Producer * end *");
539 System.out.println("##########################################");
540 }
541
542 /**
543 * Prints to the console the Producer tool parameters.
544 */
545 private static void displayToolParameters(){
546 System.out.println("Producer tool usage: ");
547 System.out.print("[Message Queue Server] ");
548 System.out.print("[URL] ");
549 System.out.print("[Duration] ");
550 System.out.print("[Ramp up] ");
551 System.out.print("[No. of producer] ");
552 System.out.print("[No. of subject] ");
553 System.out.print("[Message size] ");
554 System.out.print("[Delivery mode] ");
555 System.out.print("[Is topic] ");
556 System.out.print("[Is transacted] ");
557 System.out.print("[Batch size] ");
558 System.out.print("[Has Message interval] ");
559 System.out.println("[Message interval] ");
560 }
561 }