Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/transport/stomp/CommandParser.java


1   /*
2    * Copyright (c) 2005 Your Corporation. All Rights Reserved.
3    */
4   package org.activemq.transport.stomp;
5   
6   import org.activemq.message.Packet;
7   import org.activemq.message.Receipt;
8   
9   import java.io.DataInput;
10  import java.io.DataOutput;
11  import java.io.IOException;
12  import java.net.ProtocolException;
13  
14  class CommandParser
15  {
16      private String clientId;
17      private final StompWireFormat format;
18  
19      CommandParser(StompWireFormat wireFormat)
20      {
21          format = wireFormat;
22      }
23  
24      Packet parse(DataInput in) throws IOException
25      {
26          String line;
27  
28          // skip white space to next real line
29          try
30          {
31              while ((line = in.readLine()).trim().length() == 0) {}
32          }
33          catch (NullPointerException e)
34          {
35              throw new IOException("connection was closed");
36          }
37  
38          // figure corrent command and return it
39          Command command = null;
40          if (line.startsWith(Stomp.Commands.SUBSCRIBE)) command = new Subscribe(format);
41          if (line.startsWith(Stomp.Commands.SEND)) command = new Send(format);
42          if (line.startsWith(Stomp.Commands.DISCONNECT)) command = new Disconnect(clientId);
43          if (line.startsWith(Stomp.Commands.BEGIN)) command = new Begin(format);
44          if (line.startsWith(Stomp.Commands.COMMIT)) command = new Commit(format);
45          if (line.startsWith(Stomp.Commands.ABORT)) command = new Abort(format);
46          if (line.startsWith(Stomp.Commands.UNSUBSCRIBE)) command = new Unsubscribe(format);
47          if (line.startsWith(Stomp.Commands.ACK)) command = new Ack(format);
48  
49          if (command == null)
50          {
51              while (in.readByte() == 0) {}
52              throw new ProtocolException("Unknown command [" + line + "]");
53          }
54  
55          PacketEnvelope envelope = command.build(line, in);
56          if (envelope.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
57          {
58              final short id = StompWireFormat.PACKET_IDS.getNextShortSequence();
59              envelope.getPacket().setId(id);
60              envelope.getPacket().setReceiptRequired(true);
61              final String client_packet_key = envelope.getHeaders().getProperty(Stomp.Headers.RECEIPT_REQUESTED);
62              format.addReceiptListener(new ReceiptListener()
63              {
64                  public boolean onReceipt(Receipt receipt, DataOutput out) throws IOException
65                  {
66                      if (receipt.getCorrelationId() != id) return false;
67  
68                      out.write(new FrameBuilder(Stomp.Responses.RECEIPT)
69                              .addHeader(Stomp.Headers.Receipt.RECEIPT_ID, client_packet_key)
70                              .toFrame());
71                      return true;
72                  }
73              });
74          }
75  
76          return envelope.getPacket();
77      }
78  
79      void setClientId(String clientId)
80      {
81          this.clientId = clientId;
82      }
83  }