Source code: org/activemq/io/impl/MessageAckReader.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq.io.impl;
20 import java.io.DataInput;
21 import java.io.IOException;
22
23 import org.activemq.message.AbstractPacket;
24 import org.activemq.message.ActiveMQDestination;
25 import org.activemq.message.ActiveMQXid;
26 import org.activemq.message.MessageAck;
27 import org.activemq.message.Packet;
28 import org.activemq.util.BitArray;
29
30 /**
31 * Reads a ConsumerInfo object from a Stream
32 */
33 public class MessageAckReader extends AbstractPacketReader {
34 private AbstractDefaultWireFormat wireFormat;
35
36 MessageAckReader(AbstractDefaultWireFormat wf) {
37 this.wireFormat = wf;
38 }
39
40 MessageAckReader() {
41 }
42
43 /**
44 * Return the type of Packet
45 *
46 * @return integer representation of the type of Packet
47 */
48 public int getPacketType() {
49 return Packet.ACTIVEMQ_MSG_ACK;
50 }
51
52 /**
53 * @return a new Packet instance
54 */
55 public Packet createPacket() {
56 return new MessageAck();
57 }
58
59 /**
60 * build a Packet instance from the data input stream
61 *
62 * @param packet A Packet object
63 * @param dataIn the data input stream to build the packet from
64 * @throws IOException
65 */
66 public void buildPacket(Packet packet, DataInput dataIn) throws IOException {
67 MessageAck ack = (MessageAck) packet;
68 BitArray ba = ack.getBitArray();
69 ba.readFromStream(dataIn);
70 boolean cachingEnabled = ba.get(MessageAck.CACHED_VALUES_INDEX);
71 ack.setMessageRead(ba.get(MessageAck.MESSAGE_READ_INDEX));
72 ack.setPersistent(ba.get(MessageAck.PERSISTENT_INDEX));
73 ack.setExpired(ba.get(MessageAck.EXPIRED_INDEX));
74 if (ba.get(AbstractPacket.RECEIPT_REQUIRED_INDEX)) {
75 ack.setReceiptRequired(true);
76 ack.setId(dataIn.readShort());
77 }
78 if (ba.get(MessageAck.EXTERNAL_MESSAGE_ID_INDEX)) {
79 ack.setExternalMessageId(true);
80 ack.setMessageID(dataIn.readUTF());
81 }
82 else {
83 if (cachingEnabled) {
84 short key = dataIn.readShort();
85 ack.setProducerKey((String) wireFormat.getValueFromReadCache(key));
86 }
87 else {
88 ack.setProducerKey(dataIn.readUTF());
89 }
90 if (ba.get(MessageAck.LONG_SEQUENCE_INDEX)) {
91 ack.setSequenceNumber(dataIn.readLong());
92 }
93 else {
94 ack.setSequenceNumber(dataIn.readInt());
95 }
96 }
97 if (ba.get(AbstractPacket.BROKERS_VISITED_INDEX)) {
98 int visitedLen = dataIn.readShort();
99 for (int i = 0;i < visitedLen;i++) {
100 ack.addBrokerVisited(dataIn.readUTF());
101 }
102 }
103 if (ba.get(MessageAck.TRANSACTION_ID_INDEX)) {
104 if (cachingEnabled) {
105 short key = dataIn.readShort();
106 ack.setTransactionId(wireFormat.getValueFromReadCache(key));
107 } else {
108 if (ba.get(MessageAck.XA_TRANS_INDEX)) {
109 ack.setTransactionId(ActiveMQXid.read(dataIn));
110 }
111 else {
112 ack.setTransactionId(super.readUTF(dataIn));
113 }
114 }
115 }
116 else {
117 ack.setTransactionId(null);
118 }
119 if (cachingEnabled) {
120 short key = dataIn.readShort();
121 ack.setConsumerId((String) wireFormat.getValueFromReadCache(key));
122 key = dataIn.readShort();
123 ack.setDestination((ActiveMQDestination) wireFormat.getValueFromReadCache(key));
124 }
125 else {
126 ack.setConsumerId(dataIn.readUTF());
127 ack.setDestination(ActiveMQDestination.readFromStream(dataIn));
128 }
129 }
130 }