| Home >> All >> org >> activemq >> transport >> [ stomp Javadoc ] |
Source code: org/activemq/transport/stomp/CommandParser.java
1 /* 2 * Copyright (c) 2005 Your Corporation. All Rights Reserved. 3 */ 4 package org.activemq.transport.stomp; 5 6 import org.activemq.message.Packet; 7 import org.activemq.message.Receipt; 8 9 import java.io.DataInput; 10 import java.io.DataOutput; 11 import java.io.IOException; 12 import java.net.ProtocolException; 13 14 class CommandParser 15 { 16 private String clientId; 17 private final StompWireFormat format; 18 19 CommandParser(StompWireFormat wireFormat) 20 { 21 format = wireFormat; 22 } 23 24 Packet parse(DataInput in) throws IOException 25 { 26 String line; 27 28 // skip white space to next real line 29 try 30 { 31 while ((line = in.readLine()).trim().length() == 0) {} 32 } 33 catch (NullPointerException e) 34 { 35 throw new IOException("connection was closed"); 36 } 37 38 // figure corrent command and return it 39 Command command = null; 40 if (line.startsWith(Stomp.Commands.SUBSCRIBE)) command = new Subscribe(format); 41 if (line.startsWith(Stomp.Commands.SEND)) command = new Send(format); 42 if (line.startsWith(Stomp.Commands.DISCONNECT)) command = new Disconnect(clientId); 43 if (line.startsWith(Stomp.Commands.BEGIN)) command = new Begin(format); 44 if (line.startsWith(Stomp.Commands.COMMIT)) command = new Commit(format); 45 if (line.startsWith(Stomp.Commands.ABORT)) command = new Abort(format); 46 if (line.startsWith(Stomp.Commands.UNSUBSCRIBE)) command = new Unsubscribe(format); 47 if (line.startsWith(Stomp.Commands.ACK)) command = new Ack(format); 48 49 if (command == null) 50 { 51 while (in.readByte() == 0) {} 52 throw new ProtocolException("Unknown command [" + line + "]"); 53 } 54 55 PacketEnvelope envelope = command.build(line, in); 56 if (envelope.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED)) 57 { 58 final short id = StompWireFormat.PACKET_IDS.getNextShortSequence(); 59 envelope.getPacket().setId(id); 60 envelope.getPacket().setReceiptRequired(true); 61 final String client_packet_key = envelope.getHeaders().getProperty(Stomp.Headers.RECEIPT_REQUESTED); 62 format.addReceiptListener(new ReceiptListener() 63 { 64 public boolean onReceipt(Receipt receipt, DataOutput out) throws IOException 65 { 66 if (receipt.getCorrelationId() != id) return false; 67 68 out.write(new FrameBuilder(Stomp.Responses.RECEIPT) 69 .addHeader(Stomp.Headers.Receipt.RECEIPT_ID, client_packet_key) 70 .toFrame()); 71 return true; 72 } 73 }); 74 } 75 76 return envelope.getPacket(); 77 } 78 79 void setClientId(String clientId) 80 { 81 this.clientId = clientId; 82 } 83 }