Source code: org/activemq/io/impl/AbstractPacketMarshaller.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.io.impl;
19
20 import org.activemq.message.Packet;
21 import org.activemq.message.AbstractPacket;
22 import org.activemq.message.ActiveMQDestination;
23 import org.activemq.util.SerializationHelper;
24 import org.activemq.util.BitArray;
25
26 import java.io.IOException;
27 import java.io.DataOutput;
28 import java.io.ByteArrayOutputStream;
29 import java.io.DataOutputStream;
30 import java.io.DataInput;
31
32 /**
33 * @version $Revision: 1.1 $
34 */
35 public abstract class AbstractPacketMarshaller extends AbstractPacketReader implements PacketWriter {
36 protected int wireFormatVersion = DefaultWireFormat.WIRE_FORMAT_VERSION;
37
38
39 /**
40 * simple helper method to ensure null strings are catered for
41 *
42 * @param str
43 * @param dataOut
44 * @throws IOException
45 */
46 protected void writeUTF(String str, DataOutput dataOut) throws IOException {
47 if (str == null) {
48 str = "";
49 }
50 dataOut.writeUTF(str);
51 }
52
53 /**
54 * Reads a destination from the input stream
55 */
56 protected ActiveMQDestination readDestination(DataInput dataIn) throws IOException {
57 return ActiveMQDestination.readFromStream(dataIn);
58 }
59
60 /**
61 * Writes the given destination to the stream
62 */
63 protected void writeDestination(ActiveMQDestination destination, DataOutput dataOut) throws IOException {
64 ActiveMQDestination.writeToStream(destination, dataOut);
65 }
66
67 /**
68 * @param packet
69 * @return true if this PacketWriter can write this type of Packet
70 */
71 public boolean canWrite(Packet packet) {
72 return packet.getPacketType() == this.getPacketType();
73 }
74
75 /**
76 * Simple (but inefficent) utility method to write an object on to a stream
77 *
78 * @param object
79 * @param dataOut
80 * @throws IOException
81 */
82 protected void writeObject(Object object, DataOutput dataOut) throws IOException {
83 if (object != null) {
84 byte[] data = SerializationHelper.writeObject(object);
85 dataOut.writeInt(data.length);
86 dataOut.write(data);
87 }
88 else {
89 dataOut.writeInt(0);
90 }
91 }
92
93 /**
94 * Serializes a Packet int a byte array
95 *
96 * @param packet
97 * @return the byte[]
98 * @throws IOException
99 */
100 public byte[] writePacketToByteArray(Packet packet) throws IOException {
101 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
102 DataOutputStream dataOut = new DataOutputStream(bytesOut);
103 writePacket(packet, dataOut);
104 dataOut.flush();
105 return bytesOut.toByteArray();
106 }
107
108 /**
109 * Write a Packet instance to data output stream
110 *
111 * @param p the instance to be seralized
112 * @param dataOut the output stream
113 * @throws IOException thrown if an error occurs
114 */
115 public void writePacket(Packet p, DataOutput dataOut) throws IOException {
116 AbstractPacket packet = (AbstractPacket)p;
117 dataOut.writeShort(packet.getId());
118 BitArray ba = packet.getBitArray();
119 ba.set(AbstractPacket.RECEIPT_REQUIRED_INDEX, packet.isReceiptRequired());
120 Object[] visited = packet.getBrokersVisited();
121 boolean writeVisited = visited != null && visited.length > 0;
122 ba.set(AbstractPacket.BROKERS_VISITED_INDEX,writeVisited);
123 ba.writeToStream(dataOut);
124 if (writeVisited){
125 dataOut.writeShort(visited.length);
126 for(int i =0; i < visited.length; i++){
127 dataOut.writeUTF(visited[i].toString());
128 }
129 }
130 }
131
132 /**
133 * Set the wire format version
134 * @param version
135 */
136 public void setWireFormatVersion(int version){
137 this.wireFormatVersion = version;
138 }
139
140 /**
141 * @return the wire format version
142 */
143 public int getWireFormatVersion(){
144 return wireFormatVersion;
145 }
146
147 }