Source code: org/activemq/transport/stomp/Subscription.java
1 /*
2 * Copyright (c) 2005 Your Corporation. All Rights Reserved.
3 */
4 package org.activemq.transport.stomp;
5
6 import org.activemq.message.ActiveMQDestination;
7 import org.activemq.message.ActiveMQTextMessage;
8 import org.activemq.message.MessageAck;
9 import org.activemq.message.ConsumerInfo;
10 import org.activemq.message.ActiveMQBytesMessage;
11
12 import javax.jms.JMSException;
13 import javax.jms.DeliveryMode;
14 import java.io.DataOutput;
15 import java.io.IOException;
16
17 class Subscription
18 {
19 private ActiveMQDestination destination;
20 private int ackMode = 1;
21 private StompWireFormat format;
22 private short consumer_no;
23 private final String consumerId;
24 private final String subscriptionId;
25 public static final String NO_ID = "~~ NO SUCH THING ~~%%@#!Q";
26
27 Subscription(StompWireFormat format, short consumer_no, String consumerId, String subscriptionId)
28 {
29 this.format = format;
30 this.consumer_no = consumer_no;
31 this.consumerId = consumerId;
32 this.subscriptionId = subscriptionId;
33 }
34
35 void setDestination(ActiveMQDestination actual_dest)
36 {
37 this.destination = actual_dest;
38 }
39
40 void receive(ActiveMQTextMessage msg, DataOutput out) throws IOException, JMSException
41 {
42 if (ackMode == CLIENT_ACK)
43 {
44 AckListener listener = new AckListener(msg, consumerId, consumer_no, subscriptionId);
45 format.addAckListener(listener);
46 }
47 else if (ackMode == AUTO_ACK)
48 {
49 MessageAck ack = new MessageAck();
50 // if (format.isInTransaction()) ack.setTransactionIDString(format.getTransactionId());
51 ack.setDestination(msg.getJMSActiveMQDestination());
52 ack.setConsumerId(consumerId);
53 ack.setMessageID(msg.getJMSMessageID());
54 ack.setMessageRead(true);
55 ack.setSessionId(format.getSessionId());
56 ack.setProducerKey(msg.getProducerKey());
57 ack.setSequenceNumber(msg.getSequenceNumber());
58 ack.setPersistent(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT);
59 format.enqueuePacket(ack);
60 }
61 FrameBuilder builder = new FrameBuilder(Stomp.Responses.MESSAGE)
62 .addHeaders(msg)
63 .setBody(msg.getText().getBytes());
64 if (!subscriptionId.equals(NO_ID))
65 {
66 builder.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
67 }
68 out.write(builder.toFrame());
69 }
70
71 void receive(ActiveMQBytesMessage msg, DataOutput out) throws IOException, JMSException
72 {
73 // @todo refactor this and the other receive form to remoce duplication -bmc
74 if (ackMode == CLIENT_ACK)
75 {
76 AckListener listener = new AckListener(msg, consumerId, consumer_no, subscriptionId);
77 format.addAckListener(listener);
78 }
79 else if (ackMode == AUTO_ACK)
80 {
81 MessageAck ack = new MessageAck();
82 // if (format.isInTransaction()) ack.setTransactionIDString(format.getTransactionId());
83 ack.setDestination(msg.getJMSActiveMQDestination());
84 ack.setConsumerId(consumerId);
85 ack.setMessageID(msg.getJMSMessageID());
86 ack.setMessageRead(true);
87 ack.setSessionId(format.getSessionId());
88 ack.setProducerKey(msg.getProducerKey());
89 ack.setSequenceNumber(msg.getSequenceNumber());
90 ack.setPersistent(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT);
91 format.enqueuePacket(ack);
92 }
93 FrameBuilder builder = new FrameBuilder(Stomp.Responses.MESSAGE)
94 .addHeaders(msg)
95 .setBody(msg.getBodyAsBytes().getBuf());
96 if (!subscriptionId.equals(NO_ID))
97 {
98 builder.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
99 }
100 out.write(builder.toFrame());
101 }
102
103 ActiveMQDestination getDestination()
104 {
105 return destination;
106 }
107
108 static final int AUTO_ACK = 1;
109 static final int CLIENT_ACK = 2;
110
111 public void setAckMode(int clientAck)
112 {
113 this.ackMode = clientAck;
114 }
115
116 public ConsumerInfo close()
117 {
118 ConsumerInfo unsub = new ConsumerInfo();
119 unsub.setStarted(false);
120 unsub.setDestination(this.destination);
121 unsub.setClientId(format.getClientId());
122 unsub.setConsumerNo(consumer_no);
123 return unsub;
124 }
125 }