Source code: org/activemq/io/AbstractWireFormat.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq.io;
20 import java.io.ByteArrayInputStream;
21 import java.io.ByteArrayOutputStream;
22 import java.io.DataInput;
23 import java.io.DataInputStream;
24 import java.io.DataOutputStream;
25 import java.io.IOException;
26 import java.net.DatagramPacket;
27
28 import javax.jms.JMSException;
29
30 import org.activemq.message.Packet;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33
34 /**
35 * Represents a strategy of encoding packets on the wire or on disk using some kind of serialization or wire format.
36 * <p/>We use a default efficient format for Java to Java communication but other formats to other systems can be used,
37 * such as using simple text strings when talking to JavaScript or coming up with other formats for talking to C / C#
38 * languages or proprietary messaging systems we wish to interface with at the wire level etc.
39 *
40 * @version $Revision: 1.1.1.1 $
41 */
42 public abstract class AbstractWireFormat implements WireFormat {
43 private static final Log log = LogFactory.getLog(AbstractWireFormat.class);
44 protected DataOutputStream transportDataOut;
45 protected DataInputStream transportDataIn;
46 protected boolean cachingEnabled;
47
48 /**
49 * Read a packet from a Datagram packet from the given channelID. If the packet is from the same channel ID as it
50 * was sent then we have a loop-back so discard the packet
51 *
52 * @param channelID is the unique channel ID
53 * @param dpacket
54 * @return the packet read from the datagram or null if it should be discarded
55 * @throws IOException
56 */
57 public Packet readPacket(String channelID, DatagramPacket dpacket) throws IOException {
58 DataInput in = new DataInputStream(new ByteArrayInputStream(dpacket.getData(), dpacket.getOffset(), dpacket
59 .getLength()));
60 String id = in.readUTF();
61 if (channelID == null) {
62 log
63 .trace("We do not have a channelID which is probably caused by a synchronization issue, we're receiving messages before we're fully initialised");
64 }
65 else if (channelID.equals(id)) {
66 if (log.isTraceEnabled()) {
67 log.trace("Discarding packet from id: " + id);
68 }
69 return null;
70 }
71 int type = in.readByte();
72 Packet packet = readPacket(type, in);
73 // if (packet instanceof ActiveMQMessage) {
74 // System.out.println("##### read packet from channel: " + id + " in channel: " + channelID + " message: " +
75 // packet);
76 // }
77 //
78 return packet;
79 }
80
81 /**
82 * Writes the given package to a new datagram
83 *
84 * @param channelID is the unique channel ID
85 * @param packet is the packet to write
86 * @return @throws IOException
87 * @throws JMSException
88 */
89 public DatagramPacket writePacket(String channelID, Packet packet) throws IOException, JMSException {
90 ByteArrayOutputStream out = new ByteArrayOutputStream();
91 DataOutputStream dataOut = new DataOutputStream(out);
92 channelID = channelID != null ? channelID : "";
93 dataOut.writeUTF(channelID);
94 writePacket(packet, dataOut);
95 dataOut.close();
96 byte[] data = out.toByteArray();
97 return new DatagramPacket(data, data.length);
98 }
99
100 /**
101 * Reads the packet from the given byte[]
102 *
103 * @param bytes
104 * @param offset
105 * @param length
106 * @return @throws IOException
107 */
108 public Packet fromBytes(byte[] bytes, int offset, int length) throws IOException {
109 DataInput in = new DataInputStream(new ByteArrayInputStream(bytes, offset, length));
110 return readPacket(in);
111 }
112
113 /**
114 * Reads the packet from the given byte[]
115 *
116 * @param bytes
117 * @return @throws IOException
118 */
119 public Packet fromBytes(byte[] bytes) throws IOException {
120 DataInput in = new DataInputStream(new ByteArrayInputStream(bytes));
121 return readPacket(in);
122 }
123
124 /**
125 * A helper method which converts a packet into a byte array
126 *
127 * @param packet
128 * @return a byte array representing the packet using some wire protocol
129 * @throws IOException
130 * @throws JMSException
131 */
132 public byte[] toBytes(Packet packet) throws IOException, JMSException {
133 ByteArrayOutputStream out = new ByteArrayOutputStream();
134 DataOutputStream dataOut = new DataOutputStream(out);
135 writePacket(packet, dataOut);
136 dataOut.close();
137 return out.toByteArray();
138 }
139
140 /**
141 * some transports may register their streams (e.g. Tcp)
142 *
143 * @param dataOut
144 * @param dataIn
145 */
146 public void registerTransportStreams(DataOutputStream dataOut, DataInputStream dataIn) {
147 transportDataOut = dataOut;
148 transportDataIn = dataIn;
149 }
150
151 /**
152 * Some wire formats require a handshake at start-up
153 *
154 * @throws IOException
155 */
156 public void initiateClientSideProtocol() throws IOException {
157 }
158
159 /**
160 * Some wire formats require a handshake at start-up
161 *
162 * @throws IOException
163 */
164 public void initiateServerSideProtocol() throws IOException {
165 }
166
167
168 /**
169 * @return Returns the enableCaching.
170 */
171 public boolean isCachingEnabled() {
172 return cachingEnabled;
173 }
174
175 /**
176 * @param enableCaching The enableCaching to set.
177 */
178 public void setCachingEnabled(boolean enableCaching) {
179 this.cachingEnabled = enableCaching;
180 }
181
182 /**
183 * some wire formats will implement their own fragementation
184 * @return true unless a wire format supports it's own fragmentation
185 */
186 public boolean doesSupportMessageFragmentation(){
187 return true;
188 }
189
190
191 /**
192 * Some wire formats will not be able to understand compressed messages
193 * @return true unless a wire format cannot understand compression
194 */
195 public boolean doesSupportMessageCompression(){
196 return true;
197 }
198 /**
199 * @return Returns the transportDataOut.
200 */
201 public DataOutputStream getTransportDataOut() {
202 return transportDataOut;
203 }
204 /**
205 * @param transportDataOut The transportDataOut to set.
206 */
207 public void setTransportDataOut(DataOutputStream transportDataOut) {
208 this.transportDataOut = transportDataOut;
209 }
210 /**
211 * @return Returns the transportDataIn.
212 */
213 public DataInputStream getTransportDataIn() {
214 return transportDataIn;
215 }
216 /**
217 * @param transportDataIn The transportDataIn to set.
218 */
219 public void setTransportDataIn(DataInputStream transportDataIn) {
220 this.transportDataIn = transportDataIn;
221 }
222
223 /**
224 * @param dataIn
225 * @return
226 * @throws java.io.IOException
227 */
228 public Packet readPacket(DataInput dataIn) throws IOException {
229 int type = -1;
230 while ((type = dataIn.readByte()) == 0);
231
232 if (type == -1){
233 throw new IOException("InputStream now closed");
234 }
235 return readPacket(type, dataIn);
236 }
237 }