Source code: org/activemq/perf/PerfConsumer.java
1 /**
2 * <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
3 *
4 * Copyright 2005 (C) Simula Labs Inc.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
12 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
13 * specific language governing permissions and limitations under the License.
14 *
15 */
16 package org.activemq.perf;
17
18 import javax.jms.Connection;
19 import javax.jms.ConnectionFactory;
20 import javax.jms.Destination;
21 import javax.jms.JMSException;
22 import javax.jms.Message;
23 import javax.jms.MessageConsumer;
24 import javax.jms.MessageListener;
25 import javax.jms.Session;
26 import javax.jms.Topic;
27 /**
28 * @version $Revision: 1.3 $
29 */
30 public class PerfConsumer implements MessageListener{
31 protected Connection connection;
32 protected MessageConsumer consumer;
33 protected PerfRate rate=new PerfRate();
34 public PerfConsumer(ConnectionFactory fac,Destination dest,String consumerName) throws JMSException{
35 connection=fac.createConnection();
36 Session s=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
37 if(dest instanceof Topic&&consumerName!=null&&consumerName.length()>0){
38 consumer=s.createDurableSubscriber((Topic) dest,consumerName);
39 }else{
40 consumer=s.createConsumer(dest);
41 }
42 consumer.setMessageListener(this);
43 }
44 public PerfConsumer(ConnectionFactory fac,Destination dest) throws JMSException{
45 this(fac,dest,null);
46 }
47 public void start() throws JMSException{
48 connection.start();
49 rate.getRate();
50 }
51 public void stop() throws JMSException{
52 connection.stop();
53 }
54 public void shutDown() throws JMSException{
55 connection.close();
56 }
57 public PerfRate getRate(){
58 return rate;
59 }
60 public void onMessage(Message msg){
61 rate.increment();
62 }
63 }