Source code: proxy/protocol/ProxyRequestManager.java
1 /* czy to http wywoluje moja metode processRequest, kiedy cos przyjdzie? */
2
3 /** Java class "RequestManager.java" generated from Poseidon for UML.
4 * Poseidon for UML is developed by <A HREF="http://www.gentleware.com">Gentleware</A>.
5 * Generated with <A HREF="http://jakarta.apache.org/velocity/">velocity</A> template engine.
6 */
7
8 /*TODO - z timeoutem */
9
10 /*TODO - Session recover, wrzucanie wiadomosci do JMSmanagera */
11 package proxy.protocol;
12
13 import java.util.*;
14 import proxy.bearer.http.*;
15 import proxy.jmsapi.*;
16 //import proxy.jndi.*;
17 import proxy.ProxyServer;
18 import javax.jms.*;
19 import javax.naming.*;
20
21 /**
22 * <p>
23 * This class provides an implementation of a thread which manages requests
24 * comming from mobile devices. The request are received in the order of their
25 * ids. The receipt protocol ensures that no request is lost and that they come
26 * in the desired order.
27 * </p>
28 * <p>
29 * The requests are stored in a queue and processed sequentially. For each request
30 * the appropriate action associated with this request is executed and a reply
31 * is generated. The reply is then added to the reply queue. The reply queue
32 * is emptied by the replier thread which send the replies to the mobile
33 * device using the bearer layer.
34 *
35 * </p>
36 *
37 * @author Pawel Koziol
38 */
39 public class ProxyRequestManager extends Thread implements mobile.bearer.http.Protocol{
40
41 /**
42 * <p>
43 * For each mobile contains a hash table mapping the id of the last request which was received from an application.
44 * Thus the hash table has two levels - the first one mapping the mobile nr
45 * to the second hash table and the second mapping the owner id to the
46 * last request id.
47 * </p>
48 */
49 private Hashtable currentIds = new Hashtable(); //of HashTable objects
50
51 /**
52 * <p>
53 * For each mobile contains a sorted collection of waiting requests which couldn't
54 * have been processed because a previous request was missing
55 * </p>
56 */
57 private Hashtable waiting = new Hashtable(); //of type TreeSet
58
59 /**
60 * <p>
61 *
62 * </p>
63 */
64
65 private TalkingWithMobiles talking;
66
67 /**
68 * <p>
69 *
70 * </p>
71 */
72 private JMSObjectManager manager = ProxyServer.getObjectManager();
73
74 private static final int MESSAGES_PER_PACK = 1;
75
76 private Replier replier;
77
78 ///////////////////////////////////////
79 // associations
80
81 /**
82 * <p>
83 *
84 * </p>
85 */
86 private LinkedList requestQueue; // of type Request
87 /**
88 * <p>
89 *
90 * </p>
91 */
92 private LinkedList replyQueue;// of type Reply
93
94
95 ///////////////////////////////////////
96 // operations
97
98 public void setTalkingWithMobiles(TalkingWithMobiles _twm)
99 {
100 talking = _twm;
101 }
102
103 public void setObjectManager(JMSObjectManager _om)
104 {
105 manager = _om;
106 }
107
108
109 /**adds the message to the server logs - for debugging*/
110
111 private void log(String msg){
112 System.out.println("PRM: " + msg);
113 }
114 /**
115 * <p>
116 * Receives the next request, acknowledges its receipt and adds it to the request
117 * queue. Provides that the requests are added to the queue in the desired order
118 * and that no request is lost.
119 * </p><p>
120 *
121 * @param req The request to be processed.
122 * </p><p>
123 *
124 * </p>
125 */
126 synchronized public void processRequest(Request req) {
127 int acknArray[] = new int[1];
128 Integer mob = new Integer(req.mobile);
129 Integer own = new Integer(req.owner);
130 TreeSet waitSet; /* sorted collection of waiting requests; contained in the waiting hashtable */
131 Hashtable secondId, secondWait; /*the second level hash tables*/
132
133 log("processRequest id:" + req.id);
134 secondId = (Hashtable) currentIds.get(mob);
135 if(secondId == null) {
136 secondId = new Hashtable();
137 currentIds.put(mob, secondId);
138 }
139 Integer lastId = (Integer) secondId.get(own);
140 if (lastId == null) {
141 lastId = new Integer(0);
142 secondId.put(own, lastId);
143 }
144
145 secondWait = (Hashtable) waiting.get(mob);
146 if(secondWait == null) {
147 secondWait = new Hashtable();
148 waiting.put(mob, secondWait);
149 }
150 waitSet = (TreeSet) secondWait.get(own);
151 if(waitSet == null){
152 waitSet = new TreeSet();
153 secondWait.put(own, waitSet);
154 }
155
156 //lastId = (Integer) secondId.get(own);
157 //waitSet = (TreeSet) secondWait.get(own);
158
159 /*if(req.id == 0){
160 currentIds.put(mob, new Integer(0));
161 waiting.put(mob, new TreeSet());
162 //powinno byc talking.putAcknowledge(own,mob, acknArray);
163 acknArray[0] = 0;
164 talking.putAcknowledge(req.owner, req.mobile, acknArray);
165 }
166 else{*/
167 if (req.id == (lastId.intValue() + 1)) {
168 /* this request comes next */
169 requestQueue.addLast(req);
170 acknArray[0] = req.id;
171 talking.putAcknowledge(req.owner, req.mobile, acknArray);
172 lastId = new Integer(lastId.intValue() + 1);
173 secondId.put(own, lastId);
174
175 /* check if the waiting requests can come next */
176 //waitSet = (TreeSet) waiting.get(mob);
177 if(!waitSet.isEmpty()){
178 Request nextRequest = (Request) waitSet.first();
179 while(nextRequest.id == lastId.intValue() + 1){
180 requestQueue.addLast(nextRequest);
181 waitSet.remove(nextRequest);
182 lastId = new Integer(lastId.intValue() + 1);
183 secondId.put(own, lastId);
184 try{
185 nextRequest = (Request) waitSet.first();
186 }
187 catch(NoSuchElementException e){
188 break;
189 }
190 } //end while
191 } //end check waiting requests
192 }
193 else{ /* this is a previous (already processed) request or a request that needs to wait */
194 if(req.id < lastId.intValue() + 1){
195 /*previous request */
196 acknArray[0] = req.id;
197 talking.putAcknowledge(req.owner, req.mobile, acknArray);
198 }
199 else{
200 /*this request needs to wait */
201 //waitSet = (TreeSet) waiting.get(mob);
202 waitSet.add(req);
203 acknArray[0] = req.id;
204 talking.putAcknowledge(req.owner , req.mobile, acknArray);
205 }
206
207 }
208
209 //}
210 //putAcknowledge(req);
211 // your code here
212 } // end processRequest
213
214 /**
215 * <p>
216 * Adds the reply to the reply queue.
217 * </p><p>
218 *
219 * @param rep The reply to be processed.
220 * </p><p>
221 *
222 * </p>
223 */
224 synchronized public void processReply(Reply rep) {
225 replyQueue.addLast(rep);
226 // your code here
227 } // end processReply
228
229 /**
230 * <p>
231 * Initializes the queues and starts the main thread.
232 * </p>
233 */
234
235 public ProxyRequestManager(){
236 requestQueue = new LinkedList();
237 replyQueue = new LinkedList();
238 replier = new Replier(this);
239 //replier.start(); //czy to jest potrzebne?
240 //start();
241 }
242
243 private Reply handleExc(Request req, int type, String name){
244 Reply reply = new Reply(req);
245 Object[] replyData = new Object[1];
246 replyData[0] = name;
247 reply.setData(replyData);
248 reply.code = type;
249 return reply;
250 }
251
252 private Reply handleRequest(Request req){
253 try{
254 int objId = 0; /*to look up objects in JMSObjectManager */
255 boolean cond = false;
256 int mode = 0;
257 InitialContext context = ProxyServer.proxyInitialContext;
258 QueueConnectionFactory queueConnectionFactory = null;
259 TopicConnectionFactory topicConnectionFactory = null;
260 QueueConnection queueConnection = null;
261 TopicConnection topicConnection = null;
262 Connection connection = null;
263 QueueSession queueSession = null;
264 TopicSession topicSession = null;
265 Session session = null;
266 Queue queue = null;
267 Topic topic = null;
268 QueueSender queueSender = null;
269 TopicPublisher topicPublisher = null;
270 MessageProducer producer = null;
271 QueueReceiver queueReceiver = null;
272 TopicSubscriber topicSubscriber = null;
273 MessageConsumer messageConsumer = null;
274 Message message = null;
275 TextMessage textMessage = null;
276 BytesMessage bytesMessage = null;
277 Destination replyTo = null, dest = null;
278 Object tempObj = null;
279 ProxyStoredObject jmsObject = null;
280
281
282 String text = null;
283 Reply reply = new Reply(req);
284 Object[] replyData = null;
285 Object[] data = null;
286 ProxyStoredObjectType type = new ProxyStoredObjectType();
287
288 // log("begin handle request: code" + req.code + "owner" + req.owner + "mesg " + (String) req.getData()[0]);
289
290 switch(req.code){
291 case LOOKUP:
292 try {
293 //log( (String) req.getData()[0]);
294
295 queueConnectionFactory = (QueueConnectionFactory) context.lookup( ( (
296 String) req.getData()[0]));
297 jmsObject = manager.newObject(req.owner, req.mobile,
298 type.QUEUE_(type.CONNECTION_FACTORY));
299
300 // log("JMSObject.id = " + jmsObject.id);
301 jmsObject.setObj(queueConnectionFactory);
302 manager.addObject(jmsObject);
303 reply = new Reply(req);
304 replyData = new Object[1];
305 replyData[0] = new Integer(jmsObject.id);
306
307 reply.code = REP_QUEUE_CONNECTION_FACTORY;
308 reply.setData(replyData);
309
310 log("zwracam QCF");
311 //if (reply.getData() == null)
312 // log("niemozliwe");
313 // else
314 // log("w handle jeszcze repData jest ok");
315 // log("reply.getData()[0] = " + reply.getData()[0].toString());
316 return reply;
317
318 }
319 catch (Exception e1) {
320 log("wyjatek po QCF ");
321 //e1.printStackTrace();
322 try {
323 topicConnectionFactory = (TopicConnectionFactory) context.lookup( ( (
324 String) req.getData()[0]));
325 jmsObject = manager.newObject(req.owner, req.mobile,
326 type.TOPIC_(type.CONNECTION_FACTORY));
327 jmsObject.setObj(topicConnectionFactory);
328 manager.addObject(jmsObject);
329 replyData = new Object[1];
330 replyData[0] = new Integer(jmsObject.id);
331 reply.setData(replyData);
332 reply.code = REP_TOPIC_CONNECTION_FACTORY;
333 return reply;
334
335 }
336 catch (Exception e2) {
337 try {
338 queue = (Queue) context.lookup( ( (
339 String) req.getData()[0]));
340 jmsObject = manager.newObject(req.owner, req.mobile,
341 type.QUEUE_(type.JNDIOBJECT));
342 jmsObject.setObj(queue);
343 manager.addObject(jmsObject);
344 replyData = new Object[2];
345 replyData[0] = new Integer(jmsObject.id);
346 replyData[1] = queue.getQueueName();
347 reply.setData(replyData);
348 reply.code = REP_QUEUE;
349 return reply;
350
351 }
352 catch (Exception e3) {
353 try {
354 topic = (Topic) context.lookup( ( (
355 String) req.getData()[0]));
356 jmsObject = manager.newObject(req.owner, req.mobile,
357 type.TOPIC_(type.JNDIOBJECT));
358 jmsObject.setObj(topic);
359 manager.addObject(jmsObject);
360 replyData = new Object[2];
361 replyData[0] = new Integer(jmsObject.id);
362 replyData[1] = topic.getTopicName();
363 reply.setData(replyData);
364 reply.code = REP_TOPIC;
365 return reply;
366 }
367 catch (Exception e4) {
368 e4.printStackTrace();
369 return handleExc(req, EXC_NAMING, "No such JNDI object");
370 }
371
372 }
373
374 }
375 }
376
377 case CREATE_QUEUE_CONNECTION:
378 objId = ( (Integer) req.getData()[0]).intValue();
379 queueConnectionFactory = (QueueConnectionFactory) (manager.findObject(
380 objId, req.owner, req.mobile)).getObj();
381
382 if (queueConnectionFactory == null){
383 log("tworzenie qc = findObject zwraca null dla qcf");
384 return handleExc(req, EXC_OTHER, "Invalid QueueConnectionFactory");
385 }
386 try {
387 queueConnection = queueConnectionFactory.createQueueConnection();
388 /*FIXME - to start to prowizorka*/
389 //queueConnection.start();
390 }
391 catch (JMSException e) {
392 log("jmsException przy tworzeniu qc");
393 return handleExc(req, EXC_OTHER, "Cannot create QueueConnection");
394 }
395 jmsObject = manager.newObject(req.owner, req.mobile,
396 type.QUEUE_(type.CONNECTION));
397 jmsObject.setObj(queueConnection);
398 manager.addObject(jmsObject);
399 replyData = new Object[1];
400 replyData[0] = new Integer(jmsObject.id);
401 reply.setData(replyData);
402 reply.code = REP_QUEUE_CONNECTION;
403 return reply;
404
405 case CREATE_TOPIC_CONNECTION:
406 objId = ( (Integer) req.getData()[0]).intValue();
407 topicConnectionFactory = (TopicConnectionFactory) (manager.findObject(
408 objId, req.owner, req.mobile)).getObj();
409 if (topicConnectionFactory == null)
410 return handleExc(req, EXC_OTHER, "Invalid TopicConnectionFactory");
411 try {
412 topicConnection = topicConnectionFactory.createTopicConnection();
413 }
414 catch (JMSException e) {
415 return handleExc(req, EXC_OTHER, "Cannot create TopicConnection");
416 }
417 jmsObject = manager.newObject(req.owner, req.mobile,
418 type.TOPIC_(type.CONNECTION));
419 jmsObject.setObj(topicConnection);
420 manager.addObject(jmsObject);
421 replyData = new Object[1];
422 replyData[0] = new Integer(jmsObject.id);
423 reply.setData(replyData);
424 reply.code = REP_TOPIC_CONNECTION;
425 return reply;
426
427 case CREATE_QUEUE_SESSION:
428 objId = ( (Integer) req.getData()[0]).intValue();
429 log("create queue session dla objId = " + objId);
430 cond = ( (Boolean) req.getData()[1]).booleanValue();
431 mode = ( (Integer) req.getData()[2]).intValue();
432 queueConnection = (QueueConnection) (manager.findObject(
433 objId, req.owner, req.mobile)).getObj();
434 if (queueConnection == null)
435 return handleExc(req, EXC_OTHER, "Invalid QueueConnection");
436 try {
437 queueSession = queueConnection.createQueueSession(cond,Session.CLIENT_ACKNOWLEDGE);
438 }
439 catch (JMSException e) {
440 return handleExc(req, EXC_OTHER, "Cannot create QueueSession");
441 }
442 jmsObject = manager.newObject(req.owner, req.mobile,
443 type.QUEUE_(type.SESSION));
444 jmsObject.setObj(queueSession);
445 manager.addObject(jmsObject);
446 replyData = new Object[1];
447 replyData[0] = new Integer(jmsObject.id);
448 reply.setData(replyData);
449 reply.code = REP_QUEUE_SESSION;
450 return reply;
451
452
453 case CREATE_TOPIC_SESSION:
454 objId = ( (Integer) req.getData()[0]).intValue();
455 cond = ( (Boolean) req.getData()[1]).booleanValue();
456 mode = ( (Integer) req.getData()[2]).intValue();
457 topicConnection = (TopicConnection) (manager.findObject(
458 objId, req.owner, req.mobile)).getObj();
459 if (topicConnection == null)
460 return handleExc(req, EXC_OTHER, "Invalid Topic");
461 try {
462 topicSession = topicConnection.createTopicSession(cond,Session.CLIENT_ACKNOWLEDGE);
463 }
464 catch (JMSException e) {
465 return handleExc(req, EXC_OTHER, "Cannot create TopicSession");
466 }
467 jmsObject = manager.newObject(req.owner, req.mobile,
468 type.TOPIC_(type.SESSION));
469 jmsObject.setObj(topicSession);
470 manager.addObject(jmsObject);
471 replyData = new Object[1];
472 replyData[0] = new Integer(jmsObject.id);
473 reply.setData(replyData);
474 reply.code = REP_TOPIC_SESSION;
475 return reply;
476
477 case CREATE_QUEUE_SENDER:
478 objId = ( (Integer) req.getData()[0]).intValue();
479 queueSession = (QueueSession) (manager.findObject(
480 objId, req.owner, req.mobile)).getObj();
481 if (queueSession == null)
482 return handleExc(req, EXC_OTHER, "Invalid QueueSession");
483 objId = ( (Integer) req.getData()[1]).intValue();
484 queue = (Queue) (manager.findObject(
485 objId, req.owner, req.mobile)).getObj();
486 if (queue == null)
487 return handleExc(req, EXC_OTHER, "Invalid Queue");
488
489 try {
490 queueSender = queueSession.createSender(queue);
491 }
492 catch (JMSException e) {
493 return handleExc(req, EXC_OTHER, "Cannot create queueSender");
494 }
495 jmsObject = manager.newObject(req.owner, req.mobile,
496 type.QUEUE_(type.PRODUCER));
497 jmsObject.setObj(queueSender);
498 manager.addObject(jmsObject);
499 replyData = new Object[1];
500 replyData[0] = new Integer(jmsObject.id);
501 reply.setData(replyData);
502 reply.code = REP_QUEUE_SENDER;
503 return reply;
504
505 case CREATE_TOPIC_PUBLISHER:
506 objId = ( (Integer) req.getData()[0]).intValue();
507 topicSession = (TopicSession) (manager.findObject(
508 objId, req.owner, req.mobile)).getObj();
509 if (topicSession == null)
510 return handleExc(req, EXC_OTHER, "Invalid TopicSession");
511 objId = ( (Integer) req.getData()[1]).intValue();
512 topic = (Topic) (manager.findObject(
513 objId, req.owner, req.mobile)).getObj();
514 if (topic == null)
515 return handleExc(req, EXC_OTHER, "Invalid Topic");
516
517 try {
518 topicPublisher = topicSession.createPublisher(topic);
519 }
520 catch (JMSException e) {
521 return handleExc(req, EXC_OTHER, "Cannot create topicPublisher");
522 }
523 jmsObject = manager.newObject(req.owner, req.mobile,
524 type.TOPIC_(type.PRODUCER));
525 jmsObject.setObj(topicPublisher);
526 manager.addObject(jmsObject);
527 replyData = new Object[1];
528 replyData[0] = new Integer(jmsObject.id);
529 reply.setData(replyData);
530 reply.code = REP_TOPIC_PUBLISHER;
531 return reply;
532
533 case CREATE_QUEUE_RECEIVER:
534 objId = ( (Integer) req.getData()[0]).intValue();
535 queueSession = (QueueSession) (manager.findObject(
536 objId, req.owner, req.mobile)).getObj();
537 if (queueSession == null)
538 return handleExc(req, EXC_OTHER, "Invalid QueueSession");
539 objId = ( (Integer) req.getData()[1]).intValue();
540 queue = (Queue) (manager.findObject(
541 objId, req.owner, req.mobile)).getObj();
542 if (queue == null)
543 return handleExc(req, EXC_OTHER, "Invalid Queue");
544
545 try {
546 queueReceiver = queueSession.createReceiver(queue);
547 }
548 catch (JMSException e) {
549 return handleExc(req, EXC_OTHER, "Cannot create queueReceiver");
550 }
551 jmsObject = manager.newObject(req.owner, req.mobile,
552 type.QUEUE_(type.CONSUMER));
553 jmsObject.setObj(queueReceiver);
554 manager.addObject(jmsObject);
555 replyData = new Object[1];
556 replyData[0] = new Integer(jmsObject.id);
557 reply.setData(replyData);
558 reply.code = REP_QUEUE_RECEIVER;
559 return reply;
560
561 case CREATE_TOPIC_SUBSCRIBER:
562 objId = ( (Integer) req.getData()[0]).intValue();
563 topicSession = (TopicSession) (manager.findObject(
564 objId, req.owner, req.mobile)).getObj();
565 if (topicSession == null)
566 return handleExc(req, EXC_OTHER, "Invalid TopicSession");
567 objId = ( (Integer) req.getData()[1]).intValue();
568 topic = (Topic) (manager.findObject(
569 objId, req.owner, req.mobile)).getObj();
570 if (topic == null)
571 return handleExc(req, EXC_OTHER, "Invalid Topic");
572
573 try {
574 topicSubscriber = topicSession.createSubscriber(topic);
575 }
576 catch (JMSException e) {
577 return handleExc(req, EXC_OTHER, "Cannot create topicSubscriber");
578 }
579 jmsObject = manager.newObject(req.owner, req.mobile,
580 type.TOPIC_(type.CONSUMER));
581 jmsObject.setObj(topicSubscriber);
582 manager.addObject(jmsObject);
583 replyData = new Object[1];
584 replyData[0] = new Integer(jmsObject.id);
585 reply.setData(replyData);
586 reply.code = REP_TOPIC_SUBSCRIBER;
587 return reply;
588
589 case SEND_TEXT_MESSAGE:
590 objId = ( (Integer) req.getData()[0]).intValue();
591 queueSession = (QueueSession) (manager.findObject(
592 objId, req.owner, req.mobile)).getObj();
593 if (queueSession == null)
594 return handleExc(req, EXC_OTHER, "Invalid QueueSession");
595 objId = ( (Integer) req.getData()[1]).intValue();
596 queueSender = (QueueSender) (manager.findObject(
597 objId, req.owner, req.mobile)).getObj();
598 if (queueSender == null)
599 return handleExc(req, EXC_OTHER, "Invalid QueueSender");
600 objId = ( (Integer) req.getData()[2]).intValue();
601 queue = (Queue) (manager.findObject(
602 objId, req.owner, req.mobile)).getObj();
603 if (queue == null)
604 return handleExc(req, EXC_OTHER, "Invalid Queue");
605
606 objId = ( (Integer) req.getData()[7]).intValue();
607
608 tempObj = (manager.findObject(
609 objId, req.owner, req.mobile));
610
611 if(tempObj != null) replyTo = (Destination) tempObj;
612
613
614
615 objId = ( (Integer) req.getData()[8]).intValue();
616 tempObj = (manager.findObject(
617 objId, req.owner, req.mobile));
618 if(tempObj != null) dest = (Destination) tempObj;
619 /*if the destinations cannot be found, they will be set to null */
620
621 try {
622 textMessage = queueSession.createTextMessage();
623 }
624 catch (JMSException e) {
625 return handleExc(req, EXC_OTHER, "Cannot create message");
626 }
627
628 try{
629 data = req.getData();
630 textMessage.setText( ( (String) data[3]));
631 textMessage.setJMSMessageID( (String) data[4]);
632 textMessage.setJMSTimestamp(( (Long) data[5]).longValue());
633 textMessage.setJMSCorrelationID( (String) data[6]);
634 if(replyTo != null) textMessage.setJMSReplyTo(replyTo);
635 if(dest != null) textMessage.setJMSDestination(dest);
636 if(((Integer) data[9]).intValue() != 0) textMessage.setJMSDeliveryMode( Constant.get(((Integer) data[9]).intValue()));
637 textMessage.setJMSRedelivered( ((Boolean) data[10]).booleanValue());
638 textMessage.setJMSType( (String) data[11]);
639 textMessage.setJMSExpiration( ( (Long) data[12]).longValue());
640 textMessage.setJMSPriority( ( (Integer) data[13]).intValue());
641
642 //properties
643 JMSMessage jmsMesg = new JMSMessage(req.owner, req.mobile);
644 jmsMesg.setObj(textMessage);
645 jmsMesg.applyProps((Vector) data[14]);
646 textMessage = (TextMessage) jmsMesg.getObj();
647
648 queueSender.send(textMessage);
649 }
650 catch (InvalidDestinationException e) {
651 log("invalid destination exception while sending mesg: ");
652 e.printStackTrace();
653 return handleExc(req, EXC_INVALID_DESTINATION, "Cannot send message");
654 }
655 catch (JMSException e) {
656 log("JMS exception while sending mesg: ");
657 e.printStackTrace();
658 return handleExc(req, EXC_OTHER, "Cannot send message");
659 }
660 return null;
661
662 case SEND_BYTES_MESSAGE:
663 objId = ( (Integer) req.getData()[0]).intValue();
664 queueSession = (QueueSession) (manager.findObject(
665 objId, req.owner, req.mobile)).getObj();
666 if (queueSession == null)
667 return handleExc(req, EXC_OTHER, "Invalid QueueSession");
668 objId = ( (Integer) req.getData()[1]).intValue();
669 queueSender = (QueueSender) (manager.findObject(
670 objId, req.owner, req.mobile)).getObj();
671 if (queueSender == null)
672 return handleExc(req, EXC_OTHER, "Invalid QueueSender");
673 objId = ( (Integer) req.getData()[2]).intValue();
674 queue = (Queue) (manager.findObject(
675 objId, req.owner, req.mobile)).getObj();
676 if (queue == null)
677 return handleExc(req, EXC_OTHER, "Invalid Queue");
678
679
680 objId = ( (Integer) req.getData()[7]).intValue();
681
682 tempObj = (manager.findObject(
683 objId, req.owner, req.mobile));
684
685 if(tempObj != null) replyTo = (Destination) tempObj;
686
687
688
689 objId = ( (Integer) req.getData()[8]).intValue();
690 tempObj = (manager.findObject(
691 objId, req.owner, req.mobile));
692 if(tempObj != null) dest = (Destination) tempObj;
693 /*if the destinations cannot be found, they will be set to null */
694
695
696 try {
697 bytesMessage = queueSession.createBytesMessage();
698 }
699 catch (JMSException e) {
700 return handleExc(req, EXC_OTHER, "Cannot create message");
701 }
702 try {
703 data = req.getData();
704 bytesMessage.writeBytes( (byte[]) data[3]);
705 bytesMessage.setJMSMessageID( (String) data[4]);
706 bytesMessage.setJMSTimestamp( ( (Long) data[5]).longValue());
707 bytesMessage.setJMSCorrelationID( (String) data[6]);
708 if(replyTo != null) textMessage.setJMSReplyTo(replyTo);
709 if(dest != null) textMessage.setJMSDestination(dest);
710 bytesMessage.setJMSDeliveryMode( ( (Integer) data[9]).intValue());
711 bytesMessage.setJMSRedelivered( ((Boolean) data[10]).booleanValue());
712 bytesMessage.setJMSType( (String) data[11]);
713 bytesMessage.setJMSExpiration( ( (Long) data[12]).longValue());
714 bytesMessage.setJMSPriority( ( (Integer) data[13]).intValue());
715 //properties
716 JMSMessage jmsMesg = new JMSMessage(req.owner, req.mobile);
717 jmsMesg.setObj(bytesMessage);
718 jmsMesg.applyProps( (Vector) data[14]);
719 bytesMessage = (BytesMessage) jmsMesg.getObj();
720
721 queueSender.send(bytesMessage);
722 }
723 catch (InvalidDestinationException e) {
724 log("invalid destination exception while sending mesg: ");
725 e.printStackTrace();
726 return handleExc(req, EXC_INVALID_DESTINATION,
727 "Cannot send message");
728 }
729 catch (JMSException e) {
730 log("JMS exception while sending mesg: ");
731 e.printStackTrace();
732 return handleExc(req, EXC_OTHER, "Cannot send message");
733 }
734 return null;
735
736 case PUBLISH_TEXT_MESSAGE:
737 log("poczatek handle :|>>>>>>>>>>>>>>>>>>>");
738 objId = ( (Integer) req.getData()[0]).intValue();
739 topicSession = (TopicSession) (manager.findObject(
740 objId, req.owner, req.mobile)).getObj();
741 if (topicSession == null)
742 return handleExc(req, EXC_OTHER, "Invalid TopicSession");
743 objId = ( (Integer) req.getData()[1]).intValue();
744 topicPublisher = (TopicPublisher) (manager.findObject(
745 objId, req.owner, req.mobile)).getObj();
746 if (topicPublisher == null)
747 return handleExc(req, EXC_OTHER, "Invalid TopicSender");
748 objId = ( (Integer) req.getData()[2]).intValue();
749 topic = (Topic) (manager.findObject(
750 objId, req.owner, req.mobile)).getObj();
751 if (topic == null)
752 return handleExc(req, EXC_OTHER, "Invalid Topic");
753
754 log("srodek handle >>>>>>>>>>>>>>>>");
755 try{
756 objId = ( (Integer) req.getData()[7]).intValue();
757
758 tempObj = (manager.findObject(
759 objId, req.owner, req.mobile));
760
761 if (tempObj != null)
762 replyTo = (Destination) tempObj;
763
764 objId = ( (Integer) req.getData()[8]).intValue();
765 tempObj = (manager.findObject(
766 objId, req.owner, req.mobile));
767 if (tempObj != null)
768 dest = (Destination) tempObj;
769 }catch (Exception e){
770 log("Tu jest ten wyjatek, ale to w sumie niewazne");
771 }
772 /*if the destinations cannot be found, they will be set to null */
773
774
775 try {
776 textMessage = topicSession.createTextMessage();
777 }
778 catch (JMSException e) {
779 return handleExc(req, EXC_OTHER, "Cannot create message");
780 }
781 try{
782 data = req.getData();
783 log("1");
784 textMessage.setText( ( (String) data[3]));
785 log("2");
786 textMessage.setJMSMessageID( (String) data[4]);
787 log("3");
788 textMessage.setJMSTimestamp(( (Long) data[5]).longValue());
789 log("4");
790 textMessage.setJMSCorrelationID( (String) data[6]);
791 log("5");
792 if(replyTo != null) textMessage.setJMSReplyTo(replyTo);
793 log("6");
794 if(dest != null) textMessage.setJMSDestination(dest);
795 log("7");
796 //textMessage.setJMSDeliveryMode( ((Integer) data[9]).intValue());
797 textMessage.setJMSDeliveryMode( javax.jms.DeliveryMode.NON_PERSISTENT);
798 log("8");
799 textMessage.setJMSRedelivered( ((Boolean) data[10]).booleanValue());
800 log("9");
801 textMessage.setJMSType( (String) data[11]);
802 log("10");
803 textMessage.setJMSExpiration( ( (Long) data[12]).longValue());
804 log("11");
805 textMessage.setJMSPriority( ( (Integer) data[13]).intValue());
806 log("12");
807 //properties
808 JMSMessage jmsMesg = new JMSMessage(req.owner, req.mobile);
809 log("13");
810 jmsMesg.setObj(textMessage);
811 log("14");
812 jmsMesg.applyProps( (Vector) data[14]);
813 log("15");
814 textMessage = (TextMessage) jmsMesg.getObj();
815 log("16");
816 log("prawie koniec handle >>>>>>>>>>>>>>>>");
817 topicPublisher.publish(textMessage);
818 }
819 catch (InvalidDestinationException e) {
820 return handleExc(req, EXC_INVALID_DESTINATION, "Cannot publish message");
821 }
822 catch (JMSException e) {
823 return handleExc(req, EXC_OTHER, "Cannot publish message");
824 }
825 return null;
826
827 case PUBLISH_BYTES_MESSAGE:
828 objId = ( (Integer) req.getData()[0]).intValue();
829 topicSession = (TopicSession) (manager.findObject(
830 objId, req.owner, req.mobile)).getObj();
831 if (topicSession == null)
832 return handleExc(req, EXC_OTHER, "Invalid TopicSession");
833 objId = ( (Integer) req.getData()[1]).intValue();
834 topicPublisher = (TopicPublisher) (manager.findObject(
835 objId, req.owner, req.mobile)).getObj();
836 if (topicPublisher == null)
837 return handleExc(req, EXC_OTHER, "Invalid TopicSender");
838 objId = ( (Integer) req.getData()[2]).intValue();
839 topic = (Topic) (manager.findObject(
840 objId, req.owner, req.mobile)).getObj();
841 if (topic == null)
842 return handleExc(req, EXC_OTHER, "Invalid Topic");
843
844 objId = ( (Integer) req.getData()[7]).intValue();
845
846 tempObj = (manager.findObject(
847 objId, req.owner, req.mobile));
848
849 if(tempObj != null) replyTo = (Destination) tempObj;
850
851
852
853 objId = ( (Integer) req.getData()[8]).intValue();
854 tempObj = (manager.findObject(
855 objId, req.owner, req.mobile));
856 if(tempObj != null) dest = (Destination) tempObj;
857 /*if the destinations cannot be found, they will be set to null */
858
859 try {
860 bytesMessage = queueSession.createBytesMessage();
861 }
862 catch (JMSException e) {
863 return handleExc(req, EXC_OTHER, "Cannot create message");
864 }
865 try {
866 data = req.getData();
867 bytesMessage.writeBytes( (byte[]) data[3]);
868 bytesMessage.setJMSMessageID( (String) data[4]);
869 bytesMessage.setJMSTimestamp( ( (Long) data[5]).longValue());
870 bytesMessage.setJMSCorrelationID( (String) data[6]);
871 if(replyTo != null) textMessage.setJMSReplyTo(replyTo);
872 if(dest != null) textMessage.setJMSDestination(dest);
873 bytesMessage.setJMSDeliveryMode( ( (Integer) data[9]).intValue());
874 bytesMessage.setJMSRedelivered( ((Boolean) data[10]).booleanValue());
875 bytesMessage.setJMSType( (String) data[11]);
876 bytesMessage.setJMSExpiration( ( (Long) data[12]).longValue());
877 bytesMessage.setJMSPriority( ( (Integer) data[13]).intValue());
878
879 //properties
880 JMSMessage jmsMesg = new JMSMessage(req.owner, req.mobile);
881 jmsMesg.setObj(bytesMessage);
882 jmsMesg.applyProps( (Vector) data[14]);
883 bytesMessage = (BytesMessage) jmsMesg.getObj();
884
885 topicPublisher.publish(bytesMessage);
886 }
887 catch (InvalidDestinationException e) {
888 return handleExc(req, EXC_INVALID_DESTINATION,
889 "Cannot send message");
890 }
891 catch (JMSException e) {
892 return handleExc(req, EXC_OTHER, "Cannot send message");
893 }
894 return null;
895
896
897 case RECEIVE_MESSAGE_FROM_QUEUE:
898 case RECEIVE_MESSAGE_FROM_TOPIC:
899 objId = ( (Integer) req.getData()[0]).intValue();
900 messageConsumer = (MessageConsumer) (manager.findObject(
901 objId, req.owner, req.mobile)).getObj();
902 if (messageConsumer == null)
903 return handleExc(req, EXC_OTHER, "Invalid MessageConsumer");
904 objId = ( (Integer) req.getData()[2]).intValue(); /*Session.id*/
905 session = (Session) (manager.findObject(
906 objId, req.owner, req.mobile)).getObj();
907 if (session == null)
908 return handleExc(req, EXC_OTHER, "Invalid Session");
909
910 Receiver receiver = new Receiver(this, messageConsumer,session, req);
911 return null;
912
913
914 case STOP_CONNECTION:
915 connection = (Connection) manager.findObject( ( (Integer) req.getData()[0]).
916 intValue(), req.owner, req.mobile).getObj();
917 if(connection == null) return null;
918 try{
919 connection.stop();
920 }
921 catch(JMSException e){
922 return null;
923 }
924 return null;
925 case START_CONNECTION:
926 connection = (Connection) manager.findObject( ( (Integer) req.getData()[0]).
927 intValue(), req.owner, req.mobile).getObj();
928 if(connection == null) return null;
929 try{
930 connection.start();
931 }
932 catch(JMSException e){
933 return null;
934 }
935
936 return null;
937 case CLOSE_CONNECTION:
938 connection = (Connection) manager.findObject( ( (Integer) req.getData()[0]).
939 intValue(), req.owner, req.mobile).getObj();
940 if (connection == null)
941 return null;
942 try{
943 connection.close();
944 }catch(JMSException e){
945 return null;
946 }
947
948 return null;
949 case COMMIT:
950 session = (Session) manager.findObject( ( (Integer) req.getData()[0]).
951 intValue(), req.owner,
952 req.mobile).getObj();
953 if (session == null)
954 return null;
955 try{
956 session.commit();
957 }
958 catch(JMSException e){
959 return null;
960 }
961
962 return null;
963
964 case ROLLBACK:
965 session = (Session) manager.findObject( ( (Integer) req.getData()[0]).
966 intValue(), req.owner,
967 req.mobile).getObj();
968 if (session == null)
969 return null;
970 try{
971 session.rollback();
972 }
973 catch(JMSException e){
974 return null;
975 }
976 return null;
977 case CLOSE_MESSAGE_PRODUCER:
978 producer = (MessageProducer) manager.findObject( ( (Integer) req.getData()[0]).
979 intValue(), req.owner,
980 req.mobile).getObj();
981 if (producer == null)
982 return null;
983 producer.close();
984 return null;
985
986 case CLOSE_MESSAGE_CONSUMER:
987 messageConsumer = (MessageConsumer) manager.findObject( ( (Integer) req.getData()[0]).
988 intValue(), req.owner,
989 req.mobile).getObj();
990 if (messageConsumer == null)
991 return null;
992 messageConsumer.close();
993 return null;
994 case ACKNOWLEDGE_MESSAGE:
995 message = (Message) manager.findObject( ( (Integer) req.getData()[0]).
996 intValue(), req.owner,
997 req.mobile).getObj();
998 if (message == null)
999 return null;
1000 message.acknowledge();
1001 return null;
1002 /*DONE 24/28 */
1003 /*TODO - creating and deleting temporary destinations, durable */
1004 /* case CREATE_TEMPORARY_QUEUE:
1005 queueSession = (QueueSession) manager.findObject( ( (Integer) data[0]).
1006 intValue(), req.owner,
1007 req.mobile).getObj();
1008 if (session == null)
1009 return null;
1010 jmsObject = manager.newObject(req.owner, req.mobile,
1011 type.QUEUE_(type.CONNECTION));
1012 jmsObject.setObj(queueConnection);
1013 replyData = new Object[1];
1014 replyData[0] = new Integer(jmsObject.id);
1015 reply.setData(replyData);
1016 reply.code = REP_QUEUE_CONNECTION;
1017 return reply;
1018
1019 return null;*/
1020
1021
1022
1023
1024
1025 } // end switch
1026 log("no catch statement matched in handleRequest- returning null");
1027 return null;
1028 } //end try
1029 catch (NamingException e) {
1030 log("oops! Naming exception:" + e);
1031
1032 return handleExc(req, EXC_NAMING, "Naming exception");
1033 }
1034 catch (Exception e) {
1035 log("oops! Exception:" + e);
1036 e.printStackTrace();
1037 return handleExc(req, EXC_OTHER, "Other exception");
1038 }
1039
1040}
1041
1042/**
1043 * <p>
1044 * The main thread. It takes the next request from the request queue
1045 * and executes the action associated
1046 * with it. The action generates a reply, which is handled by the processReply
1047 * method.
1048 * </p>
1049 */
1050public void run() {
1051 Request nextRequest = null;
1052 Reply reply = null;
1053 while (true) {
1054 if (requestQueue.size() > 0) { /*queue not empty */
1055 synchronized (this) {
1056 nextRequest = (Request) requestQueue.getFirst();
1057 }
1058 reply = (Reply) handleRequest(nextRequest);
1059 // log("niemozliwe WCZESNIEJ");
1060
1061 //log("reply.code= " + reply.code);
1062 //if(reply.getData() == null) log(" w run niemozliwe");
1063 // else log("w run jeszcze repData jest ok");
1064 synchronized (this) {
1065 nextRequest = (Request) requestQueue.removeFirst();
1066 }
1067 if (reply != null) processReply(reply);
1068 }
1069 }
1070 }
1071
1072 private class Replier extends Thread{
1073 ProxyRequestManager parent;
1074 public Replier(ProxyRequestManager par){
1075 parent = par;
1076 start();
1077 }
1078
1079 public void run(){
1080
1081 Reply nextReply;
1082 while(true){
1083 if(replyQueue.size() > 0){ /*queue not empty */
1084 synchronized(parent){
1085 nextReply = (Reply) replyQueue.removeFirst();
1086 }
1087 // if(nextReply == null) log("reply null");
1088 // if (nextReply.getData() == null) log("getdata null w replier");
1089 // log("replier: reply code " + nextReply.code + "object id " );
1090 talking.putNext(nextReply.code, nextReply.owner, nextReply.id,
1091 nextReply.mobile, nextReply.serialize());
1092 }
1093 }
1094 }
1095 } //end replier
1096
1097 private class Receiver
1098 extends Thread {
1099 MessageConsumer consumer;
1100 Session session;
1101 Request request;
1102 ProxyRequestManager parent;
1103
1104
1105 public Receiver(ProxyRequestManager par, MessageConsumer cons, Session ses, Request req) {
1106
1107 parent = par;
1108 consumer = cons;
1109 session = ses;
1110 request = req;
1111 log("Receiver wystartowal ................");
1112 start();
1113 }
1114
1115 /**
1116 * <p>
1117 * Copies len bytes from source to dest beginning from offset in the dest and
1118 * from the begining in the source
1119 * </p><p>
1120 *
1121 * </p>
1122 */
1123
1124 private void copyBytes(byte[] source, byte[] dest, int offset, int len){
1125 int i = 0;
1126 for (i = 0; i < len; i++){
1127 dest[offset + i] = source[i];
1128 }
1129 }
1130
1131 public void run() {
1132 Reply reply = new Reply(request);
1133 Message mesg = null;
1134 BytesMessage bmesg = null;
1135 TextMessage tmesg = null;
1136 Object data[] = new Object[13];
1137 ProxyStoredObject jmsObject = null;
1138 ProxyStoredObject jmsMesgObject = null;
1139 ProxyStoredObjectType type = new ProxyStoredObjectType();
1140 boolean text = false; //if it's a text or bytes message
1141 try {
1142 session.recover();
1143 mesg = consumer.receive();
1144 try {
1145 bmesg = (BytesMessage) mesg;
1146 }
1147 catch (Exception e1) {
1148 try {
1149 tmesg = (TextMessage) mesg;
1150 text = true;
1151 }
1152 catch (Exception e2) {
1153 parent.handleExc(request, EXC_OTHER, "Unsupported message type");
1154 return;
1155 }
1156 }
1157
1158 }
1159 catch (JMSException e3){
1160 parent.handleExc(request, EXC_OTHER, "Message receive failed");
1161 return;
1162 }
1163
1164 try {
1165 jmsMesgObject = manager.newObject(request.owner, request.mobile,
1166 type.MESSAGE);
1167 jmsMesgObject.setObj(mesg);
1168 manager.addObject(jmsMesgObject);
1169 data[0] = new Integer(jmsMesgObject.id);
1170 }
1171 catch(Exception e) {
1172 parent.handleExc(request, EXC_OTHER, "Message receive failed");
1173 }
1174
1175
1176 try {
1177 if (text) {
1178 data[1] = tmesg.getText();
1179 reply.code = REP_TEXT_MESSAGE;
1180 }
1181 else {
1182 byte tmp[] = new byte[MAX_BYTES_LEN];
1183 int len = bmesg.readBytes( (byte[]) tmp);
1184 data[1] = new byte[len];
1185 copyBytes(tmp, (byte[]) data[1], 0, len);
1186 reply.code = REP_BYTES_MESSAGE;
1187 }
1188 data[2] = mesg.getJMSMessageID();
1189 data[3] = new Long(mesg.getJMSTimestamp());
1190 data[4] = mesg.getJMSCorrelationID();
1191 /*DESTINATIONS*/
1192 /*ReplyTo*/
1193 jmsObject = manager.newObject(request.owner, request.mobile,
1194 type.JNDIOBJECT);
1195
1196 jmsObject.setObj(mesg.getJMSReplyTo());
1197 manager.addObject(jmsObject);
1198 data[5] = new Integer(jmsObject.id);
1199 /*JMSDestination*/
1200 jmsObject = manager.newObject(request.owner, request.mobile,
1201 type.JNDIOBJECT);
1202
1203 jmsObject.setObj(mesg.getJMSDestination());
1204 manager.addObject(jmsObject);
1205 data[6] = new Integer(jmsObject.id);
1206
1207 data[7] = new Integer(mesg.getJMSDeliveryMode());
1208 data[8] = new Boolean(mesg.getJMSRedelivered());
1209 data[9] = mesg.getJMSType();
1210 data[10] = new Long(mesg.getJMSExpiration());
1211 data[11] = new Integer(mesg.getJMSPriority());
1212 log("!!!!!!!!!!!!!!!!!!!!przsed propsami ");
1213 data[12] = ((proxy.jmsapi.JMSMessage) jmsMesgObject).propsVector();
1214 if (data[12] == null) log("PROPSY SA NULLOWE ");
1215 else log("Jest " + ((Vector) data[12]).size() + " PROPSOW");
1216 }
1217 catch (JMSException e) {
1218 parent.handleExc(request, EXC_OTHER, "Error processing message");
1219 return;
1220 }
1221 catch(Exception e) {
1222 parent.handleExc(request, EXC_OTHER, "Message receive failed");
1223 }
1224 if (data[12] == null) log("PROPSY SA NULLOWE ");
1225 else log("Jest " + ((Vector) data[12]).size() + " PROPSOW");
1226
1227 reply.setData(data);
1228 parent.processReply(reply);
1229 }
1230 } //end receiver
1231
1232
1233
1234} // end RequestManager
1235
1236
1237
1238
1239