Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/advisories/ProducerDemandAdvisor.java


1   /** 
2    * 
3    * Copyright 2005 Hiram Chirino
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq.advisories;
19  
20  import javax.jms.Connection;
21  import javax.jms.Destination;
22  import javax.jms.JMSException;
23  import javax.jms.Message;
24  import javax.jms.MessageConsumer;
25  import javax.jms.MessageListener;
26  import javax.jms.ObjectMessage;
27  import javax.jms.Session;
28  
29  import org.activemq.message.ActiveMQDestination;
30  import org.activemq.message.ConsumerInfo;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  
34  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
35  
36  /**
37   * A ProducerDemandAdvisor is used to know when a destination is in demand.
38   * 
39   * Sometimes generating messages to send to a destination is very expensive 
40   * and the application would like to avoid producing messages if there are no 
41   * active consumers for the destination.  There is a "demand" for messages
42   * when a consumer does come active.
43   * 
44   * This object uses Advisory messages to know when consumer go active and 
45   * inactive.
46   */
47  public class ProducerDemandAdvisor {
48      
49      private static final Log log = LogFactory.getLog(ProducerDemandAdvisor.class);
50  
51      private final ActiveMQDestination destination;
52      private Connection connection;
53      private Session session;
54      private SynchronizedBoolean started = new SynchronizedBoolean(false);
55      private int consumerCount;
56      private ProducerDemandListener demandListener;
57      
58      public ProducerDemandAdvisor( Connection connection, final Destination destination ) throws JMSException {
59          this.connection = connection;
60          this.destination = ActiveMQDestination.transformDestination(destination);
61      }
62       
63      /**
64       * @param destination
65       */
66      private void fireDemandEvent() {
67          demandListener.onEvent( new ProducerDemandEvent(destination, isInDemand()));
68      }
69  
70      public boolean isInDemand() {
71          return consumerCount>0;
72      }
73      
74      public ProducerDemandListener getDemandListener() {
75          return demandListener;
76      }
77  
78      synchronized public void setDemandListener(ProducerDemandListener demandListener) {
79          this.demandListener = demandListener;
80          fireDemandEvent();
81      }
82  
83      public void start() throws JMSException {
84          if (started.commit(false, true)) {
85              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
86              MessageConsumer consumer = session.createConsumer(destination.getTopicForConsumerAdvisory());
87              consumer.setMessageListener(new MessageListener(){
88                  public void onMessage(Message msg) {
89                      process(msg);
90                  }
91              });
92          }
93      }
94  
95      public void stop() throws JMSException {
96          if (started.commit(true, false)) {
97              if (session != null) {
98                  session.close();
99              }
100         }
101     }
102     
103     protected void process(Message msg) {
104         if (msg instanceof ObjectMessage) {
105             try {
106                 ConsumerInfo info = (ConsumerInfo) ((ObjectMessage) msg).getObject();
107                 ConsumerAdvisoryEvent event = new ConsumerAdvisoryEvent(info);
108                 
109                 
110                 boolean inDemand = isInDemand();
111 
112                 if ( info.isStarted() ) {
113                     consumerCount++;
114                 } else {
115                     consumerCount--;
116                 }
117                 
118                 // Notify listener if there was a change in demand.
119                 if (inDemand ^ isInDemand() && demandListener != null) {
120                     fireDemandEvent();
121                 }
122             } catch (JMSException e) {
123                 log.error("Failed to process message: " + msg);
124             }
125         }
126     }
127 
128 }