Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/ActiveMQMessageProducer.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq;
19  
20  import javax.jms.DeliveryMode;
21  import javax.jms.Destination;
22  import javax.jms.IllegalStateException;
23  import javax.jms.InvalidDestinationException;
24  import javax.jms.JMSException;
25  import javax.jms.Message;
26  import javax.jms.MessageFormatException;
27  import javax.jms.MessageProducer;
28  
29  import org.activemq.management.JMSProducerStatsImpl;
30  import org.activemq.management.StatsCapable;
31  import org.activemq.management.StatsImpl;
32  import org.activemq.message.ActiveMQDestination;
33  import org.activemq.util.IdGenerator;
34  
35  /**
36   * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
37   * destination. A <CODE>MessageProducer</CODE> object is created by passing a
38   * <CODE>Destination</CODE> object to a message-producer creation method
39   * supplied by a session.
40   * <P>
41   * <CODE>MessageProducer</CODE> is the parent interface for all message
42   * producers.
43   * <P>
44   * A client also has the option of creating a message producer without
45   * supplying a destination. In this case, a destination must be provided with
46   * every send operation. A typical use for this kind of message producer is to
47   * send replies to requests using the request's <CODE>JMSReplyTo</CODE>
48   * destination.
49   * <P>
50   * A client can specify a default delivery mode, priority, and time to live for
51   * messages sent by a message producer. It can also specify the delivery mode,
52   * priority, and time to live for an individual message.
53   * <P>
54   * A client can specify a time-to-live value in milliseconds for each message
55   * it sends. This value defines a message expiration time that is the sum of
56   * the message's time-to-live and the GMT when it is sent (for transacted
57   * sends, this is the time the client sends the message, not the time the
58   * transaction is committed).
59   * <P>
60   * A JMS provider should do its best to expire messages accurately; however,
61   * the JMS API does not define the accuracy provided.
62   *
63   * @version $Revision: 1.1.1.1 $
64   * @see javax.jms.TopicPublisher
65   * @see javax.jms.QueueSender
66   * @see javax.jms.Session#createProducer
67   */
68  public class ActiveMQMessageProducer implements MessageProducer, StatsCapable, Closeable {
69      protected ActiveMQSession session;
70      private IdGenerator idGenerator;
71      protected boolean closed;
72      private boolean disableMessageID;
73      private boolean disableMessageTimestamp;
74      private int defaultDeliveryMode;
75      private int defaultPriority;
76      private long defaultTimeToLive;
77      protected ActiveMQDestination defaultDestination;
78      private long startTime;
79      private JMSProducerStatsImpl stats;
80      private short producerId;
81     
82      private boolean reuseMessageId; //hint to reuse the same messageId - if already set
83  
84      protected ActiveMQMessageProducer(ActiveMQSession theSession, ActiveMQDestination destination) throws JMSException { // builds a producer owned by theSession; destination becomes the default send target
85          this.session = theSession;
86          this.defaultDestination = destination; // send() treats a null default as "caller must supply the destination"
87          this.idGenerator = new IdGenerator();
88          this.disableMessageID = false;
89          this.disableMessageTimestamp = theSession.connection.isDisableTimeStampsByDefault(); // initial value comes from the connection
90          this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
91          this.defaultPriority = Message.DEFAULT_PRIORITY;
92          this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
93          this.startTime = System.currentTimeMillis();
94          this.session.addProducer(this); // NOTE(review): 'this' is handed to the session before stats is assigned below - confirm addProducer does not read it
95          this.stats = new JMSProducerStatsImpl(theSession.getSessionStats(), destination);
96      }
97  
98      public StatsImpl getStats() {
99          return stats;
100     }
101 
102     public JMSProducerStatsImpl getProducerStats() {
103         return stats;
104     }
105 
106     /**
107      * Sets whether message IDs are disabled.
108      * <P>
109      * Since message IDs take some effort to create and increase a message's
110      * size, some JMS providers may be able to optimize message overhead if
111      * they are given a hint that the message ID is not used by an application.
112      * By calling the <CODE>setDisableMessageID</CODE> method on this message
113      * producer, a JMS client enables this potential optimization for all
114      * messages sent by this message producer. If the JMS provider accepts this
115      * hint, these messages must have the message ID set to null; if the
116      * provider ignores the hint, the message ID must be set to its normal
117      * unique value.
118      * <P>
119      * Message IDs are enabled by default.
120      *
121      * @param value indicates if message IDs are disabled
122      * @throws JMSException if this producer has already been closed (raised as a
123      *                      <CODE>javax.jms.IllegalStateException</CODE>).
124      */
125     public void setDisableMessageID(boolean value) throws JMSException {
126         checkClosed();
127         this.disableMessageID = value;
128     }
129 
130     /**
131      * Gets an indication of whether message IDs are disabled.
132      *
133      * @return <CODE>true</CODE> if message IDs are disabled for this producer
134      * @throws JMSException if this producer has already been closed (raised as a
135      *                      <CODE>javax.jms.IllegalStateException</CODE>).
136      */
137     public boolean getDisableMessageID() throws JMSException {
138         checkClosed();
139         return this.disableMessageID;
140     }
141 
142     /**
143      * Sets whether message timestamps are disabled.
144      * <P>
145      * Since timestamps take some effort to create and increase a message's
146      * size, some JMS providers may be able to optimize message overhead if
147      * they are given a hint that the timestamp is not used by an application.
148      * By calling the <CODE>setDisableMessageTimestamp</CODE> method on this
149      * message producer, a JMS client enables this potential optimization for
150      * all messages sent by this message producer. If the JMS provider accepts
151      * this hint, these messages must have the timestamp set to zero; if the
152      * provider ignores the hint, the timestamp must be set to its normal
153      * value.
154      * <P>
155      * The initial value is taken from the connection's <CODE>isDisableTimeStampsByDefault()</CODE>.
156      *
157      * @param value indicates if message timestamps are disabled
158      * @throws JMSException if this producer has already been closed (raised as a
159      *                      <CODE>javax.jms.IllegalStateException</CODE>).
160      */
161     public void setDisableMessageTimestamp(boolean value) throws JMSException {
162         checkClosed();
163         this.disableMessageTimestamp = value;
164     }
165 
166     /**
167      * Gets an indication of whether message timestamps are disabled.
168      *
169      * @return <CODE>true</CODE> if message timestamps are disabled for this producer
170      * @throws JMSException if this producer has already been closed (raised as a
171      *                      <CODE>javax.jms.IllegalStateException</CODE>).
172      */
173     public boolean getDisableMessageTimestamp() throws JMSException {
174         checkClosed();
175         return this.disableMessageTimestamp;
176     }
177 
178     /**
179      * Sets the producer's default delivery mode.
180      * <P>
181      * Delivery mode is set to <CODE>PERSISTENT</CODE> by default.
182      *
183      * @param newDeliveryMode the message delivery mode for this message producer; legal
184      *                        values are <code>DeliveryMode.NON_PERSISTENT</code> and
185      *                        <code>DeliveryMode.PERSISTENT</code>
186      * @throws JMSException if newDeliveryMode is not one of the two legal values, or if this producer
187      *                      has been closed (both raised as <code>javax.jms.IllegalStateException</code>).
188      * @see javax.jms.MessageProducer#getDeliveryMode
189      * @see javax.jms.DeliveryMode#NON_PERSISTENT
190      * @see javax.jms.DeliveryMode#PERSISTENT
191      * @see javax.jms.Message#DEFAULT_DELIVERY_MODE
192      */
193     public void setDeliveryMode(int newDeliveryMode) throws JMSException {
194         if (newDeliveryMode != DeliveryMode.PERSISTENT && newDeliveryMode != DeliveryMode.NON_PERSISTENT) {
195             throw new IllegalStateException("unkown delivery mode: " + newDeliveryMode);
196         }
197         checkClosed(); // the argument is validated first, so a bad value is reported even on a closed producer
198         this.defaultDeliveryMode = newDeliveryMode;
199     }
200 
201     /**
202      * Gets the producer's default delivery mode.
203      *
204      * @return the message delivery mode for this message producer
205      * @throws JMSException if this producer has already been closed (raised as a
206      *                      <CODE>javax.jms.IllegalStateException</CODE>).
207      */
208     public int getDeliveryMode() throws JMSException {
209         checkClosed();
210         return this.defaultDeliveryMode;
211     }
212 
213     /**
214      * Sets the producer's default priority.
215      * <P>
216      * The JMS API defines ten levels of priority value, with 0 as the lowest
217      * priority and 9 as the highest. Clients should consider priorities 0-4 as
218      * gradations of normal priority and priorities 5-9 as gradations of
219      * expedited priority. Priority is set to 4 by default.
220      *
221      * @param newDefaultPriority the message priority for this message producer; must be a
222      *                           value between 0 and 9
223      * @throws JMSException if newDefaultPriority is outside 0-9, or if this producer has been
224      *                      closed (both raised as <code>javax.jms.IllegalStateException</code>).
225      * @see javax.jms.MessageProducer#getPriority
226      * @see javax.jms.Message#DEFAULT_PRIORITY
227      */
228     public void setPriority(int newDefaultPriority) throws JMSException {
229         if (newDefaultPriority < 0 || newDefaultPriority > 9) {
230             throw new IllegalStateException("default priority must be a value between 0 and 9");
231         }
232         checkClosed(); // the argument is validated first, so a bad value is reported even on a closed producer
233         this.defaultPriority = newDefaultPriority;
234     }
235 
236     /**
237      * Gets the producer's default priority.
238      *
239      * @return the message priority for this message producer
240      * @throws JMSException if this producer has already been closed (raised as a
241      *                      <CODE>javax.jms.IllegalStateException</CODE>).
242      * @see javax.jms.MessageProducer#setPriority
243      */
244     public int getPriority() throws JMSException {
245         checkClosed();
246         return this.defaultPriority;
247     }
248 
249     /**
250      * Sets the default length of time in milliseconds from its dispatch time
251      * that a produced message should be retained by the message system.
252      * <P>
253      * Time to live is set to zero by default.
254      *
255      * @param timeToLive the message time to live in milliseconds; zero is unlimited
256      * @throws JMSException if timeToLive is negative, or if this producer has been closed
257      *                      (both raised as <code>javax.jms.IllegalStateException</code>).
258      * @see javax.jms.MessageProducer#getTimeToLive
259      * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE
260      */
261     public void setTimeToLive(long timeToLive) throws JMSException {
262         if (timeToLive < 0l) {
263             throw new IllegalStateException("cannot set a negative timeToLive");
264         }
265         checkClosed(); // the argument is validated first, so a bad value is reported even on a closed producer
266         this.defaultTimeToLive = timeToLive;
267     }
268 
269     /**
270      * Gets the default length of time in milliseconds from its dispatch time
271      * that a produced message should be retained by the message system.
272      *
273      * @return the message time to live in milliseconds; zero is unlimited
274      * @throws JMSException if this producer has already been closed (raised as a
275      *                      <CODE>javax.jms.IllegalStateException</CODE>).
276      * @see javax.jms.MessageProducer#setTimeToLive
277      */
278     public long getTimeToLive() throws JMSException {
279         checkClosed();
280         return this.defaultTimeToLive;
281     }
282 
283     /**
284      * Gets the destination associated with this <CODE>MessageProducer</CODE>.
285      *
286      * @return this producer's default <CODE>Destination</CODE>, exactly as passed to the constructor
287      * @throws JMSException if this producer has already been closed (raised as a
288      *                      <CODE>javax.jms.IllegalStateException</CODE>).
289      * @since 1.1
290      */
291     public Destination getDestination() throws JMSException {
292         checkClosed();
293         return this.defaultDestination;
294     }
295 
296     /**
297      * Closes the message producer.
298      * <P>
299      * Since a provider may allocate some resources on behalf of a <CODE>
300      * MessageProducer</CODE> outside the Java virtual machine, clients should
301      * close them when they are not needed. Relying on garbage collection to
302      * eventually reclaim these resources may not be timely enough.
303      *
304      * @throws JMSException if the JMS provider fails to close the producer due to
305      *                      some internal error.
306      */
307     public void close() throws JMSException {
308         this.session.removeProducer(this); // unregister from the owning session; runs on every call, there is no already-closed guard
309         closed = true; // from here on checkClosed() rejects further use of this producer
310     }
311 
312     protected void checkClosed() throws IllegalStateException {
313         if (closed) {
314             throw new IllegalStateException("The producer is closed");
315         }
316     }
317 
318     /**
319      * Sends a message using the <CODE>MessageProducer</CODE>'s default
320      * delivery mode, priority, and time to live.
321      *
322      * @param message the message to send
323      * @throws JMSException                if the JMS provider fails to send the message due to some
324      *                                     internal error.
325      * @throws MessageFormatException      if an invalid message is specified.
326      * @throws InvalidDestinationException if a client uses this method with a <CODE>
327      *                                     MessageProducer</CODE> with an invalid destination.
328      * @throws java.lang.UnsupportedOperationException
329      *                                     if a client uses this method with a <CODE>
330      *                                     MessageProducer</CODE> that did not specify a
331      *                                     destination at creation time.
332      * @see javax.jms.Session#createProducer
333      * @see javax.jms.MessageProducer
334      * @since 1.1
335      */
336     public void send(Message message) throws JMSException {
337         this.send(this.defaultDestination, message, this.defaultDeliveryMode, this.defaultPriority,
338                 this.defaultTimeToLive);
339     }
340 
341     /**
342      * Sends a message to the destination, specifying delivery mode, priority,
343      * and time to live.
344      *
345      * @param message      the message to send
346      * @param deliveryMode the delivery mode to use
347      * @param priority     the priority for this message
348      * @param timeToLive   the message's lifetime (in milliseconds)
349      * @throws JMSException                if the JMS provider fails to send the message due to some
350      *                                     internal error.
351      * @throws MessageFormatException      if an invalid message is specified.
352      * @throws InvalidDestinationException if a client uses this method with a <CODE>
353      *                                     MessageProducer</CODE> with an invalid destination.
354      * @throws java.lang.UnsupportedOperationException
355      *                                     if a client uses this method with a <CODE>
356      *                                     MessageProducer</CODE> that did not specify a
357      *                                     destination at creation time.
358      * @see javax.jms.Session#createProducer
359      * @since 1.1
360      */
361     public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
362         this.send(this.defaultDestination, message, deliveryMode, priority, timeToLive);
363     }
364 
365     /**
366      * Sends a message to a destination for an unidentified message producer.
367      * Uses the <CODE>MessageProducer</CODE>'s default delivery mode,
368      * priority, and time to live.
369      * <P>
370      * Typically, a message producer is assigned a destination at creation
371      * time; however, the JMS API also supports unidentified message producers,
372      * which require that the destination be supplied every time a message is
373      * sent.
374      *
375      * @param destination the destination to send this message to
376      * @param message     the message to send
377      * @throws JMSException                if the JMS provider fails to send the message due to some
378      *                                     internal error.
379      * @throws MessageFormatException      if an invalid message is specified.
380      * @throws InvalidDestinationException if a client uses this method with an invalid destination.
381      * @throws java.lang.UnsupportedOperationException
382      *                                     if <CODE>destination</CODE> is null, or if this producer was
383      *                                     created with a destination, the connection is J2EE compliant
384      *                                     and <CODE>destination</CODE> is not that destination.
385      * @see javax.jms.Session#createProducer
386      * @see javax.jms.MessageProducer
387      * @since 1.1
388      */
389     public void send(Destination destination, Message message) throws JMSException {
390         this.send(destination, message, this.defaultDeliveryMode, this.defaultPriority, this.defaultTimeToLive);
391     }
392 
393     /**
394      * Sends a message to a destination for an unidentified message producer,
395      * specifying delivery mode, priority and time to live.
396      * <P>
397      * Typically, a message producer is assigned a destination at creation
398      * time; however, the JMS API also supports unidentified message producers,
399      * which require that the destination be supplied every time a message is
400      * sent.
401      *
402      * @param destination  the destination to send this message to
403      * @param message      the message to send
404      * @param deliveryMode the delivery mode to use
405      * @param priority     the priority for this message
406      * @param timeToLive   the message's lifetime (in milliseconds)
407      * @throws JMSException                if this producer is closed or the JMS provider fails to send the message.
408      * @throws UnsupportedOperationException if destination is null, or (J2EE-compliant connection) it does not equal this producer's default destination.
409      * @throws MessageFormatException      if an invalid message is specified.
410      * @throws InvalidDestinationException if a client uses this method with an invalid destination.
411      * @see javax.jms.Session#createProducer
412      * @since 1.1
413      */
414     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
415             throws JMSException {
416         checkClosed();
417         if (destination == null) {
418             throw new UnsupportedOperationException("Dont understand null destinations");
419         }
420         if (this.defaultDestination != null && !this.defaultDestination.equals(destination) && session.connection.isJ2EEcompliant()) { // equals(), not ==: an equal but distinct Destination instance is still this producer's destination
421             throw new UnsupportedOperationException("This producer can only send messages to: "+defaultDestination.getPhysicalName());
422         }
423         if (this.defaultDestination==null){
424             this.session.connection.startAdvisoryForTempDestination(destination); // only producers created without a default destination trigger this
425         }
426         this.session.send(this, destination, message, deliveryMode, priority, timeToLive,reuseMessageId);
427         stats.onMessage(message); // statistics are updated only after session.send returns without throwing
428     }
429     
430     /**
431      * @return the reuseMessageId hint passed to the session on send; the name is misspelled ("Resuse") but is public API.
432      */
433     public boolean isResuseMessageId() {
434         return reuseMessageId;
435     }
436     /**
437      * @param reuseMessageId hint to reuse a message's existing message ID, if already set; passed to the session on every send.
438      */
439     public void setReuseMessageId(boolean reuseMessageId) {
440         this.reuseMessageId = reuseMessageId;
441     }
442 
443 
444     /**
445      * @return Returns the producerId.
446      */
447     protected short getProducerId() {
448         return producerId;
449     }
450     
451     /**
452      * @param producerId The producerId to set.
453      */
454     public void setProducerId(short producerId) {
455         this.producerId = producerId;
456     }
457 
458 
459     protected long getStartTime() {
460         return this.startTime;
461     }
462 
463     protected IdGenerator getIdGenerator() {
464         return this.idGenerator;
465     }
466     
467     protected long getNextSequenceNumber(){
468         return idGenerator.getNextSequence();
469     }
470     
471     protected String getProducerMessageKey(){
472         return idGenerator.getSeed();
473     }
474 }