Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/io/impl/MessageAckReader.java


1   /**
2    *
3    * Copyright 2004 Protique Ltd
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  
19  package org.activemq.io.impl;
20  import java.io.DataInput;
21  import java.io.IOException;
22  
23  import org.activemq.message.AbstractPacket;
24  import org.activemq.message.ActiveMQDestination;
25  import org.activemq.message.ActiveMQXid;
26  import org.activemq.message.MessageAck;
27  import org.activemq.message.Packet;
28  import org.activemq.util.BitArray;
29  
30  /**
31   * Reads a ConsumerInfo object from a Stream
32   */
33  public class MessageAckReader extends AbstractPacketReader {
34      private AbstractDefaultWireFormat wireFormat;
35  
36      MessageAckReader(AbstractDefaultWireFormat wf) {
37          this.wireFormat = wf;
38      }
39  
40      MessageAckReader() {
41      }
42  
43      /**
44       * Return the type of Packet
45       * 
46       * @return integer representation of the type of Packet
47       */
48      public int getPacketType() {
49          return Packet.ACTIVEMQ_MSG_ACK;
50      }
51  
52      /**
53       * @return a new Packet instance
54       */
55      public Packet createPacket() {
56          return new MessageAck();
57      }
58  
59      /**
60       * build a Packet instance from the data input stream
61       * 
62       * @param packet A Packet object
63       * @param dataIn the data input stream to build the packet from
64       * @throws IOException
65       */
66      public void buildPacket(Packet packet, DataInput dataIn) throws IOException {
67          MessageAck ack = (MessageAck) packet;
68          BitArray ba = ack.getBitArray();
69          ba.readFromStream(dataIn);
70          boolean cachingEnabled = ba.get(MessageAck.CACHED_VALUES_INDEX);
71          ack.setMessageRead(ba.get(MessageAck.MESSAGE_READ_INDEX));
72          ack.setPersistent(ba.get(MessageAck.PERSISTENT_INDEX));
73          ack.setExpired(ba.get(MessageAck.EXPIRED_INDEX));
74          if (ba.get(AbstractPacket.RECEIPT_REQUIRED_INDEX)) {
75              ack.setReceiptRequired(true);
76              ack.setId(dataIn.readShort());
77          }
78          if (ba.get(MessageAck.EXTERNAL_MESSAGE_ID_INDEX)) {
79              ack.setExternalMessageId(true);
80              ack.setMessageID(dataIn.readUTF());
81          }
82          else {
83              if (cachingEnabled) {
84                  short key = dataIn.readShort();
85                  ack.setProducerKey((String) wireFormat.getValueFromReadCache(key));
86              }
87              else {
88                  ack.setProducerKey(dataIn.readUTF());
89              }
90              if (ba.get(MessageAck.LONG_SEQUENCE_INDEX)) {
91                  ack.setSequenceNumber(dataIn.readLong());
92              }
93              else {
94                  ack.setSequenceNumber(dataIn.readInt());
95              }
96          }
97          if (ba.get(AbstractPacket.BROKERS_VISITED_INDEX)) {
98              int visitedLen = dataIn.readShort();
99              for (int i = 0;i < visitedLen;i++) {
100                 ack.addBrokerVisited(dataIn.readUTF());
101             }
102         }
103         if (ba.get(MessageAck.TRANSACTION_ID_INDEX)) {
104             if (cachingEnabled) {
105                 short key = dataIn.readShort();
106                 ack.setTransactionId(wireFormat.getValueFromReadCache(key));
107             } else {
108                 if (ba.get(MessageAck.XA_TRANS_INDEX)) {
109                     ack.setTransactionId(ActiveMQXid.read(dataIn));
110                 }
111                 else {
112                     ack.setTransactionId(super.readUTF(dataIn));
113                 }
114             }
115         }
116         else {
117             ack.setTransactionId(null);
118         }
119         if (cachingEnabled) {
120             short key = dataIn.readShort();
121             ack.setConsumerId((String) wireFormat.getValueFromReadCache(key));
122             key = dataIn.readShort();
123             ack.setDestination((ActiveMQDestination) wireFormat.getValueFromReadCache(key));
124         }
125         else {
126             ack.setConsumerId(dataIn.readUTF());
127             ack.setDestination(ActiveMQDestination.readFromStream(dataIn));
128         }
129     }
130 }