Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/transport/jabber/JabberWireFormat.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq.transport.jabber;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  import java.io.Serializable;
24  import java.util.HashMap;
25  import java.util.Iterator;
26  import java.util.Map;
27  
28  import javax.jms.JMSException;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.activemq.io.AbstractWireFormat;
33  import org.activemq.io.WireFormat;
34  import org.activemq.message.ActiveMQBytesMessage;
35  import org.activemq.message.ActiveMQMessage;
36  import org.activemq.message.ActiveMQObjectMessage;
37  import org.activemq.message.ActiveMQTextMessage;
38  import org.activemq.message.Packet;
39  import org.activemq.io.util.ByteArray;
40  
41  /**
42   * A wire format which uses XMPP format of messages
43   *
44   * @version $Revision: 1.1 $
45   */
46  public class JabberWireFormat extends AbstractWireFormat {
47      private static final Log log = LogFactory.getLog(JabberWireFormat.class);
48  
49      public WireFormat copy() {
50          return new JabberWireFormat();
51      }
52  
53      public Packet readPacket(DataInput in) throws IOException {
54          return null;  /** TODO */
55      }
56  
57      public Packet readPacket(int firstByte, DataInput in) throws IOException {
58          return null;  /** TODO */
59      }
60  
61      public Packet writePacket(Packet packet, DataOutput out) throws IOException, JMSException {
62          switch (packet.getPacketType()) {
63              case Packet.ACTIVEMQ_MESSAGE:
64                  writeMessage((ActiveMQMessage) packet, "", out);
65                  break;
66  
67              case Packet.ACTIVEMQ_TEXT_MESSAGE:
68                  writeTextMessage((ActiveMQTextMessage) packet, out);
69                  break;
70  
71              case Packet.ACTIVEMQ_BYTES_MESSAGE:
72                  writeBytesMessage((ActiveMQBytesMessage) packet, out);
73                  break;
74  
75              case Packet.ACTIVEMQ_OBJECT_MESSAGE:
76                  writeObjectMessage((ActiveMQObjectMessage) packet, out);
77                  break;
78  
79              case Packet.ACTIVEMQ_MAP_MESSAGE:
80              case Packet.ACTIVEMQ_STREAM_MESSAGE:
81  
82  
83              case Packet.ACTIVEMQ_BROKER_INFO:
84              case Packet.ACTIVEMQ_CONNECTION_INFO:
85              case Packet.ACTIVEMQ_MSG_ACK:
86              case Packet.CONSUMER_INFO:
87              case Packet.DURABLE_UNSUBSCRIBE:
88              case Packet.INT_RESPONSE_RECEIPT_INFO:
89              case Packet.PRODUCER_INFO:
90              case Packet.RECEIPT_INFO:
91              case Packet.RESPONSE_RECEIPT_INFO:
92              case Packet.SESSION_INFO:
93              case Packet.TRANSACTION_INFO:
94              case Packet.XA_TRANSACTION_INFO:
95              default:
96                  log.warn("Ignoring message type: " + packet.getPacketType() + " packet: " + packet);
97          }
98          return null;
99      }
100     
101     /**
102      * Can this wireformat process packets of this version
103      * @param version the version number to test
104      * @return true if can accept the version
105      */
106     public boolean canProcessWireFormatVersion(int version){
107         return true;
108     }
109     
110     /**
111      * @return the current version of this wire format
112      */
113     public int getCurrentWireFormatVersion(){
114         return 1;
115     }
116 
117     // Implementation methods
118     //-------------------------------------------------------------------------
119     protected void writeObjectMessage(ActiveMQObjectMessage message, DataOutput out) throws JMSException, IOException {
120         Serializable object = message.getObject();
121         String text = (object != null) ? object.toString() : "";
122         writeMessage(message, text, out);
123     }
124 
125     protected void writeTextMessage(ActiveMQTextMessage message, DataOutput out) throws JMSException, IOException {
126         writeMessage(message, message.getText(), out);
127     }
128 
129     protected void writeBytesMessage(ActiveMQBytesMessage message, DataOutput out) throws IOException {
130         ByteArray data = message.getBodyAsBytes();
131         String text = encodeBinary(data.getBuf(),data.getOffset(),data.getLength());
132         writeMessage(message, text, out);
133     }
134 
135     protected void writeMessage(ActiveMQMessage message, String body, DataOutput out) throws IOException {
136         String type = getXmppType(message);
137 
138         StringBuffer buffer = new StringBuffer("<");
139         buffer.append(type);
140         buffer.append(" to='");
141         buffer.append(message.getJMSDestination().toString());
142         buffer.append("' from='");
143         buffer.append(message.getJMSReplyTo().toString());
144         String messageID = message.getJMSMessageID();
145         if (messageID != null) {
146             buffer.append("' id='");
147             buffer.append(messageID);
148         }
149 
150         HashMap properties = message.getProperties();
151         if (properties != null) {
152             for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) {
153                 Map.Entry entry = (Map.Entry) iter.next();
154                 Object key = entry.getKey();
155                 Object value = entry.getValue();
156                 if (value != null) {
157                     buffer.append("' ");
158                     buffer.append(key.toString());
159                     buffer.append("='");
160                     buffer.append(value.toString());
161                 }
162             }
163         }
164 
165         buffer.append("'>");
166 
167         String id = message.getJMSCorrelationID();
168         if (id != null) {
169             buffer.append("<thread>");
170             buffer.append(id);
171             buffer.append("</thread>");
172         }
173         buffer.append(body);
174         buffer.append("</");
175         buffer.append(type);
176         buffer.append(">");
177 
178         out.write(buffer.toString().getBytes());
179     }
180 
181     protected String encodeBinary(byte[] data,int offset,int length) {
182         // TODO
183         throw new RuntimeException("Not implemented yet!");
184     }
185 
186     protected String getXmppType(ActiveMQMessage message) {
187         String type = message.getJMSType();
188         if (type == null) {
189             type = "message";
190         }
191         return type;
192     }
193 }