1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */
22 package org.jboss.mq.il.uil2.msgs;
23
24 import java.io.ObjectOutputStream;
25 import java.io.IOException;
26 import java.io.ObjectInputStream;
27 import java.lang.reflect.UndeclaredThrowableException;
28
29 import org.jboss.mq.il.uil2.SocketManager.ReadTask;
30
31 /** The base msg class for all msgs used by the UIL2 invoker. Msgs consist
32 * of a msg type, id and exception and can operate as two way items that
33 * are sent with the request content and received with the reply content for
34 * the request. Such round-trip behavior is based on matching the request
35 * msgID with the reply msgID. The msgID parameter is segmented into value
36 * 1 to 2147483647 for client originated msgs and -1 to -2147483647 for server
37 * originated msgs.
38 *
39 * <p>The message is a Runnable to avoid constructing a Runnable object
40 * when asynchronously handling the message from the ReadTask.
41 *
42 * @author Scott.Stark@jboss.org
43 * @author Adrian.Brock@HappeningTimes.com
44 * @version $Revision: 45317 $
45 */
46 public class BaseMsg
47 implements Runnable
48 {
49 /** A flag indicating if the msgIDs are used by the JMS server */
50 private static boolean useJMSServerMsgIDs = false;
51 /** The next base msgID */
52 private static int nextMsgID = 0;
53 /** The lock for the next message id */
54 private static Object nextMsgIDLock = new Object();
55 /** 2^31+1 */
56 private static final int SERVER_MSG_ID_MASK = 0x80000000;
57
58 /** The handler of this message */
59 private ReadTask handler;
60
61 /** The MsgTypes constant representing the type of the msg */
62 public int msgType;
63 /** A msg id used to associated a reply with its request */
64 public int msgID;
65 /** Any error thrown by the remote side */
66 public Exception error;
67
68 public BaseMsg(int msgType)
69 {
70 this(msgType, 0);
71 }
72 public BaseMsg(int msgType, int msgID)
73 {
74 this.msgType = msgType;
75 this.msgID = msgID;
76 }
77
78 /** Set the msgID parameter range. If false, the msgID is segmented into the
79 * range 1 to 2147483647 and if true, the rangs is -1 to -2147483647. The
80 * JMS server sets this to true and clients default to false.
81 * @param flag
82 */
83 public static void setUseJMSServerMsgIDs(boolean flag)
84 {
85 useJMSServerMsgIDs = flag;
86 }
87
88 /** Create a BaseMsg subclass based on the msgType.
89 *
90 * @param msgType A MsgTypes.m_xxx constant
91 * @return the derived BaseMsg
92 * @throws IllegalArgumentException thrown for a msgType that does not
93 * match any MsgTypes.m_xxx constant
94 */
95 public static BaseMsg createMsg(int msgType) throws IllegalArgumentException
96 {
97 BaseMsg msg = null;
98 switch( msgType )
99 {
100 case MsgTypes.m_acknowledge:
101 msg = new AcknowledgementRequestMsg();
102 break;
103 case MsgTypes.m_addMessage:
104 msg = new AddMsg();
105 break;
106 case MsgTypes.m_browse:
107 msg = new BrowseMsg();
108 break;
109 case MsgTypes.m_checkID:
110 msg = new CheckIDMsg();
111 break;
112 case MsgTypes.m_connectionClosing:
113 msg = new CloseMsg();
114 break;
115 case MsgTypes.m_createQueue:
116 msg = new CreateDestMsg(true);
117 break;
118 case MsgTypes.m_createTopic:
119 msg = new CreateDestMsg(false);
120 break;
121 case MsgTypes.m_deleteTemporaryDestination:
122 msg = new DeleteTemporaryDestMsg();
123 break;
124 case MsgTypes.m_getID:
125 msg = new GetIDMsg();
126 break;
127 case MsgTypes.m_getTemporaryQueue:
128 msg = new TemporaryDestMsg(true);
129 break;
130 case MsgTypes.m_getTemporaryTopic:
131 msg = new TemporaryDestMsg(false);
132 break;
133 case MsgTypes.m_receive:
134 msg = new ReceiveMsg();
135 break;
136 case MsgTypes.m_setEnabled:
137 msg = new EnableConnectionMsg();
138 break;
139 case MsgTypes.m_setSpyDistributedConnection:
140 msg = new ConnectionTokenMsg();
141 break;
142 case MsgTypes.m_subscribe:
143 msg = new SubscribeMsg();
144 break;
145 case MsgTypes.m_transact:
146 msg = new TransactMsg();
147 break;
148 case MsgTypes.m_recover:
149 msg = new RecoverMsg();
150 break;
151 case MsgTypes.m_unsubscribe:
152 msg = new UnsubscribeMsg();
153 break;
154 case MsgTypes.m_destroySubscription:
155 msg = new DeleteSubscriptionMsg();
156 break;
157 case MsgTypes.m_checkUser:
158 msg = new CheckUserMsg(false);
159 break;
160 case MsgTypes.m_ping:
161 msg = new PingMsg(true);
162 break;
163 case MsgTypes.m_authenticate:
164 msg = new CheckUserMsg(true);
165 break;
166 case MsgTypes.m_close:
167 // This is never sent
168 break;
169 case MsgTypes.m_pong:
170 msg = new PingMsg(false);
171 break;
172 case MsgTypes.m_receiveRequest:
173 msg = new ReceiveRequestMsg();
174 break;
175 default:
176 throw new IllegalArgumentException("Invalid msgType: "+msgType);
177 }
178 return msg;
179 }
180
181 /** Translate a msgType into its string menmonic.
182 * @param msgType A MsgTypes.m_xxx constant
183 * @return the string form of the MsgTypes.m_xxx constant
184 */
185 public static String toString(int msgType)
186 {
187 String msgTypeString = null;
188 switch (msgType)
189 {
190 case MsgTypes.m_acknowledge:
191 msgTypeString = "m_acknowledge";
192 break;
193 case MsgTypes.m_addMessage:
194 msgTypeString = "m_addMessage";
195 break;
196 case MsgTypes.m_browse:
197 msgTypeString = "m_browse";
198 break;
199 case MsgTypes.m_checkID:
200 msgTypeString = "m_checkID";
201 break;
202 case MsgTypes.m_connectionClosing:
203 msgTypeString = "m_connectionClosing";
204 break;
205 case MsgTypes.m_createQueue:
206 msgTypeString = "m_createQueue";
207 break;
208 case MsgTypes.m_createTopic:
209 msgTypeString = "m_createTopic";
210 break;
211 case MsgTypes.m_deleteTemporaryDestination:
212 msgTypeString = "m_deleteTemporaryDestination";
213 break;
214 case MsgTypes.m_getID:
215 msgTypeString = "m_getID";
216 break;
217 case MsgTypes.m_getTemporaryQueue:
218 msgTypeString = "m_getTemporaryQueue";
219 break;
220 case MsgTypes.m_getTemporaryTopic:
221 msgTypeString = "m_getTemporaryTopic";
222 break;
223 case MsgTypes.m_receive:
224 msgTypeString = "m_receive";
225 break;
226 case MsgTypes.m_setEnabled:
227 msgTypeString = "m_setEnabled";
228 break;
229 case MsgTypes.m_setSpyDistributedConnection:
230 msgTypeString = "m_setSpyDistributedConnection";
231 break;
232 case MsgTypes.m_subscribe:
233 msgTypeString = "m_subscribe";
234 break;
235 case MsgTypes.m_transact:
236 msgTypeString = "m_transact";
237 break;
238 case MsgTypes.m_recover:
239 msgTypeString = "m_recover";
240 break;
241 case MsgTypes.m_unsubscribe:
242 msgTypeString = "m_unsubscribe";
243 break;
244 case MsgTypes.m_destroySubscription:
245 msgTypeString = "m_destroySubscription";
246 break;
247 case MsgTypes.m_checkUser:
248 msgTypeString = "m_checkUser";
249 break;
250 case MsgTypes.m_ping:
251 msgTypeString = "m_ping";
252 break;
253 case MsgTypes.m_authenticate:
254 msgTypeString = "m_authenticate";
255 break;
256 case MsgTypes.m_close:
257 msgTypeString = "m_close";
258 break;
259 case MsgTypes.m_pong:
260 msgTypeString = "m_pong";
261 break;
262 case MsgTypes.m_receiveRequest:
263 msgTypeString = "m_receiveRequest";
264 break;
265 default:
266 msgTypeString = "unknown message type " + msgType;
267 }
268 return msgTypeString;
269 }
270
271 public int getMsgType()
272 {
273 return msgType;
274 }
275
276 /** Access the msgID, initializing it if it has not been set yet. This
277 * is used by the SocketManager.internalSendMessage to setup the unique
278 * msgID for a request msg.
279 *
280 * @return the msgID value
281 */
282 public synchronized int getMsgID()
283 {
284 if( msgID == 0 )
285 {
286 synchronized (nextMsgIDLock)
287 {
288 msgID = ++ nextMsgID;
289 }
290 if( useJMSServerMsgIDs )
291 msgID += SERVER_MSG_ID_MASK;
292 else if( msgID >= SERVER_MSG_ID_MASK )
293 msgID = msgID % SERVER_MSG_ID_MASK;
294 }
295 return msgID;
296 }
297 /** Set the msgID. This is used by the SocketManager read task to populate
298 * a msg with its request ID.
299 * @param msgID the msgID read off the socket
300 */
301 public void setMsgID(int msgID)
302 {
303 this.msgID = msgID;
304 }
305
306 /** Access any exception associated with the msg
307 * @return
308 */
309 public Exception getError()
310 {
311 return error;
312 }
313 /** Set an exception that should be used as the msg return value.
314 *
315 * @param e
316 */
317 public void setError(Throwable e)
318 {
319 if( e instanceof Exception )
320 error = (Exception) e;
321 else
322 error = new UndeclaredThrowableException(e);
323 }
324
325 /** Equality is based on BaseMsg.msgID
326 * @param o a BaseMsg
327 * @return true if o.msgID == msgID
328 */
329 public boolean equals(Object o)
330 {
331 BaseMsg msg = (BaseMsg) o;
332 return msg.msgID == msgID;
333 }
334
335 /** Hash code is simply the msgID
336 * @return
337 */
338 public int hashCode()
339 {
340 return msgID;
341 }
342
343 public String toString()
344 {
345 StringBuffer tmp = new StringBuffer(this.getClass().getName());
346 tmp.append(System.identityHashCode(this));
347 tmp.append("[msgType: ");
348 tmp.append(toString(msgType));
349 tmp.append(", msgID: ");
350 tmp.append(msgID);
351 tmp.append(", error: ");
352 tmp.append(error);
353 tmp.append("]");
354 return tmp.toString();
355 }
356
357 /** Trim the message when replying
358 */
359 public void trimReply()
360 {
361 }
362
363 /** Write the msgType, msgID, hasError flag and optionally the error
364 * @param out
365 * @throws IOException
366 */
367 public void write(ObjectOutputStream out) throws IOException
368 {
369 out.writeByte(msgType);
370 out.writeInt(msgID);
371 int hasError = error != null ? 1 : 0;
372 out.writeByte(hasError);
373 if( hasError == 1 )
374 out.writeObject(error);
375 }
376 /** Read the hasError flag and optionally the error. This method is not
377 * a complete analog of write because the SocketManager read task reads
378 * the msgType and msgID off of the socket.
379 *
380 * @param in
381 * @throws IOException
382 * @throws ClassNotFoundException
383 */
384 public void read(ObjectInputStream in) throws IOException, ClassNotFoundException
385 {
386 int hasError = in.readByte();
387 if( hasError == 1 )
388 error = (Exception) in.readObject();
389 }
390
391 public void setHandler(ReadTask handler)
392 {
393 this.handler = handler;
394 }
395
396 public void run()
397 {
398 handler.handleMsg(this);
399 handler = null;
400 }
401 }