Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/transport/stomp/Subscription.java


1   /*
2    * Copyright (c) 2005 Your Corporation. All Rights Reserved.
3    */
4   package org.activemq.transport.stomp;
5   
6   import org.activemq.message.ActiveMQDestination;
7   import org.activemq.message.ActiveMQTextMessage;
8   import org.activemq.message.MessageAck;
9   import org.activemq.message.ConsumerInfo;
10  import org.activemq.message.ActiveMQBytesMessage;
11  
12  import javax.jms.JMSException;
13  import javax.jms.DeliveryMode;
14  import java.io.DataOutput;
15  import java.io.IOException;
16  
17  class Subscription
18  {
19      private ActiveMQDestination destination;
20      private int ackMode = 1;
21      private StompWireFormat format;
22      private short consumer_no;
23      private final String consumerId;
24      private final String subscriptionId;
25      public static final String NO_ID = "~~ NO SUCH THING ~~%%@#!Q";
26  
27      Subscription(StompWireFormat format, short consumer_no, String consumerId, String subscriptionId)
28      {
29          this.format = format;
30          this.consumer_no = consumer_no;
31          this.consumerId = consumerId;
32          this.subscriptionId = subscriptionId;
33      }
34  
35      void setDestination(ActiveMQDestination actual_dest)
36      {
37          this.destination = actual_dest;
38      }
39  
40      void receive(ActiveMQTextMessage msg, DataOutput out) throws IOException, JMSException
41      {
42          if (ackMode == CLIENT_ACK)
43          {
44              AckListener listener = new AckListener(msg, consumerId, consumer_no, subscriptionId);
45              format.addAckListener(listener);
46          }
47          else if (ackMode == AUTO_ACK)
48          {
49              MessageAck ack = new MessageAck();
50  //            if (format.isInTransaction()) ack.setTransactionIDString(format.getTransactionId());
51              ack.setDestination(msg.getJMSActiveMQDestination());
52              ack.setConsumerId(consumerId);
53              ack.setMessageID(msg.getJMSMessageID());
54              ack.setMessageRead(true);
55              ack.setSessionId(format.getSessionId());
56              ack.setProducerKey(msg.getProducerKey());
57              ack.setSequenceNumber(msg.getSequenceNumber());
58              ack.setPersistent(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT);
59              format.enqueuePacket(ack);
60          }
61          FrameBuilder builder = new FrameBuilder(Stomp.Responses.MESSAGE)
62                  .addHeaders(msg)
63                  .setBody(msg.getText().getBytes());
64          if (!subscriptionId.equals(NO_ID))
65          {
66              builder.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
67          }
68          out.write(builder.toFrame());
69      }
70  
71      void receive(ActiveMQBytesMessage msg, DataOutput out) throws IOException, JMSException
72      {
73          // @todo refactor this and the other receive form to remoce duplication -bmc
74          if (ackMode == CLIENT_ACK)
75          {
76              AckListener listener = new AckListener(msg, consumerId, consumer_no, subscriptionId);
77              format.addAckListener(listener);
78          }
79          else if (ackMode == AUTO_ACK)
80          {
81              MessageAck ack = new MessageAck();
82  //            if (format.isInTransaction()) ack.setTransactionIDString(format.getTransactionId());
83              ack.setDestination(msg.getJMSActiveMQDestination());
84              ack.setConsumerId(consumerId);
85              ack.setMessageID(msg.getJMSMessageID());
86              ack.setMessageRead(true);
87              ack.setSessionId(format.getSessionId());
88              ack.setProducerKey(msg.getProducerKey());
89              ack.setSequenceNumber(msg.getSequenceNumber());
90              ack.setPersistent(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT);
91              format.enqueuePacket(ack);
92          }
93          FrameBuilder builder = new FrameBuilder(Stomp.Responses.MESSAGE)
94                  .addHeaders(msg)
95                  .setBody(msg.getBodyAsBytes().getBuf());
96          if (!subscriptionId.equals(NO_ID))
97          {
98              builder.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
99          }
100         out.write(builder.toFrame());
101     }
102 
103     ActiveMQDestination getDestination()
104     {
105         return destination;
106     }
107 
108     static final int AUTO_ACK = 1;
109     static final int CLIENT_ACK = 2;
110 
111     public void setAckMode(int clientAck)
112     {
113         this.ackMode = clientAck;
114     }
115 
116     public ConsumerInfo close()
117     {
118         ConsumerInfo unsub = new ConsumerInfo();
119         unsub.setStarted(false);
120         unsub.setDestination(this.destination);
121         unsub.setClientId(format.getClientId());
122         unsub.setConsumerNo(consumer_no);
123         return unsub;
124     }
125 }