1 /*
2 * JBoss, Home of Professional Open Source.
3 * Copyright 2006, Red Hat Middleware LLC, and individual contributors
4 * as indicated by the @author tags. See the copyright.txt file in the
5 * distribution for a full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */
22 package org.jboss.mq.il.uil2;
23
24 import java.rmi.RemoteException;
25 import javax.jms.Destination;
26 import javax.transaction.xa.Xid;
27
28 import org.jboss.logging.Logger;
29 import org.jboss.mq.ConnectionToken;
30 import org.jboss.mq.AcknowledgementRequest;
31 import org.jboss.mq.Recoverable;
32 import org.jboss.mq.SpyMessage;
33 import org.jboss.mq.SpyDestination;
34 import org.jboss.mq.TransactionRequest;
35 import org.jboss.mq.DurableSubscriptionID;
36 import org.jboss.mq.il.uil2.msgs.BaseMsg;
37 import org.jboss.mq.il.uil2.msgs.ConnectionTokenMsg;
38 import org.jboss.mq.il.uil2.msgs.AcknowledgementRequestMsg;
39 import org.jboss.mq.il.uil2.msgs.MsgTypes;
40 import org.jboss.mq.il.uil2.msgs.AddMsg;
41 import org.jboss.mq.il.uil2.msgs.BrowseMsg;
42 import org.jboss.mq.il.uil2.msgs.CheckIDMsg;
43 import org.jboss.mq.il.uil2.msgs.CreateDestMsg;
44 import org.jboss.mq.il.uil2.msgs.DeleteTemporaryDestMsg;
45 import org.jboss.mq.il.uil2.msgs.GetIDMsg;
46 import org.jboss.mq.il.uil2.msgs.RecoverMsg;
47 import org.jboss.mq.il.uil2.msgs.TemporaryDestMsg;
48 import org.jboss.mq.il.uil2.msgs.ReceiveMsg;
49 import org.jboss.mq.il.uil2.msgs.EnableConnectionMsg;
50 import org.jboss.mq.il.uil2.msgs.SubscribeMsg;
51 import org.jboss.mq.il.uil2.msgs.TransactMsg;
52 import org.jboss.mq.il.uil2.msgs.UnsubscribeMsg;
53 import org.jboss.mq.il.uil2.msgs.DeleteSubscriptionMsg;
54 import org.jboss.mq.il.uil2.msgs.CheckUserMsg;
55 import org.jboss.mq.il.uil2.msgs.PingMsg;
56 import org.jboss.mq.il.Invoker;
57
58 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
59
60 /** This is the SocketManager callback handler for the UIL2 server side
61 * socket. This handles messages that are requests from clients.
62 *
63 * @author Scott.Stark@jboss.org
64 * @version $Revision: 61548 $
65 */
66 public class ServerSocketManagerHandler implements MsgTypes, SocketManagerHandler
67 {
68 private static Logger log = Logger.getLogger(ServerSocketManagerHandler.class);
69
70 private ConnectionToken connectionToken;
71 private Invoker server;
72 private SocketManager socketMgr;
73 private UILServerILService uilServerILService;
74 private SynchronizedBoolean closed = new SynchronizedBoolean(false);
75
76 public ServerSocketManagerHandler(Invoker server, SocketManager socketMgr, UILServerILService uilServerILService)
77 {
78 this.server = server;
79 this.socketMgr = socketMgr;
80 this.uilServerILService = uilServerILService;
81 }
82
83 /** The callback from the SocketManager
84 * @param msg
85 */
86 public void handleMsg(BaseMsg msg)
87 throws Exception
88 {
89 boolean trace = log.isTraceEnabled();
90 int msgType = msg.getMsgType();
91 if (trace)
92 log.trace("Begin handleMsg, msgType: " + msgType);
93
94 switch (msgType)
95 {
96 case m_setSpyDistributedConnection:
97 log.debug("Setting up the UILClientIL Connection");
98 ConnectionTokenMsg cmsg = (ConnectionTokenMsg) msg;
99 connectionToken = cmsg.getToken();
100 UILClientIL clientIL = (UILClientIL) connectionToken.clientIL;
101 clientIL.setSocketMgr(socketMgr);
102 socketMgr.sendReply(msg);
103 log.debug("The UILClientIL Connection is set up");
104 break;
105 case m_acknowledge:
106 AcknowledgementRequestMsg ackmsg = (AcknowledgementRequestMsg) msg;
107 AcknowledgementRequest ack = ackmsg.getAck();
108 server.acknowledge(connectionToken, ack);
109 // We send the reply although on newer clients it is ignored.
110 socketMgr.sendReply(msg);
111 break;
112 case m_addMessage:
113 AddMsg amsg = (AddMsg) msg;
114 server.addMessage(connectionToken, amsg.getMsg());
115 socketMgr.sendReply(msg);
116 break;
117 case m_browse:
118 BrowseMsg bmsg = (BrowseMsg) msg;
119 SpyMessage[] msgs = server.browse(connectionToken, bmsg.getDest(), bmsg.getSelector());
120 bmsg.setMessages(msgs);
121 socketMgr.sendReply(msg);
122 break;
123 case m_checkID:
124 CheckIDMsg idmsg = (CheckIDMsg) msg;
125 String ID = idmsg.getID();
126 server.checkID(ID);
127 if (connectionToken != null)
128 connectionToken.setClientID(ID);
129 socketMgr.sendReply(msg);
130 break;
131 case m_connectionClosing:
132 server.connectionClosing(connectionToken);
133 closed.set(true);
134 socketMgr.sendReply(msg);
135 socketMgr.stop();
136 break;
137 case m_createQueue:
138 CreateDestMsg cqmsg = (CreateDestMsg) msg;
139 Destination queue = server.createQueue(connectionToken, cqmsg.getName());
140 cqmsg.setDest(queue);
141 socketMgr.sendReply(msg);
142 break;
143 case m_createTopic:
144 CreateDestMsg ctmsg = (CreateDestMsg) msg;
145 Destination topic = server.createTopic(connectionToken, ctmsg.getName());
146 ctmsg.setDest(topic);
147 socketMgr.sendReply(msg);
148 break;
149 case m_deleteTemporaryDestination:
150 DeleteTemporaryDestMsg dtdmsg = (DeleteTemporaryDestMsg) msg;
151 SpyDestination tmpdest = dtdmsg.getDest();
152 server.deleteTemporaryDestination(connectionToken, tmpdest);
153 socketMgr.sendReply(msg);
154 break;
155 case m_getID:
156 GetIDMsg gidmsg = (GetIDMsg) msg;
157 String gid = server.getID();
158 if (connectionToken != null)
159 connectionToken.setClientID(gid);
160 gidmsg.setID(gid);
161 socketMgr.sendReply(msg);
162 break;
163 case m_getTemporaryQueue:
164 TemporaryDestMsg tqmsg = (TemporaryDestMsg) msg;
165 Destination tmpQueue = server.getTemporaryQueue(connectionToken);
166 tqmsg.setDest(tmpQueue);
167 socketMgr.sendReply(msg);
168 break;
169 case m_getTemporaryTopic:
170 TemporaryDestMsg ttmsg = (TemporaryDestMsg) msg;
171 Destination tmpTopic = server.getTemporaryTopic(connectionToken);
172 ttmsg.setDest(tmpTopic);
173 socketMgr.sendReply(msg);
174 break;
175 case m_receive:
176 ReceiveMsg rmsg = (ReceiveMsg) msg;
177 SpyMessage reply = server.receive(connectionToken, rmsg.getSubscriberID(), rmsg.getWait());
178 rmsg.setMessage(reply);
179 socketMgr.sendReply(msg);
180 break;
181 case m_setEnabled:
182 EnableConnectionMsg ecmsg = (EnableConnectionMsg) msg;
183 server.setEnabled(connectionToken, ecmsg.isEnabled());
184 socketMgr.sendReply(msg);
185 break;
186 case m_subscribe:
187 SubscribeMsg smsg = (SubscribeMsg) msg;
188 server.subscribe(connectionToken, smsg.getSubscription());
189 socketMgr.sendReply(msg);
190 break;
191 case m_transact:
192 TransactMsg tmsg = (TransactMsg) msg;
193 TransactionRequest trans = tmsg.getRequest();
194 server.transact(connectionToken, trans);
195 socketMgr.sendReply(msg);
196 break;
197 case m_recover:
198 RecoverMsg recmsg = (RecoverMsg) msg;
199 int flags = recmsg.getFlags();
200 if (server instanceof Recoverable)
201 {
202 Recoverable recoverable = (Recoverable) server;
203 Xid[] xids = recoverable.recover(connectionToken, flags);
204 recmsg.setXids(xids);
205 socketMgr.sendReply(msg);
206 break;
207 }
208 throw new IllegalStateException("Invoker does not implement recoverable " + server);
209 case m_unsubscribe:
210 UnsubscribeMsg umsg = (UnsubscribeMsg) msg;
211 server.unsubscribe(connectionToken, umsg.getSubscriptionID());
212 socketMgr.sendReply(msg);
213 break;
214 case m_destroySubscription:
215 DeleteSubscriptionMsg dsmsg = (DeleteSubscriptionMsg) msg;
216 DurableSubscriptionID dsub = dsmsg.getSubscriptionID();
217 server.destroySubscription(connectionToken, dsub);
218 socketMgr.sendReply(msg);
219 break;
220 case m_checkUser:
221 CheckUserMsg cumsg = (CheckUserMsg) msg;
222 String uid = server.checkUser(cumsg.getUsername(), cumsg.getPassword());
223 cumsg.setID(uid);
224 cumsg.clearPassword();
225 socketMgr.sendReply(msg);
226 break;
227 case m_ping:
228 PingMsg ping = (PingMsg) msg;
229 server.ping(connectionToken, ping.getTime());
230 break;
231 case m_pong:
232 break;
233 // Ignore, this is an old client, that still wants to send us replies
234 case m_receiveRequest:
235 break;
236 case m_authenticate:
237 CheckUserMsg cumsg2 = (CheckUserMsg) msg;
238 String sessionID = server.authenticate(cumsg2.getUsername(), cumsg2.getPassword());
239 cumsg2.setID(sessionID);
240 cumsg2.clearPassword();
241 socketMgr.sendReply(msg);
242 break;
243 default:
244 throw new RemoteException("Unknown msgType: "+msgType);
245 }
246 if (trace)
247 log.trace("End handleMsg, msgType: " + msgType);
248 }
249
250 public void onStreamNotification(Object stream, int size)
251 {
252 }
253
254 public void asynchFailure(String error, Throwable e)
255 {
256 log.debug(error, e);
257 }
258
259 public void close()
260 {
261 try
262 {
263 uilServerILService.removeHandler(this);
264 if (closed.get() == false)
265 server.connectionClosing(connectionToken);
266 }
267 catch (Exception e)
268 {
269 log.debug("Error closing connection: ", e);
270 }
271 }
272 }