Source code: org/activemq/io/impl/MessageAckWriter.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19
20 package org.activemq.io.impl;
21 import java.io.DataOutput;
22 import java.io.IOException;
23
24 import org.activemq.message.AbstractPacket;
25 import org.activemq.message.ActiveMQDestination;
26 import org.activemq.message.ActiveMQXid;
27 import org.activemq.message.MessageAck;
28 import org.activemq.message.Packet;
29 import org.activemq.util.BitArray;
30
31 /**
32 * Writes a ConsumerInfo object to a Stream
33 */
34
35 public class MessageAckWriter extends AbstractPacketWriter {
36 private AbstractDefaultWireFormat wireFormat;
37
38 MessageAckWriter(AbstractDefaultWireFormat wf){
39 this.wireFormat = wf;
40 }
41
42 MessageAckWriter(){
43 }
44
45 /**
46 * Return the type of Packet
47 *
48 * @return integer representation of the type of Packet
49 */
50
51 public int getPacketType() {
52 return Packet.ACTIVEMQ_MSG_ACK;
53 }
54
55 /**
56 * Write a Packet instance to data output stream
57 *
58 * @param packet the instance to be seralized
59 * @param dataOut the output stream
60 * @throws IOException thrown if an error occurs
61 */
62
63 public void writePacket(Packet packet, DataOutput dataOut) throws IOException {
64 MessageAck ack = (MessageAck) packet;
65
66 boolean cachingEnabled = wireFormat != null ? wireFormat.isCachingEnabled() : false;
67 boolean longSequence = ack.getSequenceNumber() > Integer.MAX_VALUE;
68
69
70 Object[] visited = ack.getBrokersVisited();
71 boolean writeVisited = visited != null && visited.length > 0;
72 BitArray ba = ack.getBitArray();
73 ba.reset();
74 ba.set(AbstractPacket.RECEIPT_REQUIRED_INDEX, ack.isReceiptRequired());
75 ba.set(AbstractPacket.BROKERS_VISITED_INDEX,writeVisited);
76 ba.set(MessageAck.MESSAGE_READ_INDEX, ack.isMessageRead());
77 ba.set(MessageAck.TRANSACTION_ID_INDEX, ack.isPartOfTransaction());
78 ba.set(MessageAck.XA_TRANS_INDEX, ack.isXaTransacted());
79 ba.set(MessageAck.PERSISTENT_INDEX,ack.isPersistent());
80 ba.set(MessageAck.EXPIRED_INDEX,ack.isExpired());
81 ba.set(MessageAck.EXTERNAL_MESSAGE_ID_INDEX, ack.isExternalMessageId());
82 ba.set(MessageAck.CACHED_VALUES_INDEX,cachingEnabled);
83 ba.set(MessageAck.LONG_SEQUENCE_INDEX, longSequence);
84 ba.writeToStream(dataOut);
85
86 if (ack.isReceiptRequired()){
87 dataOut.writeShort(ack.getId());
88 }
89 if (ack.isExternalMessageId()){
90 writeUTF(ack.getMessageID(),dataOut);
91 }else {
92 if (cachingEnabled){
93 dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getProducerKey()));
94 }else{
95 writeUTF(ack.getProducerKey(),dataOut);
96 }
97 if (longSequence){
98 dataOut.writeLong(ack.getSequenceNumber());
99 }else {
100 dataOut.writeInt((int)ack.getSequenceNumber());
101 }
102 }
103 if (writeVisited){
104 dataOut.writeShort(visited.length);
105 for(int i =0; i < visited.length; i++){
106 final String brokerName = visited[i].toString();
107 if (brokerName != null) {
108 dataOut.writeUTF(brokerName);
109 }
110 }
111 }
112
113 if (ack.isPartOfTransaction()) {
114 if (cachingEnabled){
115 dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getTransactionId()));
116 } else {
117 if( ack.isXaTransacted()) {
118 ActiveMQXid xid = (ActiveMQXid) ack.getTransactionId();
119 xid.write(dataOut);
120 } else {
121 super.writeUTF((String) ack.getTransactionId(), dataOut);
122 }
123 }
124 }
125
126 if (cachingEnabled){
127 dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getConsumerId()));
128 dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getDestination()));
129 }else {
130 super.writeUTF(ack.getConsumerId(), dataOut);
131 ActiveMQDestination.writeToStream((ActiveMQDestination) ack.getDestination(), dataOut);
132 }
133 }
134
135
136 }