Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/transport/stomp/StompWireFormat.java


1   /*
2    * Copyright (c) 2005 Your Corporation. All Rights Reserved.
3    */
4   package org.activemq.transport.stomp;
5   
6   import EDU.oswego.cs.dl.util.concurrent.Channel;
7   import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
8   import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
9   import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
10  import org.activemq.io.WireFormat;
11  import org.activemq.message.ActiveMQDestination;
12  import org.activemq.message.ActiveMQTextMessage;
13  import org.activemq.message.ConnectionInfo;
14  import org.activemq.message.ConsumerInfo;
15  import org.activemq.message.Packet;
16  import org.activemq.message.Receipt;
17  import org.activemq.message.SessionInfo;
18  import org.activemq.message.ActiveMQBytesMessage;
19  import org.activemq.util.IdGenerator;
20  
21  import javax.jms.JMSException;
22  import javax.jms.Session;
23  import java.io.BufferedReader;
24  import java.io.DataInput;
25  import java.io.DataInputStream;
26  import java.io.DataOutput;
27  import java.io.DataOutputStream;
28  import java.io.IOException;
29  import java.io.InputStreamReader;
30  import java.net.DatagramPacket;
31  import java.net.ProtocolException;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Properties;
35  
36  /**
37   * Implements the TTMP protocol.
38   */
39  public class StompWireFormat implements WireFormat
40  {
41  
42      static final IdGenerator PACKET_IDS = new IdGenerator();
43      static final IdGenerator clientIds = new IdGenerator();
44  
45      private CommandParser commandParser = new CommandParser(this);
46      private HeaderParser headerParser = new HeaderParser();
47  
48      private DataInputStream in;
49  
50      private String clientId;
51  
52      private Channel pendingReadPackets = new LinkedQueue();
53      private Channel pendingWriteFrames = new LinkedQueue();
54      private List receiptListeners = new CopyOnWriteArrayList();
55      private short sessionId;
56      private Map subscriptions = new ConcurrentHashMap();
57      private List ackListeners = new CopyOnWriteArrayList();
58      private final Map transactions = new ConcurrentHashMap();
59  
60  
61  
62      void addReceiptListener(ReceiptListener listener)
63      {
64          receiptListeners.add(listener);
65      }
66  
67  
68      public Packet readPacket(DataInput in) throws IOException
69      {
70          Packet pending = (Packet) AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.HelperWithReturn()
71          {
72              public Object cycle() throws InterruptedException
73              {
74                  return pendingReadPackets.poll(0);
75              }
76          });
77          if (pending != null)
78          {
79              return pending;
80          }
81  
82          try
83          {
84              return commandParser.parse(in);
85          }
86          catch (ProtocolException e)
87          {
88              sendError(e.getMessage());
89              return FlushPacket.PACKET;
90          }
91      }
92  
93      public Packet writePacket(final Packet packet, final DataOutput out) throws IOException, JMSException
94      {
95          flushPendingFrames(out);
96  
97          // It may have just been a flush request.
98          if( packet == null )
99              return null;
100 
101         if (packet.getPacketType() == Packet.RECEIPT_INFO)
102         {
103             assert(packet instanceof Receipt);
104             Receipt receipt = (Receipt) packet;
105             for (int i = 0; i < receiptListeners.size(); i++)
106             {
107                 ReceiptListener listener = (ReceiptListener) receiptListeners.get(i);
108                 if (listener.onReceipt(receipt, out))
109                 {
110                     receiptListeners.remove(listener);
111                     return null;
112                 }
113             }
114         }
115 
116         if (packet.getPacketType() == Packet.ACTIVEMQ_TEXT_MESSAGE)
117         {
118             assert(packet instanceof ActiveMQTextMessage);
119             ActiveMQTextMessage msg = (ActiveMQTextMessage) packet;
120             Subscription sub = (Subscription) subscriptions.get(msg.getJMSDestination());
121             sub.receive(msg, out);
122         }
123         else if (packet.getPacketType() == Packet.ACTIVEMQ_BYTES_MESSAGE)
124         {
125             assert(packet instanceof ActiveMQBytesMessage);
126             ActiveMQBytesMessage msg = (ActiveMQBytesMessage) packet;
127             Subscription sub = (Subscription) subscriptions.get(msg.getJMSDestination());
128             sub.receive(msg, out);
129         }
130         return null;
131     }
132 
133     private void flushPendingFrames(final DataOutput out) throws IOException
134     {
135         boolean interrupted = false;
136         do
137         {
138             try
139             {
140                 byte[] frame = (byte[]) pendingWriteFrames.poll(0);
141                 if (frame == null) return;
142                 out.write(frame);
143             }
144             catch (InterruptedException e)
145             {
146                 interrupted = true;
147             }
148         }
149         while (interrupted);
150     }
151 
152     private void sendError(final String message)
153     {
154 //        System.err.println("sending error [" + message + "]");
155         AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper()
156         {
157             public void cycle() throws InterruptedException
158             {
159                 pendingWriteFrames.put(new FrameBuilder(Stomp.Responses.ERROR)
160                         .addHeader(Stomp.Headers.Error.MESSAGE, message)
161                         .toFrame());
162             }
163         });
164     }
165 
166     /**
167      * some transports may register their streams (e.g. Tcp)
168      *
169      * @param dataOut
170      * @param dataIn
171      */
172     public void registerTransportStreams(DataOutputStream dataOut, DataInputStream dataIn)
173     {
174         this.in = dataIn;
175     }
176 
177     /**
178      * Some wire formats require a handshake at start-up
179      *
180      * @throws java.io.IOException
181      */
182     public void initiateServerSideProtocol() throws IOException
183     {
184         BufferedReader in = new BufferedReader(new InputStreamReader(this.in));
185         String first_line = in.readLine();
186         if (!first_line.startsWith(Stomp.Commands.CONNECT))
187         {
188             throw new IOException("First line does not begin with with " + Stomp.Commands.CONNECT);
189         }
190 
191         Properties headers = headerParser.parse(in);
192         //if (!headers.containsKey(TTMP.Headers.Connect.LOGIN))
193         //    System.err.println("Required header [" + TTMP.Headers.Connect.LOGIN + "] missing");
194         //if (!headers.containsKey(TTMP.Headers.Connect.PASSCODE))
195         //    System.err.println("Required header [" + TTMP.Headers.Connect.PASSCODE + "] missing");
196 
197         // allow anyone to login for now
198 
199         String login = headers.getProperty(Stomp.Headers.Connect.LOGIN);
200         String passcode = headers.getProperty(Stomp.Headers.Connect.PASSCODE);
201 
202         // skip to end of the packet
203         while (in.read() != 0) {}
204         final ConnectionInfo info = new ConnectionInfo();
205         final Short packet_id = new Short(PACKET_IDS.getNextShortSequence());
206         clientId = clientIds.generateId();
207         commandParser.setClientId(clientId);
208 
209         info.setClientId(clientId);
210         info.setReceiptRequired(true);
211         info.setClientVersion(Integer.toString(getCurrentWireFormatVersion()));
212         info.setClosed(false);
213         info.setHostName("ttmp.fake.host.name");
214         info.setId(packet_id.shortValue());
215         info.setUserName(login);
216         info.setPassword(passcode);
217         info.setStartTime(System.currentTimeMillis());
218         info.setStarted(true);
219 
220         AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper()
221         {
222             public void cycle() throws InterruptedException
223             {
224                 pendingReadPackets.put(info);
225             }
226         });
227 
228         addReceiptListener(new ReceiptListener()
229         {
230             public boolean onReceipt(Receipt receipt, DataOutput out)
231             {
232                 if (receipt.getCorrelationId() != packet_id.shortValue()) return false;
233                 final Short session_packet_id = new Short(PACKET_IDS.getNextShortSequence());
234                 sessionId = clientIds.getNextShortSequence();
235 
236                 final SessionInfo info = new SessionInfo();
237                 info.setStartTime(System.currentTimeMillis());
238                 info.setId(session_packet_id.shortValue());
239                 info.setClientId(clientId);
240                 info.setSessionId(sessionId);
241                 info.setStarted(true);
242                 info.setSessionMode(Session.AUTO_ACKNOWLEDGE);
243                 info.setReceiptRequired(true);
244 
245                 AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper()
246                 {
247                     public void cycle() throws InterruptedException
248                     {
249                         pendingReadPackets.put(info);
250                     }
251                 });
252 
253                 addReceiptListener(new ReceiptListener()
254                 {
255                     public boolean onReceipt(Receipt receipt, DataOutput out) throws IOException
256                     {
257                         if (receipt.getCorrelationId() != session_packet_id.shortValue()) return false;
258                         StringBuffer buffer = new StringBuffer();
259                         buffer.append(Stomp.Responses.CONNECTED).append(Stomp.NEWLINE);
260                         buffer.append(Stomp.Headers.Connected.SESSION)
261                                 .append(Stomp.Headers.SEPERATOR)
262                                 .append(clientId)
263                                 .append(Stomp.NEWLINE)
264                                 .append(Stomp.NEWLINE);
265                         buffer.append(Stomp.NULL);
266                         out.writeBytes(buffer.toString());
267                         return true;
268                     }
269                 });
270 
271                 return true;
272             }
273         });
274     }
275 
276     /**
277      * Creates a new copy of this wire format so it can be used in another thread/context
278      */
279     public WireFormat copy()
280     {
281         return new StompWireFormat();
282     }
283 
284     /* Stuff below here is leaky stuff we don't actually need */
285 
286     /**
287      * Some wire formats require a handshake at start-up
288      *
289      * @throws java.io.IOException
290      */
291     public void initiateClientSideProtocol() throws IOException
292     {
293         throw new UnsupportedOperationException("Not yet implemented!");
294     }
295 
296     /**
297      * Can this wireformat process packets of this version
298      *
299      * @param version the version number to test
300      * @return true if can accept the version
301      */
302     public boolean canProcessWireFormatVersion(int version)
303     {
304         return version == getCurrentWireFormatVersion();
305     }
306 
307     /**
308      * @return the current version of this wire format
309      */
310     public int getCurrentWireFormatVersion()
311     {
312         return 1;
313     }
314 
315     /**
316      * @return Returns the enableCaching.
317      */
318     public boolean isCachingEnabled()
319     {
320         return false;
321     }
322 
323     /**
324      * @param enableCaching The enableCaching to set.
325      */
326     public void setCachingEnabled(boolean enableCaching)
327     {
328         // never
329     }
330 
331     /**
332      * some wire formats will implement their own fragementation
333      *
334      * @return true unless a wire format supports it's own fragmentation
335      */
336     public boolean doesSupportMessageFragmentation()
337     {
338         return false;
339     }
340 
341     /**
342      * Some wire formats will not be able to understand compressed messages
343      *
344      * @return true unless a wire format cannot understand compression
345      */
346     public boolean doesSupportMessageCompression()
347     {
348         return false;
349     }
350 
351     /**
352      * Writes the given package to a new datagram
353      *
354      * @param channelID is the unique channel ID
355      * @param packet    is the packet to write
356      * @return
357      * @throws java.io.IOException
358      * @throws javax.jms.JMSException
359      */
360     public DatagramPacket writePacket(String channelID, Packet packet) throws IOException, JMSException
361     {
362         throw new UnsupportedOperationException("Will not be implemented");
363     }
364 
365     /**
366      * Reads the packet from the given byte[]
367      *
368      * @param bytes
369      * @param offset
370      * @param length
371      * @return
372      * @throws java.io.IOException
373      */
374     public Packet fromBytes(byte[] bytes, int offset, int length) throws IOException
375     {
376         throw new UnsupportedOperationException("Will not be implemented");
377     }
378 
379     /**
380      * Reads the packet from the given byte[]
381      *
382      * @param bytes
383      * @return
384      * @throws java.io.IOException
385      */
386     public Packet fromBytes(byte[] bytes) throws IOException
387     {
388         throw new UnsupportedOperationException("Will not be implemented");
389     }
390 
391     /**
392      * A helper method which converts a packet into a byte array
393      *
394      * @param packet
395      * @return a byte array representing the packet using some wire protocol
396      * @throws java.io.IOException
397      * @throws javax.jms.JMSException
398      */
399     public byte[] toBytes(Packet packet) throws IOException, JMSException
400     {
401         throw new UnsupportedOperationException("Will not be implemented");
402     }
403 
404     /**
405      * A helper method for working with sockets where the first byte is read
406      * first, then the rest of the message is read.
407      * <p/>
408      * Its common when dealing with sockets to have different timeout semantics
409      * until the first non-zero byte is read of a message, after which
410      * time a zero timeout is used.
411      *
412      * @param firstByte the first byte of the packet
413      * @param in        the rest of the packet
414      * @return
415      * @throws java.io.IOException
416      */
417     public Packet readPacket(int firstByte, DataInput in) throws IOException
418     {
419         throw new UnsupportedOperationException("Will not be implemented");
420     }
421 
422     /**
423      * Read a packet from a Datagram packet from the given channelID. If the
424      * packet is from the same channel ID as it was sent then we have a
425      * loop-back so discard the packet
426      *
427      * @param channelID is the unique channel ID
428      * @param dpacket
429      * @return the packet read from the datagram or null if it should be
430      *         discarded
431      * @throws java.io.IOException
432      */
433     public Packet readPacket(String channelID, DatagramPacket dpacket) throws IOException
434     {
435         throw new UnsupportedOperationException("Will not be implemented");
436     }
437 
438     void clearTransactionId(String user_tx_id)
439     {
440         this.transactions.remove(user_tx_id);
441     }
442 
443     String getClientId()
444     {
445         return this.clientId;
446     }
447 
448     public short getSessionId()
449     {
450         return sessionId;
451     }
452 
453     public void addSubscription(Subscription s)
454     {
455         if (subscriptions.containsKey(s.getDestination()))
456         {
457             Subscription old = (Subscription) subscriptions.get(s.getDestination());
458             ConsumerInfo p = old.close();
459             enqueuePacket(p);
460             subscriptions.put(s.getDestination(), s);
461         }
462         else
463         {
464             subscriptions.put(s.getDestination(), s);
465         }
466     }
467 
468     public void enqueuePacket(final Packet ack)
469     {
470         AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper()
471         {
472             public void cycle() throws InterruptedException
473             {
474                 pendingReadPackets.put(ack);
475             }
476         });
477     }
478 
479     public Subscription getSubscriptionFor(ActiveMQDestination destination)
480     {
481         return (Subscription) subscriptions.get(destination);
482     }
483 
484     public void addAckListener(AckListener listener)
485     {
486         this.ackListeners.add(listener);
487     }
488 
489     public List getAckListeners()
490     {
491         return ackListeners;
492     }
493 
494     public String getTransactionId(String key)
495     {
496         return (String) transactions.get(key);
497     }
498 
499     public void registerTransactionId(String user_tx_id, String tx_id)
500     {
501         transactions.put(user_tx_id, tx_id);
502     }
503 }