Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/transport/stomp/Ack.java


1   /*
2    * Copyright (c) 2005 Your Corporation. All Rights Reserved.
3    */
4   package org.activemq.transport.stomp;
5   
6   import org.activemq.message.ActiveMQDestination;
7   import org.activemq.message.ActiveMQMessage;
8   import org.activemq.message.MessageAck;
9   
10  import javax.jms.DeliveryMode;
11  import java.io.DataInput;
12  import java.io.IOException;
13  import java.net.ProtocolException;
14  import java.util.List;
15  import java.util.Properties;
16  
17  class Ack implements Command
18  {
19      private final StompWireFormat format;
20      private static final HeaderParser parser = new HeaderParser();
21  
22      Ack(StompWireFormat format)
23      {
24          this.format = format;
25      }
26  
27      public PacketEnvelope build(String commandLine, DataInput in) throws IOException
28      {
29          Properties headers = parser.parse(in);
30          String message_id = headers.getProperty(Stomp.Headers.Ack.MESSAGE_ID);
31          if (message_id == null) throw new ProtocolException("ACK received without a message-id to acknowledge!");
32  
33          List listeners = format.getAckListeners();
34          for (int i = 0; i < listeners.size(); i++)
35          {
36              AckListener listener = (AckListener) listeners.get(i);
37              if (listener.handle(message_id))
38              {
39                  listeners.remove(i);
40                  ActiveMQMessage msg = listener.getMessage();
41                  MessageAck ack = new MessageAck();
42                  ack.setDestination((ActiveMQDestination) msg.getJMSDestination());
43                  ack.setConsumerId(listener.getConsumerId());
44                  ack.setMessageID(msg.getJMSMessageID());
45                  ack.setMessageRead(true);
46                  ack.setProducerKey(msg.getProducerKey());
47                  ack.setSequenceNumber(msg.getSequenceNumber());
48                  ack.setPersistent(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT);
49                  ack.setSessionId(format.getSessionId());
50  
51                  if (headers.containsKey(Stomp.Headers.TRANSACTION))
52                  {
53                      String tx_id = format.getTransactionId(headers.getProperty(Stomp.Headers.TRANSACTION));
54                      if (tx_id == null) throw new ProtocolException(headers.getProperty(Stomp.Headers.TRANSACTION) +
55                                                                     " is an invalid transaction id");
56                      ack.setTransactionIDString(tx_id);
57                  }
58  
59                  while ((in.readByte()) != 0) {}
60                  return new PacketEnvelope(ack, headers);
61              }
62          }
63          while ((in.readByte()) != 0) {}
64          throw new ProtocolException("Unexepected ACK received for message-id [" + message_id + "]");
65      }
66  }