Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/io/impl/AbstractDefaultWireFormat.java


1   /**
2    *
3    * Copyright 2004 Protique Ltd
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  
19  package org.activemq.io.impl;
20  import java.io.DataInput;
21  import java.io.DataInputStream;
22  import java.io.DataOutput;
23  import java.io.DataOutputStream;
24  import java.io.IOException;
25  import java.io.Serializable;
26  import java.util.ArrayList;
27  import java.util.List;
28  
29  import javax.jms.JMSException;
30  
31  import org.activemq.io.AbstractWireFormat;
32  import org.activemq.io.WireFormat;
33  import org.activemq.message.AbstractPacket;
34  import org.activemq.message.CachedValue;
35  import org.activemq.message.Packet;
36  import org.activemq.message.WireFormatInfo;
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  
40  /**
41   * Default implementation used for Java-Java protocols. When talking to non-Java nodes we may use a different wire
42   * format.
43   * 
44   * @version $Revision: 1.1.1.1 $
45   */
46  public abstract class AbstractDefaultWireFormat extends AbstractWireFormat implements Serializable {
47  
48      /**
49       * Current wire format version for this implementation
50       */
51      public static final int WIRE_FORMAT_VERSION = 3;
52      private static final Log log = LogFactory.getLog(AbstractDefaultWireFormat.class);
53      
54      protected transient  PacketReader messageReader;
55      protected transient  PacketReader textMessageReader;
56      protected transient  PacketReader objectMessageReader;
57      protected transient  PacketReader bytesMessageReader;
58      protected transient  PacketReader streamMessageReader;
59      protected transient  PacketReader mapMessageReader;
60      protected transient  PacketReader messageAckReader;
61      protected transient  PacketReader receiptReader;
62      protected transient  PacketReader consumerInfoReader;
63      protected transient  PacketReader producerInfoReader;
64      protected transient  PacketReader transactionInfoReader;
65      protected transient  PacketReader xaTransactionInfoReader;
66      protected transient  PacketReader brokerInfoReader;
67      protected transient  PacketReader connectionInfoReader;
68      protected transient  PacketReader sessionInfoReader;
69      protected transient  PacketReader durableUnsubscribeReader;
70      protected transient  PacketReader reponseReceiptReader;
71      protected transient  PacketReader intReponseReceiptReader;
72      protected transient  PacketReader capacityInfoReader;
73      protected transient  PacketReader capacityInfoRequestReader;
74      protected transient  PacketReader wireFormatInfoReader;
75      protected transient  PacketReader keepAliveReader;
76      protected transient  PacketReader brokerAdminCommandReader;
77      protected transient  PacketReader cachedValueReader;
78      protected transient  PacketReader cleanupConnectionAndSessionInfoReader;        
79      protected transient  PacketWriter messageWriter;
80      protected transient  PacketWriter textMessageWriter;
81      protected transient  PacketWriter objectMessageWriter;
82      protected transient  PacketWriter bytesMessageWriter;
83      protected transient  PacketWriter streamMessageWriter;
84      protected transient  PacketWriter mapMessageWriter;
85      protected transient  PacketWriter messageAckWriter;
86      protected transient  PacketWriter receiptWriter;
87      protected transient  PacketWriter consumerInfoWriter;
88      protected transient  PacketWriter producerInfoWriter;
89      protected transient  PacketWriter transactionInfoWriter;
90      protected transient  PacketWriter xaTransactionInfoWriter;
91      protected transient  PacketWriter brokerInfoWriter;
92      protected transient  PacketWriter connectionInfoWriter;
93      protected transient  PacketWriter sessionInfoWriter;
94      protected transient  PacketWriter durableUnsubscribeWriter;
95      protected transient  PacketWriter reponseReceiptWriter;
96      protected transient  PacketWriter intReponseReceiptWriter;
97      protected transient  PacketWriter capacityInfoWriter;
98      protected transient  PacketWriter capacityInfoRequestWriter;
99      protected transient  PacketWriter wireFormatInfoWriter;
100     protected transient  PacketWriter keepAliveWriter;
101     protected transient  PacketWriter brokerAdminCommandWriter;
102     protected transient  PacketWriter cachedValueWriter;
103     protected transient  PacketWriter cleanupConnectionAndSessionInfoWriter;
104 
105     private List readers = new ArrayList();
106     private List writers = new ArrayList();
107     
108     protected transient int currentWireFormatVersion;
109     
110     /**
111      * Default Constructor
112      */
113     public AbstractDefaultWireFormat() {
114         this.currentWireFormatVersion = WIRE_FORMAT_VERSION;
115         initializeReaders();
116         initializeWriters();
117     }
118     
119     
120     abstract public byte[] toBytes(Packet packet) throws IOException;
121     abstract public Packet writePacket(Packet packet, DataOutput dataOut) throws IOException;    
122     abstract protected Packet readPacket(DataInput dataIn, PacketReader reader) throws IOException;
123     
124     abstract protected void handleCachedValue(CachedValue cv);
125     abstract public Object getValueFromReadCache(short key);
126     abstract short getWriteCachedKey(Object value) throws IOException;
127     
128 
129     /**
130      * Some wire formats require a handshake at start-up
131      * @param dataOut
132      * @param dataIn
133      * @throws JMSException
134      */
135     public void initiateClientSideProtocol(DataOutputStream dataOut,DataInputStream dataIn) throws JMSException{
136         WireFormatInfo info = new WireFormatInfo();
137         info.setVersion(getCurrentWireFormatVersion());
138         try {
139             writePacket(info, dataOut);
140             dataOut.flush();
141         }
142         catch (IOException e) {
143             throw new JMSException("Failed to intiate protocol");
144         }
145     }
146     
147     /**
148      * Some wire formats require a handshake at start-up
149      * @param dataOut
150      * @param dataIn
151      * @throws JMSException
152      */
153     public void initiateServerSideProtocol(DataOutputStream dataOut,DataInputStream dataIn) throws JMSException{
154     }
155         
156     /**
157      * @return new WireFormat
158      */
159     abstract public WireFormat copy();
160 
161     /**
162      * @param firstByte
163      * @param dataIn
164      * @return
165      * @throws IOException
166      * 
167      */
168     public Packet readPacket(int firstByte, DataInput dataIn) throws IOException {
169         switch (firstByte) {
170             case Packet.ACTIVEMQ_MESSAGE :
171                 return readPacket(dataIn, messageReader);
172             case Packet.ACTIVEMQ_TEXT_MESSAGE :
173                 return readPacket(dataIn, textMessageReader);
174             case Packet.ACTIVEMQ_OBJECT_MESSAGE :
175                 return readPacket(dataIn, objectMessageReader);
176             case Packet.ACTIVEMQ_BYTES_MESSAGE :
177                 return readPacket(dataIn, bytesMessageReader);
178             case Packet.ACTIVEMQ_STREAM_MESSAGE :
179                 return readPacket(dataIn, streamMessageReader);
180             case Packet.ACTIVEMQ_MAP_MESSAGE :
181                 return readPacket(dataIn, mapMessageReader);
182             case Packet.ACTIVEMQ_MSG_ACK :
183                 return readPacket(dataIn, messageAckReader);
184             case Packet.RECEIPT_INFO :
185                 return readPacket(dataIn, receiptReader);
186             case Packet.CONSUMER_INFO :
187                 return readPacket(dataIn, consumerInfoReader);
188             case Packet.PRODUCER_INFO :
189                 return readPacket(dataIn, producerInfoReader);
190             case Packet.TRANSACTION_INFO :
191                 return readPacket(dataIn, transactionInfoReader);
192             case Packet.XA_TRANSACTION_INFO :
193                 return readPacket(dataIn, xaTransactionInfoReader);
194             case Packet.ACTIVEMQ_BROKER_INFO :
195                 return readPacket(dataIn, brokerInfoReader);
196             case Packet.ACTIVEMQ_CONNECTION_INFO :
197                 return readPacket(dataIn, connectionInfoReader);
198             case Packet.SESSION_INFO :
199                 return readPacket(dataIn, sessionInfoReader);
200             case Packet.DURABLE_UNSUBSCRIBE :
201                 return readPacket(dataIn, durableUnsubscribeReader);
202             case Packet.RESPONSE_RECEIPT_INFO :
203                 return readPacket(dataIn, reponseReceiptReader);
204             case Packet.INT_RESPONSE_RECEIPT_INFO :
205                 return readPacket(dataIn, intReponseReceiptReader);
206             case Packet.CAPACITY_INFO :
207                 return readPacket(dataIn, capacityInfoReader);
208             case Packet.CAPACITY_INFO_REQUEST :
209                 return readPacket(dataIn, capacityInfoRequestReader);
210             case Packet.WIRE_FORMAT_INFO :
211                 WireFormatInfo info =  (WireFormatInfo)readPacket(dataIn, wireFormatInfoReader);
212                 if (info != null){
213                     if (info.getVersion() < 3){
214                         throw new IOException("Cannot support wire format version: " + info.getVersion());
215                     }
216                 }
217                 return info;
218             
219             case Packet.KEEP_ALIVE :
220               return readPacket(dataIn, keepAliveReader);
221             case Packet.BROKER_ADMIN_COMMAND :
222               return readPacket(dataIn, brokerAdminCommandReader);
223             case Packet.CACHED_VALUE_COMMAND :
224                 CachedValue cv =  (CachedValue)readPacket(dataIn,cachedValueReader);
225                 handleCachedValue(cv);
226                 return null;
227             case Packet.CLEANUP_CONNECTION_INFO :
228                 return readPacket(dataIn, cleanupConnectionAndSessionInfoReader);
229             default :
230                 log.error("Could not find PacketReader for packet type: "
231                         + AbstractPacket.getPacketTypeAsString(firstByte));
232                 return null;
233         }
234     }
235     
236     protected PacketWriter getWriter(Packet packet) throws IOException {
237         PacketWriter answer = null;
238         switch (packet.getPacketType()) {
239             case Packet.ACTIVEMQ_MESSAGE :
240                 answer = messageWriter;
241                 break;
242             case Packet.ACTIVEMQ_TEXT_MESSAGE :
243                 answer = textMessageWriter;
244                 break;
245             case Packet.ACTIVEMQ_OBJECT_MESSAGE :
246                 answer = objectMessageWriter;
247                 break;
248             case Packet.ACTIVEMQ_BYTES_MESSAGE :
249                 answer = bytesMessageWriter;
250                 break;
251             case Packet.ACTIVEMQ_STREAM_MESSAGE :
252                 answer = streamMessageWriter;
253                 break;
254             case Packet.ACTIVEMQ_MAP_MESSAGE :
255                 answer = mapMessageWriter;
256                 break;
257             case Packet.ACTIVEMQ_MSG_ACK :
258                 answer = messageAckWriter;
259                 break;
260             case Packet.RECEIPT_INFO :
261                 answer = receiptWriter;
262                 break;
263             case Packet.CONSUMER_INFO :
264                 answer = consumerInfoWriter;
265                 break;
266             case Packet.PRODUCER_INFO :
267                 answer = producerInfoWriter;
268                 break;
269             case Packet.TRANSACTION_INFO :
270                 answer = transactionInfoWriter;
271                 break;
272             case Packet.XA_TRANSACTION_INFO :
273                 answer = xaTransactionInfoWriter;
274                 break;
275             case Packet.ACTIVEMQ_BROKER_INFO :
276                 answer = brokerInfoWriter;
277                 break;
278             case Packet.ACTIVEMQ_CONNECTION_INFO :
279                 answer = connectionInfoWriter;
280                 break;
281             case Packet.SESSION_INFO :
282                 answer = sessionInfoWriter;
283                 break;
284             case Packet.DURABLE_UNSUBSCRIBE :
285                 answer = durableUnsubscribeWriter;
286                 break;
287             case Packet.RESPONSE_RECEIPT_INFO :
288                 answer = reponseReceiptWriter;
289                 break;
290             case Packet.INT_RESPONSE_RECEIPT_INFO :
291                 answer = intReponseReceiptWriter;
292                 break;
293             case Packet.CAPACITY_INFO :
294                 answer = capacityInfoWriter;
295                 break;
296             case Packet.CAPACITY_INFO_REQUEST :
297                 answer = capacityInfoRequestWriter;
298                 break;
299             case Packet.WIRE_FORMAT_INFO :
300                 answer = wireFormatInfoWriter;
301                 break;
302             case Packet.KEEP_ALIVE :
303                 answer = keepAliveWriter;
304                 break;
305             case Packet.BROKER_ADMIN_COMMAND :
306                 answer = brokerAdminCommandWriter;
307                 break;
308             case Packet.CACHED_VALUE_COMMAND:
309                 answer = cachedValueWriter;
310                 break;
311             case Packet.CLEANUP_CONNECTION_INFO:
312                 answer = cleanupConnectionAndSessionInfoWriter;
313                 break;
314             default :
315                 log.error("no PacketWriter for packet: " + packet);
316         }
317         return answer;
318     }
319     
320     /**
321      * Can this wireformat process packets of this version
322      * @param version the version number to test
323      * @return true if can accept the version
324      */
325     public boolean canProcessWireFormatVersion(int version){
326         return version <= WIRE_FORMAT_VERSION;
327     }
328     
329     /**
330      * @return the current version of this wire format
331      */
332     public int getCurrentWireFormatVersion(){
333         return currentWireFormatVersion;
334     }
335     
336     /**
337      * set the current version
338      * @param version
339      */
340     public void setCurrentWireFormatVersion(int version){
341         this.currentWireFormatVersion = version;
342         for (int i =0; i < readers.size(); i++){
343             PacketReader reader = (PacketReader)readers.get(i);
344             reader.setWireFormatVersion(version);
345         }
346         for (int i =0; i < writers.size(); i++){
347             PacketWriter writer = (PacketWriter)writers.get(i);
348             writer.setWireFormatVersion(version);
349         }
350     }
351 
352     private void initializeReaders() {
353         messageReader = new ActiveMQMessageReader(this);
354         readers.add(messageReader);
355         textMessageReader = new ActiveMQTextMessageReader(this);
356         readers.add(textMessageReader);
357         objectMessageReader = new ActiveMQObjectMessageReader(this);
358         readers.add(objectMessageReader);
359         bytesMessageReader = new ActiveMQBytesMessageReader(this);
360         readers.add(bytesMessageReader);
361         streamMessageReader = new ActiveMQStreamMessageReader(this);
362         readers.add(streamMessageReader);
363         mapMessageReader = new ActiveMQMapMessageReader(this);
364         readers.add(mapMessageReader);
365         messageAckReader = new MessageAckReader(this);
366         readers.add(messageAckReader);
367         receiptReader = new ReceiptReader();
368         readers.add(receiptReader);
369         consumerInfoReader = new ConsumerInfoReader();
370         readers.add(consumerInfoReader);
371         producerInfoReader = new ProducerInfoReader();
372         readers.add(producerInfoReader);
373         transactionInfoReader = new TransactionInfoReader();
374         readers.add(transactionInfoReader);
375         xaTransactionInfoReader = new XATransactionInfoReader();
376         readers.add(xaTransactionInfoReader);
377         brokerInfoReader = new BrokerInfoReader();
378         readers.add(brokerInfoReader);
379         connectionInfoReader = new ConnectionInfoReader();
380         readers.add(connectionInfoReader);
381         sessionInfoReader = new SessionInfoReader();
382         readers.add(sessionInfoReader);
383         durableUnsubscribeReader = new DurableUnsubscribeReader();
384         readers.add(durableUnsubscribeReader);
385         reponseReceiptReader = new ResponseReceiptReader();
386         readers.add(reponseReceiptReader);
387         intReponseReceiptReader = new IntResponseReceiptReader();
388         readers.add(intReponseReceiptReader);
389         capacityInfoReader = new CapacityInfoReader();
390         readers.add(capacityInfoReader);
391         capacityInfoRequestReader = new CapacityInfoRequestReader();
392         readers.add(capacityInfoReader);
393         wireFormatInfoReader = new WireFormatInfoReader(this);
394         readers.add(wireFormatInfoReader);
395         keepAliveReader = new KeepAliveReader();
396         readers.add(keepAliveReader);
397         brokerAdminCommandReader = new BrokerAdminCommandReader();
398         readers.add(brokerAdminCommandReader);
399         cachedValueReader = new CachedValueReader();
400         readers.add(cachedValueReader);
401         cleanupConnectionAndSessionInfoReader = new CleanupConnectionInfoReader();
402         readers.add(cleanupConnectionAndSessionInfoReader);
403     }
404     
405     private void initializeWriters(){
406         messageWriter = new ActiveMQMessageWriter(this);
407         writers.add(messageWriter);
408         textMessageWriter = new ActiveMQTextMessageWriter(this);
409         writers.add(textMessageWriter);
410         objectMessageWriter = new ActiveMQObjectMessageWriter(this);
411         writers.add(objectMessageWriter);
412         bytesMessageWriter = new ActiveMQBytesMessageWriter(this);
413         writers.add(bytesMessageWriter);
414         streamMessageWriter = new ActiveMQStreamMessageWriter(this);
415         writers.add(streamMessageWriter);
416         mapMessageWriter = new ActiveMQMapMessageWriter(this);
417         writers.add(mapMessageWriter);
418         messageAckWriter = new MessageAckWriter(this);
419         writers.add(messageAckWriter);
420         receiptWriter = new ReceiptWriter();
421         writers.add(receiptWriter);
422         consumerInfoWriter = new ConsumerInfoWriter();
423         writers.add(consumerInfoWriter);
424         producerInfoWriter = new ProducerInfoWriter();
425         writers.add(producerInfoWriter);
426         transactionInfoWriter = new TransactionInfoWriter();
427         writers.add(transactionInfoWriter);
428         xaTransactionInfoWriter = new XATransactionInfoWriter();
429         writers.add(xaTransactionInfoWriter);
430         brokerInfoWriter = new BrokerInfoWriter();
431         writers.add(brokerInfoWriter);
432         connectionInfoWriter = new ConnectionInfoWriter();
433         writers.add(connectionInfoWriter);
434         sessionInfoWriter = new SessionInfoWriter();
435         writers.add(sessionInfoWriter);
436         durableUnsubscribeWriter = new DurableUnsubscribeWriter();
437         writers.add(durableUnsubscribeWriter);
438         reponseReceiptWriter = new ResponseReceiptWriter();
439         writers.add(reponseReceiptWriter);
440         intReponseReceiptWriter = new IntResponseReceiptWriter();
441         writers.add(intReponseReceiptWriter);
442         capacityInfoWriter = new CapacityInfoWriter();
443         writers.add(capacityInfoWriter);
444         capacityInfoRequestWriter = new CapacityInfoRequestWriter();
445         writers.add(capacityInfoWriter);
446         wireFormatInfoWriter = new WireFormatInfoWriter();
447         writers.add(wireFormatInfoWriter);
448         keepAliveWriter = new KeepAliveWriter();
449         writers.add(keepAliveWriter);
450         brokerAdminCommandWriter = new BrokerAdminCommandWriter();
451         writers.add(brokerAdminCommandWriter);
452         cachedValueWriter = new CachedValueWriter();
453         writers.add(cachedValueWriter);
454         cleanupConnectionAndSessionInfoWriter = new CleanupConnectionInfoWriter();
455         writers.add(cleanupConnectionAndSessionInfoWriter);
456     }
457     
458 }