Source code: org/activemq/io/impl/StatelessDefaultWireFormat.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 * Copyright 2004 Hiram Chirino
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 **/
19 package org.activemq.io.impl;
20
21 import java.io.ByteArrayOutputStream;
22 import java.io.DataInput;
23 import java.io.DataInputStream;
24 import java.io.DataOutput;
25 import java.io.DataOutputStream;
26 import java.io.IOException;
27 import java.io.ObjectStreamException;
28
29 import org.activeio.PacketData;
30 import org.activeio.adapter.PacketByteArrayOutputStream;
31 import org.activeio.adapter.PacketInputStream;
32 import org.activemq.io.WireFormat;
33 import org.activemq.message.CachedValue;
34 import org.activemq.message.Packet;
35
36 /**
37 * Provides a stateless implementation of AbstractDefaultWireFormat. Safe for use by multiple threads and incurs no locking overhead.
38 *
39 * @version $Revision: 1.1.1.1 $
40 */
41 public class StatelessDefaultWireFormat extends AbstractDefaultWireFormat {
42
43 private static final long serialVersionUID = -2648674156081593006L;
44
45 public Packet writePacket(Packet packet, DataOutput dataOut) throws IOException {
46
47 PacketWriter writer = getWriter(packet);
48 if (writer != null) {
49
50 PacketByteArrayOutputStream internalBytesOut = new PacketByteArrayOutputStream( 50+ (packet.getMemoryUsage()==0 ? 1024 : packet.getMemoryUsage()) );
51 DataOutputStream internalDataOut = new DataOutputStream(internalBytesOut);
52 writer.writePacket(packet, internalDataOut);
53 internalDataOut.close();
54
55 org.activeio.Packet p = internalBytesOut.getPacket();
56 int count = p.remaining();
57
58 dataOut.writeByte(packet.getPacketType());
59 dataOut.writeInt(count);
60 packet.setMemoryUsage(count);
61 p.writeTo(dataOut);
62
63 }
64 return null;
65 }
66
67 /**
68 * Write a Packet to a PacketByteArrayOutputStream
69 *
70 * @param packet
71 * @param dataOut
72 * @return a response packet - or null
73 * @throws IOException
74 */
75 public org.activeio.Packet writePacket(Packet packet, PacketByteArrayOutputStream paos) throws IOException {
76 PacketWriter writer = getWriter(packet);
77 if (writer != null) {
78
79 // We may not be writing to the start of the PAOS.
80 int startPosition = paos.position();
81 // Skip space for the headers.
82 paos.skip(5);
83 // Stream the data.
84 DataOutputStream data = new DataOutputStream(paos);
85 writer.writePacket(packet, data);
86 data.close();
87 org.activeio.Packet rc = paos.getPacket();
88
89 int count = rc.remaining()-(startPosition+5);
90 packet.setMemoryUsage(count);
91
92 // Now write the headers to the packet.
93
94 rc.position(startPosition);
95 PacketData pd = new PacketData(rc);
96 pd.writeByte(packet.getPacketType());
97 pd.writeInt(count);
98 rc.rewind();
99 return rc;
100 }
101 return null;
102 }
103
104 /**
105 * A helper method which converts a packet into a byte array Overrides the WireFormat to make use of the internal
106 * BytesOutputStream
107 *
108 * @param packet
109 * @return a byte array representing the packet using some wire protocol
110 * @throws IOException
111 */
112 public byte[] toBytes(Packet packet) throws IOException {
113
114 byte[] data = null;
115 PacketWriter writer = getWriter(packet);
116 if (writer != null) {
117
118 // Try to guess the right size.
119 ByteArrayOutputStream internalBytesOut = new ByteArrayOutputStream( 50+ (packet.getMemoryUsage()==0 ? 1024 : packet.getMemoryUsage()) );
120 DataOutputStream internalDataOut = new DataOutputStream(internalBytesOut);
121
122 internalBytesOut.reset();
123 internalDataOut.writeByte(packet.getPacketType());
124 internalDataOut.writeInt(-1);//the length
125 writer.writePacket(packet, internalDataOut);
126 internalDataOut.flush();
127 data = internalBytesOut.toByteArray();
128 // lets subtract the header offset from the length
129 int length = data.length - 5;
130 packet.setMemoryUsage(length);
131 //write in the length to the data
132 data[1] = (byte) ((length >>> 24) & 0xFF);
133 data[2] = (byte) ((length >>> 16) & 0xFF);
134 data[3] = (byte) ((length >>> 8) & 0xFF);
135 data[4] = (byte) ((length >>> 0) & 0xFF);
136 }
137 return data;
138 }
139
140 protected Packet readPacket(DataInput dataIn, PacketReader reader) throws IOException {
141 Packet packet = reader.createPacket();
142 int length = dataIn.readInt();
143 packet.setMemoryUsage(length);
144 reader.buildPacket(packet, dataIn);
145 return packet;
146 }
147
148 /**
149 * @param dataIn
150 * @return
151 * @throws IOException
152 */
153 public Packet readPacket(org.activeio.Packet dataIn) throws IOException {
154 return readPacket(new DataInputStream(new PacketInputStream(dataIn)));
155 }
156
157 protected void handleCachedValue(CachedValue cv) {
158 throw new IllegalStateException("Value caching is not supported.");
159 }
160
161 public Object getValueFromReadCache(short key) {
162 throw new IllegalStateException("Value caching is not supported.");
163 }
164
165 short getWriteCachedKey(Object value) {
166 throw new IllegalStateException("Value caching is not supported.");
167 }
168
169 public boolean isCachingEnabled() {
170 return false;
171 }
172
173 public WireFormat copy() {
174 return new StatelessDefaultWireFormat();
175 }
176
177 private Object readResolve() throws ObjectStreamException {
178 return new DefaultWireFormat();
179 }
180
181 }