Source code: org/activemq/transport/openwire/OpenWirePacketMarshallerSupport.java
1 /**
2 *
3 * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.transport.openwire;
19
20 import org.activemq.io.impl.AbstractPacketMarshaller;
21 import org.activemq.message.ActiveMQDestination;
22 import org.activemq.message.ActiveMQQueue;
23 import org.activemq.message.ActiveMQTemporaryQueue;
24 import org.activemq.message.ActiveMQTemporaryTopic;
25 import org.activemq.message.ActiveMQTopic;
26 import org.activemq.message.Packet;
27 import org.activemq.message.AbstractPacket;
28
29 import javax.jms.Destination;
30 import javax.jms.JMSException;
31 import javax.jms.TemporaryQueue;
32 import javax.jms.TemporaryTopic;
33 import javax.jms.Topic;
34 import java.io.ByteArrayInputStream;
35 import java.io.ByteArrayOutputStream;
36 import java.io.DataInput;
37 import java.io.DataOutput;
38 import java.io.IOException;
39 import java.util.Map;
40 import java.util.Properties;
41
42 /**
43 * @version $Revision: 1.1 $
44 */
45 public abstract class OpenWirePacketMarshallerSupport extends AbstractPacketMarshaller {
46
47 public void writePacket(Packet packet, DataOutput dataOut) throws IOException {
48 dataOut.writeShort(packet.getId());
49 writeBoolean(packet.isReceiptRequired(), dataOut);
50 writeString(packet.getBrokersVisitedAsString(), dataOut);
51 }
52
53 public void buildPacket(Packet p, DataInput dataIn) throws IOException {
54 AbstractPacket packet = (AbstractPacket) p;
55 packet.setId(dataIn.readShort());
56 packet.setReceiptRequired(readBoolean(dataIn));
57 packet.setBrokersVisitedAsString(readString(dataIn));
58 }
59
60 public void writeProperties(Map map, DataOutput dataOut) throws IOException {
61 if (map instanceof Properties || map == null) {
62 writeProperties((Properties) map, dataOut);
63 }
64 else {
65 Properties properties = new Properties();
66 properties.putAll(map);
67 writeProperties(properties, dataOut);
68 }
69 }
70
71 public void writeProperties(Properties properties, DataOutput dataOut) throws IOException {
72 // lets turn the propertes into a string
73 String text = "";
74 if (properties != null) {
75 ByteArrayOutputStream out = new ByteArrayOutputStream();
76 properties.store(out, null);
77 text = new String(out.toByteArray());
78 }
79 dataOut.writeUTF(text);
80 }
81
82 public Properties readProperties(DataInput dataIn) throws IOException {
83 Properties properties = null;
84 String text = dataIn.readUTF();
85 if (text != null) {
86 if (text.length() > 0) {
87 properties = new Properties();
88 properties.load(new ByteArrayInputStream(text.getBytes()));
89 }
90 }
91 return properties;
92 }
93
94
95 protected ActiveMQDestination readDestination(DataInput dataIn) throws IOException {
96 int data = dataIn.readByte();
97 String text = readString(dataIn);
98 switch (data) {
99 case ActiveMQDestination.ACTIVEMQ_QUEUE:
100 return new ActiveMQQueue(text);
101
102 case ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE:
103 return new ActiveMQTemporaryQueue(text);
104
105 case ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC:
106 return new ActiveMQTemporaryTopic(text);
107
108 case ActiveMQDestination.ACTIVEMQ_TOPIC:
109 return new ActiveMQTopic(text);
110
111 default:
112 throw new IOException("Unknown destination type: " + data);
113 }
114 }
115
116 protected void writeDestination(Destination destination, DataOutput dataOut) throws IOException {
117 try {
118 writeDestination(ActiveMQDestination.transformDestination(destination), dataOut);
119 }
120 catch (JMSException e) {
121 throw new IOException("Failed to convert destination into ActiveMQDestination: " + e);
122 }
123 }
124
125 protected void writeDestination(ActiveMQDestination destination, DataOutput dataOut) throws IOException {
126 String text = null;
127 int data = ActiveMQDestination.ACTIVEMQ_QUEUE;
128 if (destination instanceof Topic) {
129 if (destination instanceof TemporaryTopic) {
130 data = ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC;
131 }
132 else {
133 data = ActiveMQDestination.ACTIVEMQ_TOPIC;
134 }
135 }
136 else if (destination instanceof TemporaryQueue) {
137 data = ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE;
138 }
139 if (destination != null) {
140 text = destination.getPhysicalName();
141 }
142 dataOut.writeByte(data);
143 writeString(text, dataOut);
144 }
145
146 protected String readString(DataInput dataIn) throws IOException {
147 // lets write double byte text
148 int length = dataIn.readInt();
149 if (length < 0) {
150 return null;
151 }
152 byte[] bytes = new byte[length];
153 dataIn.readFully(bytes);
154 return new String(bytes);
155 }
156
157 protected void writeString(String text, DataOutput dataOut) throws IOException {
158 if (text == null) {
159 dataOut.writeInt(-1);
160 }
161 else {
162 byte[] bytes = text.getBytes();
163 dataOut.writeInt(bytes.length);
164 dataOut.write(bytes);
165 }
166 }
167
168 protected byte[] readBytes(DataInput dataIn) throws IOException {
169 int length = dataIn.readInt();
170 if (length < 0) {
171 return null;
172 }
173 byte[] answer = new byte[length];
174 dataIn.readFully(answer);
175 return answer;
176 }
177
178 protected void writeBytes(byte[] data, DataOutput dataOut) throws IOException {
179 if (data == null) {
180 dataOut.writeInt(-1);
181 }
182 else {
183 dataOut.writeInt(data.length);
184 dataOut.write(data);
185 }
186 }
187
188 protected void writeBoolean(boolean value, DataOutput dataOut) throws IOException {
189 dataOut.writeByte(value ? 1 : 0);
190 }
191
192 protected boolean readBoolean(DataInput dataIn) throws IOException {
193 return dataIn.readByte() > 0;
194 }
195 }