Source code: org/activemq/transport/jabber/JabberWireFormat.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.transport.jabber;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23 import java.io.Serializable;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.Map;
27
28 import javax.jms.JMSException;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.activemq.io.AbstractWireFormat;
33 import org.activemq.io.WireFormat;
34 import org.activemq.message.ActiveMQBytesMessage;
35 import org.activemq.message.ActiveMQMessage;
36 import org.activemq.message.ActiveMQObjectMessage;
37 import org.activemq.message.ActiveMQTextMessage;
38 import org.activemq.message.Packet;
39 import org.activemq.io.util.ByteArray;
40
41 /**
42 * A wire format which uses XMPP format of messages
43 *
44 * @version $Revision: 1.1 $
45 */
46 public class JabberWireFormat extends AbstractWireFormat {
47 private static final Log log = LogFactory.getLog(JabberWireFormat.class);
48
49 public WireFormat copy() {
50 return new JabberWireFormat();
51 }
52
53 public Packet readPacket(DataInput in) throws IOException {
54 return null; /** TODO */
55 }
56
57 public Packet readPacket(int firstByte, DataInput in) throws IOException {
58 return null; /** TODO */
59 }
60
61 public Packet writePacket(Packet packet, DataOutput out) throws IOException, JMSException {
62 switch (packet.getPacketType()) {
63 case Packet.ACTIVEMQ_MESSAGE:
64 writeMessage((ActiveMQMessage) packet, "", out);
65 break;
66
67 case Packet.ACTIVEMQ_TEXT_MESSAGE:
68 writeTextMessage((ActiveMQTextMessage) packet, out);
69 break;
70
71 case Packet.ACTIVEMQ_BYTES_MESSAGE:
72 writeBytesMessage((ActiveMQBytesMessage) packet, out);
73 break;
74
75 case Packet.ACTIVEMQ_OBJECT_MESSAGE:
76 writeObjectMessage((ActiveMQObjectMessage) packet, out);
77 break;
78
79 case Packet.ACTIVEMQ_MAP_MESSAGE:
80 case Packet.ACTIVEMQ_STREAM_MESSAGE:
81
82
83 case Packet.ACTIVEMQ_BROKER_INFO:
84 case Packet.ACTIVEMQ_CONNECTION_INFO:
85 case Packet.ACTIVEMQ_MSG_ACK:
86 case Packet.CONSUMER_INFO:
87 case Packet.DURABLE_UNSUBSCRIBE:
88 case Packet.INT_RESPONSE_RECEIPT_INFO:
89 case Packet.PRODUCER_INFO:
90 case Packet.RECEIPT_INFO:
91 case Packet.RESPONSE_RECEIPT_INFO:
92 case Packet.SESSION_INFO:
93 case Packet.TRANSACTION_INFO:
94 case Packet.XA_TRANSACTION_INFO:
95 default:
96 log.warn("Ignoring message type: " + packet.getPacketType() + " packet: " + packet);
97 }
98 return null;
99 }
100
101 /**
102 * Can this wireformat process packets of this version
103 * @param version the version number to test
104 * @return true if can accept the version
105 */
106 public boolean canProcessWireFormatVersion(int version){
107 return true;
108 }
109
110 /**
111 * @return the current version of this wire format
112 */
113 public int getCurrentWireFormatVersion(){
114 return 1;
115 }
116
117 // Implementation methods
118 //-------------------------------------------------------------------------
119 protected void writeObjectMessage(ActiveMQObjectMessage message, DataOutput out) throws JMSException, IOException {
120 Serializable object = message.getObject();
121 String text = (object != null) ? object.toString() : "";
122 writeMessage(message, text, out);
123 }
124
125 protected void writeTextMessage(ActiveMQTextMessage message, DataOutput out) throws JMSException, IOException {
126 writeMessage(message, message.getText(), out);
127 }
128
129 protected void writeBytesMessage(ActiveMQBytesMessage message, DataOutput out) throws IOException {
130 ByteArray data = message.getBodyAsBytes();
131 String text = encodeBinary(data.getBuf(),data.getOffset(),data.getLength());
132 writeMessage(message, text, out);
133 }
134
135 protected void writeMessage(ActiveMQMessage message, String body, DataOutput out) throws IOException {
136 String type = getXmppType(message);
137
138 StringBuffer buffer = new StringBuffer("<");
139 buffer.append(type);
140 buffer.append(" to='");
141 buffer.append(message.getJMSDestination().toString());
142 buffer.append("' from='");
143 buffer.append(message.getJMSReplyTo().toString());
144 String messageID = message.getJMSMessageID();
145 if (messageID != null) {
146 buffer.append("' id='");
147 buffer.append(messageID);
148 }
149
150 HashMap properties = message.getProperties();
151 if (properties != null) {
152 for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) {
153 Map.Entry entry = (Map.Entry) iter.next();
154 Object key = entry.getKey();
155 Object value = entry.getValue();
156 if (value != null) {
157 buffer.append("' ");
158 buffer.append(key.toString());
159 buffer.append("='");
160 buffer.append(value.toString());
161 }
162 }
163 }
164
165 buffer.append("'>");
166
167 String id = message.getJMSCorrelationID();
168 if (id != null) {
169 buffer.append("<thread>");
170 buffer.append(id);
171 buffer.append("</thread>");
172 }
173 buffer.append(body);
174 buffer.append("</");
175 buffer.append(type);
176 buffer.append(">");
177
178 out.write(buffer.toString().getBytes());
179 }
180
181 protected String encodeBinary(byte[] data,int offset,int length) {
182 // TODO
183 throw new RuntimeException("Not implemented yet!");
184 }
185
186 protected String getXmppType(ActiveMQMessage message) {
187 String type = message.getJMSType();
188 if (type == null) {
189 type = "message";
190 }
191 return type;
192 }
193 }