Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/sampler/Producer.java


1   /**
2    *
3    * Copyright 2004 Protique Ltd
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.activemq.sampler;
19  
20  import org.apache.jmeter.engine.event.LoopIterationEvent;
21  import org.apache.jmeter.samplers.Entry;
22  import org.apache.jmeter.samplers.SampleResult;
23  import org.apache.jmeter.testelement.TestListener;
24  import org.apache.jmeter.util.JMeterUtils;
25  import org.apache.jorphan.logging.LoggingManager;
26  import org.apache.log.Logger;
27  import org.activemq.util.connection.ServerConnectionFactory;
28  import org.activemq.util.IdGenerator;
29  
30  import javax.jms.Connection;
31  import javax.jms.Session;
32  import javax.jms.JMSException;
33  import javax.jms.Topic;
34  import javax.jms.Queue;
35  import javax.jms.TopicSession;
36  import javax.jms.QueueSession;
37  import javax.jms.Destination;
38  import javax.jms.Message;
39  import javax.jms.TopicPublisher;
40  import javax.jms.QueueSender;
41  import javax.jms.MessageProducer;
42  import javax.jms.DeliveryMode;
43  
44  import java.io.File;
45  import java.io.FileInputStream;
46  import java.io.FileNotFoundException;
47  import java.io.IOException;
48  import java.util.Properties;
49  import java.util.Timer;
50  import java.util.TimerTask;
51  
52  /**
53   * A sampler which understands Tcp requests.
54   */
55  public class Producer extends Sampler implements TestListener {
56  
57      public static int counter;
58  
59      private static final Logger log = LoggingManager.getLoggerForClass();
60  
61      // Otherwise, the response is scanned for these strings
62      private static final String STATUS_PREFIX = JMeterUtils.getPropDefault("tcp.status.prefix", "");
63      private static final String STATUS_SUFFIX = JMeterUtils.getPropDefault("tcp.status.suffix", "");
64      private static final String STATUS_PROPERTIES = JMeterUtils.getPropDefault("tcp.status.properties", "");
65  
66      private static final Properties statusProps = new Properties();
67      private static final long INSECONDS = 60;
68      private static final long MSGINTERVALINSECS = 60;
69  
70      private Timer timerPublish;
71      private Timer timerPublishLoop;
72  
73      private int batchCounter = 0;
74  
75      static {
76          log.info("Protocol Handler name=" + getClassname());
77          log.info("Status prefix=" + STATUS_PREFIX);
78          log.info("Status suffix=" + STATUS_SUFFIX);
79          log.info("Status properties=" + STATUS_PROPERTIES);
80  
81          if (STATUS_PROPERTIES.length() > 0) {
82              File f = new File(STATUS_PROPERTIES);
83  
84              try {
85                  statusProps.load(new FileInputStream(f));
86                  log.info("Successfully loaded properties");
87                  //haveStatusProps = true;
88              } catch (FileNotFoundException e) {
89                  log.info("Property file not found");
90              } catch (IOException e) {
91                  log.info("Property file error " + e.toString());
92              }
93          }
94      }
95  
96      /**
97       * Constructor for ProducerSampler object.
98       */
99      public Producer() {
100         log.debug("Created " + this);
101         protocolHandler = this.getProtocol(); //from superclass sampler.
102         log.debug("Using Protocol Handler: " + protocolHandler.getClass().getName());
103     }
104 
105     /**
106      * Increments the int variable.
107      *
108      * @param count - variable incremented.
109      */
110     protected synchronized void count(int count) {
111         counter += count;
112     }
113 
114     /**
115      * @return Returns the message.
116      */
117     protected String getMessage() {
118         StringBuffer buffer = new StringBuffer();
119 
120         for (int i = 0; i < getMsgSize(); i++) {
121             char ch = 'X';
122             buffer.append(ch);
123         }
124 
125         return buffer.toString();
126     }
127 
128     /**
129      * Retrieves the message then sends it via tcp.
130      *
131      * @throws Exception
132      */
133     protected void publish() throws Exception {
134         long threadRampUp = 0;
135 
136         if (getNoProducer() > 0) {
137             threadRampUp = (long) ((double) (getRampUp() * INSECONDS) / ((double) getNoProducer()) * 1000);
138         }
139 
140         timerPublish = new Timer();
141         timerPublish.scheduleAtFixedRate(new newThread(), 0, threadRampUp);
142     }
143 
144     /**
145      * Sends the information from the client via tcp.
146      *
147      * @param text    - message that is sent.
148      * @param subject - subject of the message to be sent.
149      * @throws JMSException
150      */
151     protected void publish(String text, String subject) throws JMSException {
152         Destination destination = null;
153         Session session = null;
154         MessageProducer publisher = null;
155         Connection connection = null;
156 
157         connection = ServerConnectionFactory.createConnectionFactory(this.getURL(),
158                                                                      this.getMQServer(),
159                                                                      this.getTopic(),
160                                                                      this.getEmbeddedBroker());
161 
162         if (this.getDurable()) {
163             if ((ServerConnectionFactory.JORAM_SERVER.equals(this.getMQServer()))||
164                 (ServerConnectionFactory.MANTARAY_SERVER.equals(this.getMQServer()))) {
165                 //Id set be server
166 
167             } else {
168                 IdGenerator idGenerator = new IdGenerator();
169                 connection.setClientID(idGenerator.generateId());
170             }
171         }
172 
173         session = ServerConnectionFactory.createSession(connection,
174                                                         this.getTransacted(),
175                                                         this.getMQServer(),
176                                                         this.getTopic());
177 
178         destination = ServerConnectionFactory.createDestination(session,
179                                                                 subject,
180                                                                 this.getURL(),
181                                                                 this.getMQServer(),
182                                                                 this.getTopic());
183 
184         if ((ServerConnectionFactory.OPENJMS_SERVER.equals(this.getMQServer()))||
185             (ServerConnectionFactory.MANTARAY_SERVER.equals(this.getMQServer()))) {
186             if (this.getTopic()){
187                 connection.start();
188                 TopicPublisher topicPublisher = ((TopicSession)session).createPublisher((Topic)destination);
189                 publisher = topicPublisher;
190 
191             } else {
192                 connection.start();
193                 QueueSender queuePublisher = ((QueueSession)session).createSender((Queue)destination);
194                 publisher = queuePublisher;
195 
196             }
197 
198         } else {
199             publisher = session.createProducer(destination);
200         }
201 
202         long msgIntervalInMins = this.getMsgInterval();
203         long msgIntervalInSecs = msgIntervalInMins * INSECONDS;
204 
205         if (msgIntervalInSecs < 0) {
206             msgIntervalInSecs = MSGINTERVALINSECS;
207         }
208 
209         if (getDurable()) {
210             publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
211         } else {
212             publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
213         }
214 
215         if (this.getDefMsgInterval()) {
216             while (!stopThread) {
217                 publishLoop(session, publisher, text);
218             }
219             ServerConnectionFactory.close(connection, session);
220 
221         } else {
222             // set the session, publisher and connection.
223             this.setSession(session);
224             this.setPublisher(publisher);
225             this.setConnection(connection);
226 
227             timerPublishLoop = new Timer();
228             timerPublishLoop.scheduleAtFixedRate(new publish(), 0, msgIntervalInSecs * 1000);
229 
230         }
231     }
232 
233     /**
234      * Sends a message through MessageProducer object.
235      *
236      * @param session   - Session oject.
237      * @param publisher - MessageProducer object.
238      * @param text      - text that is used to create Message object.
239      * @throws JMSException
240      */
241     protected void publishLoop(Session session, MessageProducer publisher, String text) throws JMSException {
242         if (ServerConnectionFactory.OPENJMS_SERVER.equals(this.getMQServer())) {
243             if (publisher instanceof TopicPublisher) {
244                 Message message = ((TopicSession)session).createTextMessage(text);
245                 ((TopicPublisher)publisher).publish(message);
246             } else if (publisher instanceof QueueSender){
247                 Message message = ((QueueSession)session).createTextMessage(text);
248                 ((QueueSender)publisher).send(message);
249             }
250         } else {
251             Message message = session.createTextMessage(text);
252             publisher.send(message);
253         }
254 
255         if (this.getTransacted()) {
256             batchCounter++;
257 
258             if (batchCounter == this.getBatchSize()) {
259                 batchCounter = 0;
260                 session.commit();
261             }
262         }
263 
264         count(1);
265     }
266 
267     /**
268      * @return the current number of messages sent.
269      */
270     public static synchronized int resetCount() {
271         int answer = counter;
272         counter = 0;
273         return answer;
274     }
275 
276     /**
277      * Runs and publish the message.
278      *
279      * @throws Exception
280      */
281     public void run() throws Exception {
282 
283         start();
284         publish();
285     }
286 
287     /**
288      * Retrieves the sample as SampleResult object. There are times that this
289      * is ignored.
290      *
291      * @param e - Entry object.
292      * @return Returns the sample result.
293      */
294     public SampleResult sample(Entry e) { // Entry tends to be ignored ...
295         SampleResult res = new SampleResult();
296         res.setSampleLabel(getName());
297         res.setSamplerData(getURL());
298         res.sampleStart();
299 
300         try {
301             //run the benchmark tool code
302             this.run();
303         } catch (Exception ex) {
304             log.debug("Error running producer ", ex);
305             res.setResponseCode("500");
306             res.setResponseMessage(ex.toString());
307         }
308 
309         //Calculate response time
310         res.sampleEnd();
311 
312         // Set if we were successful or not
313         res.setSuccessful(true);
314 
315         return res;
316     }
317 
318     /**
319      * Logs the end of the test. This is called only once per
320      * class.
321      */
322     public void testEnded() {
323         log.debug(this + " test ended");
324     }
325 
326     /**
327      * Logs the host at the end of the test.
328      *
329      * @param host - the host to be logged.
330      */
331     public void testEnded(String host) {
332         log.debug(this + " test ended on " + host);
333     }
334 
335     /**
336      * Logs the start of the test. This is called only once
337      * per class.
338      */
339     public void testStarted() {
340         log.debug(this + " test started");
341     }
342 
343     /**
344      * Logs the host at the start of the test.     *
345      * @param host - the host to be logged.
346      */
347     public void testStarted(String host) {
348         log.debug(this + " test started on " + host);
349     }
350 
351     /**
352      * Logs the iteration event.
353      *
354      * @param event
355      */
356     public void testIterationStart(LoopIterationEvent event) {
357         log.debug(this + " test iteration start on " + event.getIteration());
358     }
359 
360     /**
361      * Creates thread for publishing messages.
362      */
363     class newThread extends TimerTask {
364         final String text = getMessage();
365         int numberOfProducer = getNoProducer();
366         int counter = 0;
367 
368         public void run() {
369              if (counter < numberOfProducer) {
370                  final String subject = subjects[counter % getNoSubject()];
371 
372                  counter++;
373 
374                  Thread thread = new Thread() {
375                     public void run() {
376                         try {
377                               if (stopThread) {
378                                   return;
379                               } else {
380                                   publish(text, subject);
381                               }
382                         } catch (JMSException e) {
383                             log.error("Error publishing message ", e);
384                         }
385                     }
386                 };
387 
388                 thread.start();
389 
390              } else {
391                  timerPublish.cancel();
392              }
393         }
394     }
395 
396     /**
397      * Starts the publish loop timer.
398      */
399     class publish extends TimerTask {
400         public void run() {
401             try {
402                 if (!stopThread) {
403                     publishLoop(getSession(), getPublisher(), getMessage());
404                 } else {
405                     ServerConnectionFactory.close(getConnection(), getSession());
406                     timerPublishLoop.cancel();
407                 }
408             } catch(JMSException e) {
409                 log.error("Could not publish "+e);
410             }
411         }
412     }
413 
414     /**
415      * Starts an instance of the Producer tool.
416      */
417     public static void main(String[] args) {
418         System.out.println("##########################################");
419         System.out.println(" Producer * start *");
420         System.out.println("##########################################");
421 
422         Producer prod = new Producer();
423 
424         if (args.length == 0 ){
425             displayToolParameters();
426         }
427 
428         if (args.length > 0){
429             String mqServer = args[0];
430 
431             if (mqServer.equalsIgnoreCase("SONICMQ")){
432                 prod.setMQServer(ServerConnectionFactory.SONICMQ_SERVER);
433             } else if (mqServer.equalsIgnoreCase("TIBCOMQ")) {
434                 prod.setMQServer(ServerConnectionFactory.TIBCOMQ_SERVER);
435             } else if (mqServer.equalsIgnoreCase("JBOSSMQ")) {
436                 prod.setMQServer(ServerConnectionFactory.JBOSSMQ_SERVER);
437             } else if (mqServer.equalsIgnoreCase("OPENJMS")) {
438                 prod.setMQServer(ServerConnectionFactory.OPENJMS_SERVER);
439             } else if (mqServer.equalsIgnoreCase("JORAM")) {
440                 prod.setMQServer(ServerConnectionFactory.JORAM_SERVER);
441             } else if (mqServer.equalsIgnoreCase("MANTARAY")) {
442                 prod.setMQServer(ServerConnectionFactory.MANTARAY_SERVER);
443             } else if (mqServer.equalsIgnoreCase("ACTIVEMQ")) {
444                 //Run with the default broker
445             } else {
446                 System.out.print("Please enter a valid server: [ ");
447                 System.out.print("SONICMQ | " );
448                 System.out.print("TIBCOMQ | " );
449                 System.out.print("JBOSSMQ | " );
450                 System.out.print("OPENJMS | " );
451                 System.out.print("JORAM |");
452                 System.out.print("MANTARAY |");
453                 System.out.println("ACTIVEMQ ]");
454             }
455 
456         }
457 
458         if (args.length > 1) {
459             prod.setURL(args[1]);
460         } else {
461             System.out.println("Please specify the URL.");
462         }
463 
464         if (args.length > 2) {
465             prod.setDuration(args[2]);
466         }
467 
468         if (args.length > 3) {
469             prod.setRampUp(args[3]);
470         }
471 
472         if (args.length > 4) {
473             prod.setNoProducer(args[4]);
474         }
475 
476         if (args.length > 5) {
477             prod.setNoSubject(args[5]);
478         }
479 
480         if (args.length > 6) {
481             prod.setMsgSize(args[6]);
482         }
483 
484         if (args.length > 7) {
485             prod.setDurable(args[7]);
486         }
487 
488         if (args.length > 8) {
489             prod.setTopic(args[8]);
490         }
491 
492         if (args.length > 9) {
493             prod.setTransacted(args[9]);
494 
495             if (args.length > 10) {
496                 prod.setBatchSize(args[10]);
497             } else {
498                 displayToolParameters();
499                 System.out.println("Please specify the batch size.");
500                 return;
501             }
502         }
503 
504         if (args.length > 11) {
505             prod.setDefMsgInterval(args[11]);
506             if (!prod.getDefMsgInterval()) {
507                 if (args.length > 12) {
508                     prod.setMsgInterval(args[12]);
509                 } else {
510                     displayToolParameters();
511                     System.out.println("Please specify the message interval.");
512                     return;
513                  }
514              }
515         }
516 
517         prod.setDefMsgInterval("true");
518 
519         System.out.println("Runnning Consumer tool with the following parameters:");
520         System.out.println("Server=" + prod.getMQServer());
521         System.out.println("URL="+prod.getURL());
522         System.out.println("Duration="+prod.getDuration());
523         System.out.println("Ramp up="+prod.getRampUp());
524         System.out.println("No. Producer="+prod.getNoProducer());
525         System.out.println("No. Subject="+prod.getNoSubject());
526         System.out.println("Msg Size="+prod.getMsgSize());
527         System.out.println("Is Durable="+prod.getDurable());
528         System.out.println("Is Topic="+prod.getTopic());
529         System.out.println("Is Transacted="+prod.getTransacted());
530 
531         try {
532             prod.run();
533         } catch (Exception e){
534             System.out.println("Excception e="+e);
535 
536         }
537         System.out.println("##########################################");
538         System.out.println("Producer * end *");
539         System.out.println("##########################################");
540     }
541 
542     /**
543      * Prints to the console the Producer tool parameters.
544      */
545     private static void displayToolParameters(){
546         System.out.println("Producer tool usage: ");
547         System.out.print("[Message Queue Server] ");
548         System.out.print("[URL] ");
549         System.out.print("[Duration] ");
550         System.out.print("[Ramp up] ");
551         System.out.print("[No. of producer] ");
552         System.out.print("[No. of subject] ");
553         System.out.print("[Message size] ");
554         System.out.print("[Delivery mode] ");
555         System.out.print("[Is topic] ");
556         System.out.print("[Is transacted] ");
557         System.out.print("[Batch size] ");
558         System.out.print("[Has Message interval] ");
559         System.out.println("[Message interval] ");
560     }
561 }