Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/io/impl/MessageAckWriter.java


1   /**
2    *
3    * Copyright 2004 Protique Ltd
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  
19  
20  package org.activemq.io.impl;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  
24  import org.activemq.message.AbstractPacket;
25  import org.activemq.message.ActiveMQDestination;
26  import org.activemq.message.ActiveMQXid;
27  import org.activemq.message.MessageAck;
28  import org.activemq.message.Packet;
29  import org.activemq.util.BitArray;
30  
31  /**
32   * Writes a ConsumerInfo object to a Stream
33   */
34  
35  public class MessageAckWriter extends AbstractPacketWriter {
36      private AbstractDefaultWireFormat wireFormat;
37      
38      MessageAckWriter(AbstractDefaultWireFormat wf){
39          this.wireFormat = wf;
40      }
41      
42      MessageAckWriter(){
43      }
44  
45      /**
46       * Return the type of Packet
47       *
48       * @return integer representation of the type of Packet
49       */
50  
51      public int getPacketType() {
52          return Packet.ACTIVEMQ_MSG_ACK;
53      }
54  
55      /**
56       * Write a Packet instance to data output stream
57       *
58       * @param packet  the instance to be seralized
59       * @param dataOut the output stream
60       * @throws IOException thrown if an error occurs
61       */
62  
63      public void writePacket(Packet packet, DataOutput dataOut) throws IOException {
64          MessageAck ack = (MessageAck) packet;
65          
66          boolean cachingEnabled = wireFormat != null ? wireFormat.isCachingEnabled() : false;
67          boolean longSequence = ack.getSequenceNumber() > Integer.MAX_VALUE;
68          
69         
70          Object[] visited = ack.getBrokersVisited();
71          boolean writeVisited = visited != null && visited.length > 0;
72          BitArray ba = ack.getBitArray();
73          ba.reset();
74          ba.set(AbstractPacket.RECEIPT_REQUIRED_INDEX, ack.isReceiptRequired());
75          ba.set(AbstractPacket.BROKERS_VISITED_INDEX,writeVisited);
76          ba.set(MessageAck.MESSAGE_READ_INDEX, ack.isMessageRead());
77          ba.set(MessageAck.TRANSACTION_ID_INDEX, ack.isPartOfTransaction());
78          ba.set(MessageAck.XA_TRANS_INDEX, ack.isXaTransacted());
79          ba.set(MessageAck.PERSISTENT_INDEX,ack.isPersistent());
80          ba.set(MessageAck.EXPIRED_INDEX,ack.isExpired());
81          ba.set(MessageAck.EXTERNAL_MESSAGE_ID_INDEX, ack.isExternalMessageId());
82          ba.set(MessageAck.CACHED_VALUES_INDEX,cachingEnabled);
83          ba.set(MessageAck.LONG_SEQUENCE_INDEX, longSequence);
84          ba.writeToStream(dataOut);
85          
86          if (ack.isReceiptRequired()){
87              dataOut.writeShort(ack.getId());
88          }
89          if (ack.isExternalMessageId()){
90              writeUTF(ack.getMessageID(),dataOut);
91          }else {
92              if (cachingEnabled){
93                  dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getProducerKey()));
94              }else{
95                  writeUTF(ack.getProducerKey(),dataOut);
96              }
97              if (longSequence){
98                  dataOut.writeLong(ack.getSequenceNumber());
99              }else {
100                 dataOut.writeInt((int)ack.getSequenceNumber());
101             }
102         }
103         if (writeVisited){
104             dataOut.writeShort(visited.length);
105             for(int i =0; i < visited.length; i++){
106                 final String brokerName = visited[i].toString();
107                 if (brokerName != null) {
108                     dataOut.writeUTF(brokerName);
109                 }
110             }
111         }
112          
113         if (ack.isPartOfTransaction()) {
114             if (cachingEnabled){
115                 dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getTransactionId()));
116             } else {
117                 if( ack.isXaTransacted()) {
118                     ActiveMQXid xid = (ActiveMQXid) ack.getTransactionId();
119                     xid.write(dataOut);
120                 } else {
121                     super.writeUTF((String) ack.getTransactionId(), dataOut);
122                 }            
123             }
124         }
125 
126         if (cachingEnabled){
127             dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getConsumerId()));
128             dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getDestination()));
129         }else {
130             super.writeUTF(ack.getConsumerId(), dataOut);
131             ActiveMQDestination.writeToStream((ActiveMQDestination) ack.getDestination(), dataOut);
132         }
133     }
134 
135 
136 }