Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/spring/SpringConsumer.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq.spring;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.activemq.service.Service;
23  import org.springframework.jms.core.JmsTemplate;
24  
25  import javax.jms.Connection;
26  import javax.jms.ConnectionFactory;
27  import javax.jms.Destination;
28  import javax.jms.JMSException;
29  import javax.jms.Message;
30  import javax.jms.MessageConsumer;
31  import javax.jms.MessageListener;
32  import javax.jms.Session;
33  
34  /**
35   * @author Mike Perham
36   * @version $Revision$
37   */
38  public class SpringConsumer extends ConsumerBean implements Service, MessageListener {
39      private static final Log log = LogFactory.getLog(SpringConsumer.class);
40  
41      private JmsTemplate template;
42      private String myId = "foo";
43      private Destination destination;
44      private Connection connection;
45      private Session session;
46      private MessageConsumer consumer;
47  
48      public void start() throws JMSException {
49          String selector = "next = '" + myId + "'";
50  
51          try {
52              ConnectionFactory factory = template.getConnectionFactory();
53              connection = factory.createConnection();
54  
55              // we might be a reusable connection in spring
56              // so lets only set the client ID once if its not set
57              synchronized (connection) {
58                  if (connection.getClientID() == null) {
59                      connection.setClientID(myId);
60                  }
61              }
62  
63              connection.start();
64  
65              session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
66              consumer = session.createConsumer(destination, selector, false);
67              consumer.setMessageListener(this);
68          }
69          catch (JMSException ex) {
70              log.error("", ex);
71              throw ex;
72          }
73      }
74  
75  
76      public void stop() throws JMSException {
77      if( consumer!=null )
78        consumer.close();
79      if( session!=null )
80        session.close();
81      if( connection!=null )
82        connection.close();
83      }
84  
85      public void onMessage(Message message) {
86          super.onMessage(message);
87          try {
88              message.acknowledge();
89          }
90          catch (JMSException e) {
91              log.error("Failed to acknowledge: " + e, e);
92          }
93      }
94  
95      // Properties
96      //-------------------------------------------------------------------------
97      public Destination getDestination() {
98          return destination;
99      }
100 
101     public void setDestination(Destination destination) {
102         this.destination = destination;
103     }
104 
105     public String getMyId() {
106         return myId;
107     }
108 
109     public void setMyId(String myId) {
110         this.myId = myId;
111     }
112 
113     public JmsTemplate getTemplate() {
114         return template;
115     }
116 
117     public void setTemplate(JmsTemplate template) {
118         this.template = template;
119     }
120 }