Source code: org/activemq/io/impl/DefaultWireFormat.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq.io.impl;
20 import java.io.DataInput;
21 import java.io.DataInputStream;
22 import java.io.DataOutput;
23 import java.io.DataOutputStream;
24 import java.io.IOException;
25 import java.io.ObjectStreamException;
26 import java.io.Serializable;
27 import java.util.Arrays;
28 import java.util.HashMap;
29 import java.util.Map;
30
31 import org.activemq.io.WireFormat;
32 import org.activemq.io.util.WireByteArrayInputStream;
33 import org.activemq.io.util.WireByteArrayOutputStream;
34 import org.activemq.message.CachedValue;
35 import org.activemq.message.Packet;
36
37 /**
38 * This is a stateful AbstractDefaultWireFormat which implements value caching. Not optimal for use by
39 * many concurrent threads. One DefaultWireFormat is typically allocated per client connection.
40 *
41 * @version $Revision: 1.1.1.1 $
42 */
43 public class DefaultWireFormat extends AbstractDefaultWireFormat implements Serializable {
44
45 private static final long serialVersionUID = -1454851936411678612L;
46
47 private static final int MAX_CACHE_SIZE = Short.MAX_VALUE/2; //needs to be a lot less than Short.MAX_VALUE
48
49 static final short NULL_VALUE = -1;
50 static final short CLEAR_CACHE = -2;
51
52 //
53 // Fields used during a write.
54 //
55 protected transient final Object writeMutex = new Object();
56 protected transient WireByteArrayOutputStream internalBytesOut;
57 protected transient DataOutputStream internalDataOut;
58 protected transient WireByteArrayOutputStream cachedBytesOut;
59 protected transient DataOutputStream cachedDataOut;
60 private Map writeValueCache = new HashMap();
61 protected transient short cachedKeyGenerator;
62 protected transient short lastWriteValueCacheEvictionPosition=500;
63
64 //
65 // Fields used during a read.
66 //
67 protected transient final Object readMutex = new Object();
68 protected transient WireByteArrayInputStream internalBytesIn;
69 protected transient DataInputStream internalDataIn;
70 private Object[] writeValueCacheArray = new Object[MAX_CACHE_SIZE];
71 private Object[] readValueCacheArray = new Object[MAX_CACHE_SIZE];
72
73
74 /**
75 * Default Constructor
76 */
77 public DefaultWireFormat() {
78 internalBytesOut = new WireByteArrayOutputStream();
79 internalDataOut = new DataOutputStream(internalBytesOut);
80 internalBytesIn = new WireByteArrayInputStream();
81 internalDataIn = new DataInputStream(internalBytesIn);
82 this.currentWireFormatVersion = WIRE_FORMAT_VERSION;
83 this.cachedBytesOut = new WireByteArrayOutputStream();
84 this.cachedDataOut = new DataOutputStream(cachedBytesOut);
85 }
86
87 /**
88 * @return new WireFormat
89 */
90 public WireFormat copy() {
91 DefaultWireFormat format = new DefaultWireFormat();
92 format.setCachingEnabled(cachingEnabled);
93 format.setCurrentWireFormatVersion(getCurrentWireFormatVersion());
94 return format;
95 }
96
97
98 private Object readResolve() throws ObjectStreamException {
99 return new DefaultWireFormat();
100 }
101
102 public Packet writePacket(Packet packet, DataOutput dataOut) throws IOException {
103 PacketWriter writer = getWriter(packet);
104 if (writer != null) {
105 synchronized (writeMutex) {
106 internalBytesOut.reset();
107 writer.writePacket(packet, internalDataOut);
108 internalDataOut.flush();
109 //reuse the byte buffer in the ByteArrayOutputStream
110 byte[] data = internalBytesOut.getData();
111 int count = internalBytesOut.size();
112 dataOut.writeByte(packet.getPacketType());
113 dataOut.writeInt(count);
114 //byte[] data = internalBytesOut.toByteArray();
115 //int count = data.length;
116 //dataOut.writeInt(count);
117 packet.setMemoryUsage(count);
118 dataOut.write(data, 0, count);
119 }
120 }
121 return null;
122 }
123
124 protected synchronized final Packet readPacket(DataInput dataIn, PacketReader reader) throws IOException {
125 synchronized (readMutex) {
126 Packet packet = reader.createPacket();
127 int length = dataIn.readInt();
128 packet.setMemoryUsage(length);
129 byte[] data = new byte[length];
130 dataIn.readFully(data);
131 //then splat into the internal datainput
132 internalBytesIn.restart(data);
133 reader.buildPacket(packet, internalDataIn);
134 return packet;
135 }
136 }
137
138 /**
139 * A helper method which converts a packet into a byte array Overrides the WireFormat to make use of the internal
140 * BytesOutputStream
141 *
142 * @param packet
143 * @return a byte array representing the packet using some wire protocol
144 * @throws IOException
145 */
146 public byte[] toBytes(Packet packet) throws IOException {
147 byte[] data = null;
148 PacketWriter writer = getWriter(packet);
149
150 if (writer != null) {
151
152 synchronized (writeMutex) {
153 internalBytesOut.reset();
154 internalDataOut.writeByte(packet.getPacketType());
155 internalDataOut.writeInt(-1);//the length
156 writer.writePacket(packet, internalDataOut);
157 internalDataOut.flush();
158 data = internalBytesOut.toByteArray();
159 }
160
161 // lets subtract the header offset from the length
162 int length = data.length - 5;
163 packet.setMemoryUsage(length);
164 //write in the length to the data
165 data[1] = (byte) ((length >>> 24) & 0xFF);
166 data[2] = (byte) ((length >>> 16) & 0xFF);
167 data[3] = (byte) ((length >>> 8) & 0xFF);
168 data[4] = (byte) ((length >>> 0) & 0xFF);
169 }
170
171 return data;
172 }
173
174 ///////////////////////////////////////////////////////////////
175 //
176 // Methods to handle cached values
177 //
178 ///////////////////////////////////////////////////////////////
179
180 public Object getValueFromReadCache(short key) {
181 if( key < 0 || key > readValueCacheArray.length )
182 return null;
183 return readValueCacheArray[key];
184 }
185
186 protected short getWriteCachedKey(Object key) throws IOException{
187 if (key != null){
188 Short result = null;
189 result = (Short)writeValueCache.get(key);
190 if (result == null){
191 result = getNextCacheId();
192 writeValueCache.put(key,result);
193 writeValueCacheArray[result.shortValue()]=key;
194 updateCachedValue(result.shortValue(),key);
195 }
196 return result.shortValue();
197 }
198 return DefaultWireFormat.NULL_VALUE;
199 }
200
201 /**
202 * @return
203 */
204 private Short getNextCacheId() {
205 Short result;
206 result = new Short(cachedKeyGenerator++);
207 // once we fill the cache start reusing old cache locations to avoid memory leaks.
208 if (cachedKeyGenerator >= MAX_CACHE_SIZE) {
209 cachedKeyGenerator=0;
210 }
211
212 lastWriteValueCacheEvictionPosition++;
213 if (lastWriteValueCacheEvictionPosition >= MAX_CACHE_SIZE) {
214 lastWriteValueCacheEvictionPosition=0;
215 }
216
217 if( writeValueCacheArray[lastWriteValueCacheEvictionPosition] !=null ) {
218 Object o = writeValueCacheArray[lastWriteValueCacheEvictionPosition];
219 writeValueCache.remove(o);
220 writeValueCacheArray[lastWriteValueCacheEvictionPosition]=null;
221 }
222 return result;
223 }
224
225 protected void validateWriteCache() throws IOException {
226 if (cachingEnabled) {
227 if (writeValueCache.size() >= MAX_CACHE_SIZE) {
228 writeValueCache.clear();
229 Arrays.fill(writeValueCacheArray,null);
230 cachedKeyGenerator = 0;
231 updateCachedValue((short) -1, null);// send update to peer to
232 // clear the peer cache
233 }
234 }
235 }
236
237 protected void handleCachedValue(CachedValue cv) {
238 if (cv != null) {
239 if (cv.getId() == CLEAR_CACHE) {
240 Arrays.fill(readValueCacheArray, null);
241 } else if (cv.getId() != NULL_VALUE) {
242 readValueCacheArray[cv.getId()] = cv.getValue();
243 }
244 }
245 }
246
247 private synchronized void updateCachedValue(short key, Object value) throws IOException {
248 if (cachedValueWriter == null) {
249 cachedValueWriter = new CachedValueWriter();
250 }
251 CachedValue cv = new CachedValue();
252 cv.setId(key);
253 cv.setValue(value);
254 cachedBytesOut.reset();
255 cachedValueWriter.writePacket(cv, cachedDataOut);
256 cachedDataOut.flush();
257 byte[] data = cachedBytesOut.getData();
258 int count = cachedBytesOut.size();
259 getTransportDataOut().writeByte(cv.getPacketType());
260 getTransportDataOut().writeInt(count);
261 getTransportDataOut().write(data, 0, count);
262 }
263 }