Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/io/impl/DefaultWireFormat.java


1   /**
2    *
3    * Copyright 2004 Protique Ltd
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  
19  package org.activemq.io.impl;
20  import java.io.DataInput;
21  import java.io.DataInputStream;
22  import java.io.DataOutput;
23  import java.io.DataOutputStream;
24  import java.io.IOException;
25  import java.io.ObjectStreamException;
26  import java.io.Serializable;
27  import java.util.Arrays;
28  import java.util.HashMap;
29  import java.util.Map;
30  
31  import org.activemq.io.WireFormat;
32  import org.activemq.io.util.WireByteArrayInputStream;
33  import org.activemq.io.util.WireByteArrayOutputStream;
34  import org.activemq.message.CachedValue;
35  import org.activemq.message.Packet;
36  
37  /**
38   * This is a stateful AbstractDefaultWireFormat which implements value caching.  Not optimal for use by 
39   * many concurrent threads.  One DefaultWireFormat is typically allocated per client connection.  
40   *  
41   * @version $Revision: 1.1.1.1 $
42   */
43  public class DefaultWireFormat extends AbstractDefaultWireFormat implements Serializable {
44  
45      private static final long serialVersionUID = -1454851936411678612L;
46  
47      private static final int MAX_CACHE_SIZE = Short.MAX_VALUE/2; //needs to be a lot less than Short.MAX_VALUE
48      
49      static final short NULL_VALUE = -1;
50      static final short CLEAR_CACHE = -2;
51      
52      //
53      // Fields used during a write.
54      //
55      protected transient final Object writeMutex = new Object();
56      protected transient WireByteArrayOutputStream internalBytesOut;
57      protected transient DataOutputStream internalDataOut;
58      protected transient WireByteArrayOutputStream cachedBytesOut;
59      protected transient DataOutputStream cachedDataOut;
60      private Map writeValueCache = new HashMap();
61      protected transient short cachedKeyGenerator;
62      protected transient short lastWriteValueCacheEvictionPosition=500;
63      
64      //
65      // Fields used during a read.
66      //
67      protected transient final Object readMutex = new Object();
68      protected transient WireByteArrayInputStream internalBytesIn;
69      protected transient DataInputStream internalDataIn;    
70      private Object[] writeValueCacheArray = new Object[MAX_CACHE_SIZE];
71      private Object[] readValueCacheArray = new Object[MAX_CACHE_SIZE];
72  
73      
74      /**
75       * Default Constructor
76       */
77      public DefaultWireFormat() {
78          internalBytesOut = new WireByteArrayOutputStream();
79          internalDataOut = new DataOutputStream(internalBytesOut);
80          internalBytesIn = new WireByteArrayInputStream();
81          internalDataIn = new DataInputStream(internalBytesIn);
82          this.currentWireFormatVersion = WIRE_FORMAT_VERSION;
83          this.cachedBytesOut = new WireByteArrayOutputStream();
84          this.cachedDataOut = new DataOutputStream(cachedBytesOut);
85      }    
86  
87      /**
88       * @return new WireFormat
89       */
90      public WireFormat copy() {
91          DefaultWireFormat format = new DefaultWireFormat();
92          format.setCachingEnabled(cachingEnabled);
93          format.setCurrentWireFormatVersion(getCurrentWireFormatVersion());
94          return format;
95      }
96      
97      
98      private Object readResolve() throws ObjectStreamException {
99          return new DefaultWireFormat();
100     }
101     
102     public Packet writePacket(Packet packet, DataOutput dataOut) throws IOException {
103         PacketWriter writer = getWriter(packet);
104         if (writer != null) {
105             synchronized (writeMutex) {
106                 internalBytesOut.reset();
107                 writer.writePacket(packet, internalDataOut);
108                 internalDataOut.flush();
109                 //reuse the byte buffer in the ByteArrayOutputStream
110                 byte[] data = internalBytesOut.getData();
111                 int count = internalBytesOut.size();
112                 dataOut.writeByte(packet.getPacketType());
113                 dataOut.writeInt(count);
114                 //byte[] data = internalBytesOut.toByteArray();
115                 //int count = data.length;
116                 //dataOut.writeInt(count);
117                 packet.setMemoryUsage(count);
118                 dataOut.write(data, 0, count);                
119             }
120         }
121         return null;
122     }
123 
124     protected synchronized final Packet readPacket(DataInput dataIn, PacketReader reader) throws IOException {
125         synchronized (readMutex) {
126             Packet packet = reader.createPacket();
127             int length = dataIn.readInt();
128             packet.setMemoryUsage(length);
129             byte[] data = new byte[length];
130             dataIn.readFully(data);
131             //then splat into the internal datainput
132             internalBytesIn.restart(data);
133             reader.buildPacket(packet, internalDataIn);
134             return packet;
135         }
136     }
137     
138     /**
139      * A helper method which converts a packet into a byte array Overrides the WireFormat to make use of the internal
140      * BytesOutputStream
141      * 
142      * @param packet
143      * @return a byte array representing the packet using some wire protocol
144      * @throws IOException
145      */
146     public byte[] toBytes(Packet packet) throws IOException {
147         byte[] data = null;
148         PacketWriter writer = getWriter(packet);
149 
150         if (writer != null) {
151             
152             synchronized (writeMutex) {
153                 internalBytesOut.reset();
154                 internalDataOut.writeByte(packet.getPacketType());
155                 internalDataOut.writeInt(-1);//the length
156                 writer.writePacket(packet, internalDataOut);
157                 internalDataOut.flush();
158                 data = internalBytesOut.toByteArray();
159             }
160             
161             // lets subtract the header offset from the length
162             int length = data.length - 5;
163             packet.setMemoryUsage(length);
164             //write in the length to the data
165             data[1] = (byte) ((length >>> 24) & 0xFF);
166             data[2] = (byte) ((length >>> 16) & 0xFF);
167             data[3] = (byte) ((length >>> 8) & 0xFF);
168             data[4] = (byte) ((length >>> 0) & 0xFF);
169         }
170         
171         return data;
172     }
173 
174     ///////////////////////////////////////////////////////////////
175     //
176     // Methods to handle cached values
177     //
178     ///////////////////////////////////////////////////////////////    
179     
180     public Object getValueFromReadCache(short key) {
181         if( key < 0 || key > readValueCacheArray.length )
182             return null;
183         return readValueCacheArray[key];
184     }
185     
186     protected short getWriteCachedKey(Object key) throws IOException{
187         if (key != null){
188             Short result = null;
189             result = (Short)writeValueCache.get(key);
190             if (result == null){
191                 result = getNextCacheId();
192                 writeValueCache.put(key,result);
193                 writeValueCacheArray[result.shortValue()]=key;
194                 updateCachedValue(result.shortValue(),key);                
195             }
196             return result.shortValue();
197         }
198         return DefaultWireFormat.NULL_VALUE;
199     }
200 
201     /**
202      * @return
203      */
204     private Short getNextCacheId() {
205         Short result;
206         result = new Short(cachedKeyGenerator++);
207         // once we fill the cache start reusing old cache locations to avoid memory leaks.
208         if (cachedKeyGenerator >= MAX_CACHE_SIZE) {
209             cachedKeyGenerator=0;
210         }
211         
212         lastWriteValueCacheEvictionPosition++;
213         if (lastWriteValueCacheEvictionPosition >= MAX_CACHE_SIZE) {
214             lastWriteValueCacheEvictionPosition=0;
215         }
216         
217         if( writeValueCacheArray[lastWriteValueCacheEvictionPosition] !=null ) {
218             Object o = writeValueCacheArray[lastWriteValueCacheEvictionPosition];
219             writeValueCache.remove(o);
220             writeValueCacheArray[lastWriteValueCacheEvictionPosition]=null;            
221         }
222         return result;
223     }
224     
225     protected void validateWriteCache() throws IOException {
226         if (cachingEnabled) {
227             if (writeValueCache.size() >= MAX_CACHE_SIZE) {
228                 writeValueCache.clear();
229                 Arrays.fill(writeValueCacheArray,null);
230                 cachedKeyGenerator = 0;
231                 updateCachedValue((short) -1, null);// send update to peer to
232                                                     // clear the peer cache
233             }
234         }
235     }
236     
237     protected void handleCachedValue(CachedValue cv) {
238         if (cv != null) {
239             if (cv.getId() == CLEAR_CACHE) {
240                 Arrays.fill(readValueCacheArray, null);
241             } else if (cv.getId() != NULL_VALUE) {
242                 readValueCacheArray[cv.getId()] = cv.getValue();
243             }
244         }
245     }    
246     
247     private synchronized void updateCachedValue(short key, Object value) throws IOException {
248         if (cachedValueWriter == null) {
249             cachedValueWriter = new CachedValueWriter();
250         }
251         CachedValue cv = new CachedValue();
252         cv.setId(key);
253         cv.setValue(value);
254         cachedBytesOut.reset();
255         cachedValueWriter.writePacket(cv, cachedDataOut);
256         cachedDataOut.flush();
257         byte[] data = cachedBytesOut.getData();
258         int count = cachedBytesOut.size();
259         getTransportDataOut().writeByte(cv.getPacketType());
260         getTransportDataOut().writeInt(count);
261         getTransportDataOut().write(data, 0, count);
262     }
263 }