Docjar: A Java Source and Document Engine
com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/transport/activeio/PacketAggregator.java


1   /**
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    *  Licensed under the Apache License, Version 2.0 (the "License");
6    *  you may not use this file except in compliance with the License.
7    *  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   */
17  package org.activemq.transport.activeio;
18  
19  import java.io.IOException;
20  
21  import org.activeio.Packet;
22  import org.activeio.PacketData;
23  import org.activeio.packet.AppendedPacket;
24  import org.activeio.packet.EOSPacket;
25  
26  /**
27   * @version $Revision: 1.1.1.1 $
28   */
29  abstract public class PacketAggregator {
30  
31      private static final int HEADER_LENGTH = 5;        
32  
33      Packet incompleteUpPacket;
34      boolean headerLoaded;
35      private int upPacketLength;
36      
37      public void addRawPacket(Packet packet) throws IOException {
38  
39          // Passthrough the EOS packet.
40          if( packet == EOSPacket.EOS_PACKET ) {
41              packetAssembled(packet);
42              return;
43          }
44          
45          if (incompleteUpPacket != null) {
46              packet = AppendedPacket.join(incompleteUpPacket, packet);
47              incompleteUpPacket = null;
48          }
49  
50          while (true) {
51  
52              if (!headerLoaded) {
53                  headerLoaded = packet.remaining() >= HEADER_LENGTH;
54                  if( headerLoaded ) {
55                      PacketData data = new PacketData(packet.duplicate());
56                      data.readByte();
57                      upPacketLength = data.readInt();
58                      if( upPacketLength < 0 ) {
59                          throw new IOException("Up packet lenth was invalid: "+upPacketLength);
60                      }
61                      upPacketLength+=HEADER_LENGTH;
62                  }
63                  if( !headerLoaded )
64                      break;
65              }
66  
67              if (packet.remaining() < upPacketLength )
68                  break;
69  
70              // Get ready to create a slice to send up.
71              int origLimit = packet.limit();
72              packet.limit(upPacketLength);
73              packetAssembled(packet.slice());
74              
75              // Get a slice of the remaining since that will dump
76              // the first packets of an AppendedPacket
77              packet.position(upPacketLength);
78              packet.limit(origLimit);
79              packet = packet.slice();
80  
81              // Need to load a header again now.
82              headerLoaded = false;
83          }
84          if (packet.hasRemaining()) {
85              incompleteUpPacket = packet;
86          }
87          
88      }
89  
90      protected abstract void packetAssembled(Packet packet);
91  }