| Home >> All >> org >> activemq >> transport >> [ stomp Javadoc ] |
Source code: org/activemq/transport/stomp/Ack.java
1 /* 2 * Copyright (c) 2005 Your Corporation. All Rights Reserved. 3 */ 4 package org.activemq.transport.stomp; 5 6 import org.activemq.message.ActiveMQDestination; 7 import org.activemq.message.ActiveMQMessage; 8 import org.activemq.message.MessageAck; 9 10 import javax.jms.DeliveryMode; 11 import java.io.DataInput; 12 import java.io.IOException; 13 import java.net.ProtocolException; 14 import java.util.List; 15 import java.util.Properties; 16 17 class Ack implements Command 18 { 19 private final StompWireFormat format; 20 private static final HeaderParser parser = new HeaderParser(); 21 22 Ack(StompWireFormat format) 23 { 24 this.format = format; 25 } 26 27 public PacketEnvelope build(String commandLine, DataInput in) throws IOException 28 { 29 Properties headers = parser.parse(in); 30 String message_id = headers.getProperty(Stomp.Headers.Ack.MESSAGE_ID); 31 if (message_id == null) throw new ProtocolException("ACK received without a message-id to acknowledge!"); 32 33 List listeners = format.getAckListeners(); 34 for (int i = 0; i < listeners.size(); i++) 35 { 36 AckListener listener = (AckListener) listeners.get(i); 37 if (listener.handle(message_id)) 38 { 39 listeners.remove(i); 40 ActiveMQMessage msg = listener.getMessage(); 41 MessageAck ack = new MessageAck(); 42 ack.setDestination((ActiveMQDestination) msg.getJMSDestination()); 43 ack.setConsumerId(listener.getConsumerId()); 44 ack.setMessageID(msg.getJMSMessageID()); 45 ack.setMessageRead(true); 46 ack.setProducerKey(msg.getProducerKey()); 47 ack.setSequenceNumber(msg.getSequenceNumber()); 48 ack.setPersistent(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT); 49 ack.setSessionId(format.getSessionId()); 50 51 if (headers.containsKey(Stomp.Headers.TRANSACTION)) 52 { 53 String tx_id = format.getTransactionId(headers.getProperty(Stomp.Headers.TRANSACTION)); 54 if (tx_id == null) throw new ProtocolException(headers.getProperty(Stomp.Headers.TRANSACTION) + 55 " is an invalid transaction id"); 56 ack.setTransactionIDString(tx_id); 57 } 58 59 while ((in.readByte()) != 0) {} 60 return new PacketEnvelope(ack, headers); 61 } 62 } 63 while ((in.readByte()) != 0) {} 64 throw new ProtocolException("Unexepected ACK received for message-id [" + message_id + "]"); 65 } 66 }