Source code: org/activemq/advisories/ProducerDemandAdvisor.java
1 /**
2 *
3 * Copyright 2005 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.advisories;
19
20 import javax.jms.Connection;
21 import javax.jms.Destination;
22 import javax.jms.JMSException;
23 import javax.jms.Message;
24 import javax.jms.MessageConsumer;
25 import javax.jms.MessageListener;
26 import javax.jms.ObjectMessage;
27 import javax.jms.Session;
28
29 import org.activemq.message.ActiveMQDestination;
30 import org.activemq.message.ConsumerInfo;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33
34 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
35
36 /**
37 * A ProducerDemandAdvisor is used to know when a destination is in demand.
38 *
39 * Sometimes generating messages to send to a destination is very expensive
40 * and the application would like to avoid producing messages if there are no
41 * active consumers for the destination. There is a "demand" for messages
42 * when a consumer does come active.
43 *
44 * This object uses Advisory messages to know when consumer go active and
45 * inactive.
46 */
47 public class ProducerDemandAdvisor {
48
49 private static final Log log = LogFactory.getLog(ProducerDemandAdvisor.class);
50
51 private final ActiveMQDestination destination;
52 private Connection connection;
53 private Session session;
54 private SynchronizedBoolean started = new SynchronizedBoolean(false);
55 private int consumerCount;
56 private ProducerDemandListener demandListener;
57
58 public ProducerDemandAdvisor( Connection connection, final Destination destination ) throws JMSException {
59 this.connection = connection;
60 this.destination = ActiveMQDestination.transformDestination(destination);
61 }
62
63 /**
64 * @param destination
65 */
66 private void fireDemandEvent() {
67 demandListener.onEvent( new ProducerDemandEvent(destination, isInDemand()));
68 }
69
70 public boolean isInDemand() {
71 return consumerCount>0;
72 }
73
74 public ProducerDemandListener getDemandListener() {
75 return demandListener;
76 }
77
78 synchronized public void setDemandListener(ProducerDemandListener demandListener) {
79 this.demandListener = demandListener;
80 fireDemandEvent();
81 }
82
83 public void start() throws JMSException {
84 if (started.commit(false, true)) {
85 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
86 MessageConsumer consumer = session.createConsumer(destination.getTopicForConsumerAdvisory());
87 consumer.setMessageListener(new MessageListener(){
88 public void onMessage(Message msg) {
89 process(msg);
90 }
91 });
92 }
93 }
94
95 public void stop() throws JMSException {
96 if (started.commit(true, false)) {
97 if (session != null) {
98 session.close();
99 }
100 }
101 }
102
103 protected void process(Message msg) {
104 if (msg instanceof ObjectMessage) {
105 try {
106 ConsumerInfo info = (ConsumerInfo) ((ObjectMessage) msg).getObject();
107 ConsumerAdvisoryEvent event = new ConsumerAdvisoryEvent(info);
108
109
110 boolean inDemand = isInDemand();
111
112 if ( info.isStarted() ) {
113 consumerCount++;
114 } else {
115 consumerCount--;
116 }
117
118 // Notify listener if there was a change in demand.
119 if (inDemand ^ isInDemand() && demandListener != null) {
120 fireDemandEvent();
121 }
122 } catch (JMSException e) {
123 log.error("Failed to process message: " + msg);
124 }
125 }
126 }
127
128 }