Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: proxy/protocol/ProxyRequestManager.java


1   /* Is it the http layer that calls my processRequest method when something arrives? */
2   
3   /** Java class "RequestManager.java" generated from Poseidon for UML.
4    *  Poseidon for UML is developed by <A HREF="http://www.gentleware.com">Gentleware</A>.
5    *  Generated with <A HREF="http://jakarta.apache.org/velocity/">velocity</A> template engine.
6    */
7   
8   /*TODO - with a timeout */
9   
10  /*TODO - Session recover, putting messages into the JMSmanager */
11  package proxy.protocol;
12  
13  import java.util.*;
14  import proxy.bearer.http.*;
15  import proxy.jmsapi.*;
16  //import proxy.jndi.*;
17  import proxy.ProxyServer;
18  import javax.jms.*;
19  import javax.naming.*;
20  
21  /**
22   * <p>
23   * This class provides an implementation of a thread which manages requests
24   * coming from mobile devices. The requests are received in the order of their
25   * ids. The receipt protocol ensures that no request is lost and that they come
26   * in the desired order.
27   * </p>
28   * <p>
29   * The requests are stored in a queue and processed sequentially. For each request
30   * the appropriate action associated with this request is executed and a reply
31   * is generated. The reply is then added to the reply queue. The reply queue
32   * is emptied by the replier thread which sends the replies to the mobile
33   * device using the bearer layer.
34   *
35   * </p>
36   *
37   * @author Pawel Koziol
38   */
39  public class ProxyRequestManager extends Thread implements mobile.bearer.http.Protocol{
40  
41    /**
42     * <p>
43     * For each mobile, contains a hash table mapping an application (owner) to the id of the last request accepted from it.
44     * Thus the hash table has two levels - the first one mapping the mobile nr
45     * to the second hash table and the second mapping the owner id to the
46     * last request id.
47     * </p>
48     */
49    private Hashtable currentIds = new Hashtable(); // mobile -> Hashtable(owner -> Integer last request id)
50  
51    /**
52     * <p>
53     * For each mobile, contains a hash table mapping the owner id to a sorted collection of waiting requests
54     * which could not be processed yet because a previous request was missing.
55     * </p>
56     */
57    private Hashtable waiting = new Hashtable(); // mobile -> Hashtable(owner -> TreeSet of Request)
58  
59  /**
60   * <p>
61   * Component through which processRequest sends acknowledgements (putAcknowledge) to the mobiles.
62   * </p>
63   */
64  
65    private TalkingWithMobiles talking;
66  
67  /**
68   * <p>
69   * Manager of the stored JMS objects; taken from ProxyServer by default and replaceable via setObjectManager.
70   * </p>
71   */
72    private JMSObjectManager manager = ProxyServer.getObjectManager();
73  
74    private static final int MESSAGES_PER_PACK = 1;
75  
76    private Replier replier; // created in the constructor with a reference to this manager
77  
78  ///////////////////////////////////////
79     // associations
80  
81  /**
82   * <p>
83   * Requests accepted by processRequest, in the order of their ids.
84   * </p>
85   */
86      private LinkedList requestQueue; // of type Request
87  /**
88   * <p>
89   * Replies appended by processReply.
90   * </p>
91   */
92      private LinkedList replyQueue;// of type Reply
93  
94  
95    ///////////////////////////////////////
96    // operations
97  
98  public void setTalkingWithMobiles(TalkingWithMobiles _twm)
99    {
100     talking = _twm;
101   }
102 
103 public void setObjectManager(JMSObjectManager _om)
104   {
105     manager = _om;
106   }
107 
108 
109 /**adds the message to the server logs - for debugging*/
110 
111 private void log(String msg){
112   System.out.println("PRM: " + msg);
113   }
114 /**
115  * <p>
116  * Receives the next request, acknowledges its receipt and adds it to the request
117  * queue. Provides that the requests are added to the queue in the desired order
118  * and that no request is lost.
119  * </p><p>
120  *
121  * @param req The request to be processed.
122  * </p><p>
123  *
124  * </p>
125  */
126     synchronized public void processRequest(Request req) {
127       int acknArray[] = new int[1];
128       Integer mob = new Integer(req.mobile);
129       Integer own = new Integer(req.owner);
130       TreeSet waitSet; /* sorted collection of waiting requests; contained in the waiting hashtable */
131       Hashtable secondId, secondWait; /*the second level hash tables*/
132 
133       log("processRequest id:" + req.id);
134       secondId = (Hashtable) currentIds.get(mob);
135       if(secondId == null) {
136   secondId = new Hashtable();
137   currentIds.put(mob, secondId);
138       }
139       Integer lastId = (Integer) secondId.get(own);
140       if (lastId == null) {
141   lastId = new Integer(0);
142   secondId.put(own, lastId);
143       }
144 
145       secondWait = (Hashtable) waiting.get(mob);
146       if(secondWait == null) {
147   secondWait = new Hashtable();
148   waiting.put(mob, secondWait);
149       }
150       waitSet = (TreeSet) secondWait.get(own);
151       if(waitSet == null){
152   waitSet = new TreeSet();
153   secondWait.put(own, waitSet);
154       }
155 
156       //lastId = (Integer) secondId.get(own);
157       //waitSet = (TreeSet) secondWait.get(own);
158 
159       /*if(req.id == 0){
160         currentIds.put(mob, new Integer(0));
161         waiting.put(mob, new TreeSet());
162         //powinno byc talking.putAcknowledge(own,mob, acknArray);
163         acknArray[0] = 0;
164         talking.putAcknowledge(req.owner, req.mobile, acknArray);
165       }
166       else{*/
167         if (req.id == (lastId.intValue() + 1)) {
168           /* this request comes next */
169           requestQueue.addLast(req);
170           acknArray[0] = req.id;
171           talking.putAcknowledge(req.owner, req.mobile, acknArray);
172           lastId = new Integer(lastId.intValue() + 1);
173           secondId.put(own, lastId);
174 
175           /* check if the waiting requests can come next */
176           //waitSet = (TreeSet) waiting.get(mob);
177           if(!waitSet.isEmpty()){
178             Request nextRequest = (Request) waitSet.first();
179             while(nextRequest.id == lastId.intValue() + 1){
180               requestQueue.addLast(nextRequest);
181               waitSet.remove(nextRequest);
182               lastId = new Integer(lastId.intValue() + 1);
183               secondId.put(own, lastId);
184               try{
185                 nextRequest = (Request) waitSet.first();
186               }
187               catch(NoSuchElementException e){
188                 break;
189               }
190             } //end while
191           } //end check waiting requests
192         }
193         else{ /* this is a previous (already processed) request or a request that needs to wait */
194           if(req.id < lastId.intValue() + 1){
195             /*previous request */
196             acknArray[0] = req.id;
197             talking.putAcknowledge(req.owner, req.mobile, acknArray);
198           }
199           else{
200             /*this request needs to wait */
201             //waitSet = (TreeSet) waiting.get(mob);
202             waitSet.add(req);
203             acknArray[0] = req.id;
204             talking.putAcknowledge(req.owner , req.mobile, acknArray);
205           }
206 
207         }
208 
209       //}
210       //putAcknowledge(req);
211         // your code here
212     } // end processRequest
213 
214 /**
215  * <p>
216  * Adds the reply to the reply queue.
217  * </p><p>
218  *
219  * @param rep The reply to be processed.
220  * </p><p>
221  *
222  * </p>
223  */
224     synchronized public void processReply(Reply rep) {
225       replyQueue.addLast(rep);
226         // your code here
227     } // end processReply
228 
229     /**
230      * <p>
231      * Initializes the queues and starts the main thread.
232      * </p>
233      */
234 
235  public ProxyRequestManager(){
236   requestQueue = new LinkedList();
237   replyQueue = new LinkedList();
238   replier = new Replier(this);
239   //replier.start(); //czy to jest potrzebne?
240   //start();
241  }
242 
243 private Reply handleExc(Request req, int type, String name){
244    Reply reply = new Reply(req);
245    Object[] replyData = new Object[1];
246    replyData[0] = name;
247    reply.setData(replyData);
248    reply.code = type;
249    return reply;
250  }
251 
252  private Reply handleRequest(Request req){
253   try{
254     int objId = 0; /*to look up objects in JMSObjectManager */
255     boolean cond = false;
256     int mode = 0;
257     InitialContext context = ProxyServer.proxyInitialContext;
258     QueueConnectionFactory queueConnectionFactory = null;
259     TopicConnectionFactory topicConnectionFactory = null;
260     QueueConnection queueConnection = null;
261     TopicConnection topicConnection = null;
262     Connection connection = null;
263     QueueSession queueSession = null;
264     TopicSession topicSession = null;
265     Session session = null;
266     Queue queue = null;
267     Topic topic = null;
268     QueueSender queueSender = null;
269     TopicPublisher topicPublisher = null;
270     MessageProducer producer = null;
271     QueueReceiver queueReceiver = null;
272     TopicSubscriber topicSubscriber = null;
273     MessageConsumer messageConsumer = null;
274     Message message = null;
275     TextMessage textMessage = null;
276     BytesMessage bytesMessage = null;
277     Destination replyTo = null, dest = null;
278     Object tempObj = null;
279     ProxyStoredObject jmsObject = null;
280 
281 
282     String text = null;
283     Reply reply = new Reply(req);
284     Object[] replyData = null;
285     Object[] data = null;
286     ProxyStoredObjectType type = new ProxyStoredObjectType();
287 
288    // log("begin handle request: code" + req.code + "owner" + req.owner + "mesg " + (String) req.getData()[0]);
289 
290     switch(req.code){
291       case LOOKUP:
292         try {
293           //log( (String) req.getData()[0]);
294 
295           queueConnectionFactory = (QueueConnectionFactory) context.lookup( ( (
296               String) req.getData()[0]));
297           jmsObject = manager.newObject(req.owner, req.mobile,
298                                         type.QUEUE_(type.CONNECTION_FACTORY));
299 
300          // log("JMSObject.id = " + jmsObject.id);
301           jmsObject.setObj(queueConnectionFactory);
302           manager.addObject(jmsObject);
303           reply = new Reply(req);
304           replyData = new Object[1];
305           replyData[0] = new Integer(jmsObject.id);
306 
307           reply.code = REP_QUEUE_CONNECTION_FACTORY;
308           reply.setData(replyData);
309 
310           log("zwracam QCF");
311           //if (reply.getData() == null)
312            // log("niemozliwe");
313          // else
314           //  log("w handle jeszcze repData jest ok");
315          // log("reply.getData()[0] = " + reply.getData()[0].toString());
316           return reply;
317 
318         }
319         catch (Exception e1) {
320           log("wyjatek po QCF ");
321           //e1.printStackTrace();
322           try {
323             topicConnectionFactory = (TopicConnectionFactory) context.lookup( ( (
324                 String) req.getData()[0]));
325             jmsObject = manager.newObject(req.owner, req.mobile,
326                                           type.TOPIC_(type.CONNECTION_FACTORY));
327             jmsObject.setObj(topicConnectionFactory);
328             manager.addObject(jmsObject);
329             replyData = new Object[1];
330             replyData[0] = new Integer(jmsObject.id);
331             reply.setData(replyData);
332             reply.code = REP_TOPIC_CONNECTION_FACTORY;
333             return reply;
334 
335           }
336           catch (Exception e2) {
337             try {
338               queue = (Queue) context.lookup( ( (
339                   String) req.getData()[0]));
340               jmsObject = manager.newObject(req.owner, req.mobile,
341                                             type.QUEUE_(type.JNDIOBJECT));
342               jmsObject.setObj(queue);
343               manager.addObject(jmsObject);
344               replyData = new Object[2];
345               replyData[0] = new Integer(jmsObject.id);
346               replyData[1] = queue.getQueueName();
347               reply.setData(replyData);
348               reply.code = REP_QUEUE;
349               return reply;
350 
351             }
352             catch (Exception e3) {
353               try {
354                 topic = (Topic) context.lookup( ( (
355                     String) req.getData()[0]));
356                 jmsObject = manager.newObject(req.owner, req.mobile,
357                                               type.TOPIC_(type.JNDIOBJECT));
358                 jmsObject.setObj(topic);
359                 manager.addObject(jmsObject);
360                 replyData = new Object[2];
361                 replyData[0] = new Integer(jmsObject.id);
362                 replyData[1] = topic.getTopicName();
363                 reply.setData(replyData);
364                 reply.code = REP_TOPIC;
365                 return reply;
366               }
367               catch (Exception e4) {
368                 e4.printStackTrace();
369                 return handleExc(req, EXC_NAMING, "No such JNDI object");
370               }
371 
372             }
373 
374           }
375         }
376 
377       case CREATE_QUEUE_CONNECTION:
378         objId = ( (Integer) req.getData()[0]).intValue();
379         queueConnectionFactory = (QueueConnectionFactory) (manager.findObject(
380             objId, req.owner, req.mobile)).getObj();
381 
382         if (queueConnectionFactory == null){
383           log("tworzenie qc = findObject zwraca null dla qcf");
384           return handleExc(req, EXC_OTHER, "Invalid QueueConnectionFactory");
385         }
386         try {
387           queueConnection = queueConnectionFactory.createQueueConnection();
388           /*FIXME - to start to prowizorka*/
389           //queueConnection.start();
390         }
391         catch (JMSException e) {
392           log("jmsException przy tworzeniu qc");
393           return handleExc(req, EXC_OTHER, "Cannot create QueueConnection");
394         }
395         jmsObject = manager.newObject(req.owner, req.mobile,
396                                       type.QUEUE_(type.CONNECTION));
397         jmsObject.setObj(queueConnection);
398         manager.addObject(jmsObject);
399         replyData = new Object[1];
400         replyData[0] = new Integer(jmsObject.id);
401         reply.setData(replyData);
402         reply.code = REP_QUEUE_CONNECTION;
403         return reply;
404 
405       case CREATE_TOPIC_CONNECTION:
406         objId = ( (Integer) req.getData()[0]).intValue();
407         topicConnectionFactory = (TopicConnectionFactory) (manager.findObject(
408             objId, req.owner, req.mobile)).getObj();
409         if (topicConnectionFactory == null)
410           return handleExc(req, EXC_OTHER, "Invalid TopicConnectionFactory");
411         try {
412           topicConnection = topicConnectionFactory.createTopicConnection();
413         }
414         catch (JMSException e) {
415           return handleExc(req, EXC_OTHER, "Cannot create TopicConnection");
416         }
417         jmsObject = manager.newObject(req.owner, req.mobile,
418                                       type.TOPIC_(type.CONNECTION));
419         jmsObject.setObj(topicConnection);
420         manager.addObject(jmsObject);
421         replyData = new Object[1];
422         replyData[0] = new Integer(jmsObject.id);
423         reply.setData(replyData);
424         reply.code = REP_TOPIC_CONNECTION;
425         return reply;
426 
427       case CREATE_QUEUE_SESSION:
428         objId = ( (Integer) req.getData()[0]).intValue();
429         log("create queue session dla objId = " + objId);
430         cond = ( (Boolean) req.getData()[1]).booleanValue();
431         mode = ( (Integer) req.getData()[2]).intValue();
432         queueConnection = (QueueConnection) (manager.findObject(
433             objId, req.owner, req.mobile)).getObj();
434         if (queueConnection == null)
435           return handleExc(req, EXC_OTHER, "Invalid QueueConnection");
436         try {
437           queueSession = queueConnection.createQueueSession(cond,Session.CLIENT_ACKNOWLEDGE);
438         }
439         catch (JMSException e) {
440           return handleExc(req, EXC_OTHER, "Cannot create QueueSession");
441         }
442         jmsObject = manager.newObject(req.owner, req.mobile,
443                                       type.QUEUE_(type.SESSION));
444         jmsObject.setObj(queueSession);
445         manager.addObject(jmsObject);
446         replyData = new Object[1];
447         replyData[0] = new Integer(jmsObject.id);
448         reply.setData(replyData);
449         reply.code = REP_QUEUE_SESSION;
450         return reply;
451 
452 
453       case CREATE_TOPIC_SESSION:
454         objId = ( (Integer) req.getData()[0]).intValue();
455         cond = ( (Boolean) req.getData()[1]).booleanValue();
456         mode = ( (Integer) req.getData()[2]).intValue();
457         topicConnection = (TopicConnection) (manager.findObject(
458             objId, req.owner, req.mobile)).getObj();
459         if (topicConnection == null)
460           return handleExc(req, EXC_OTHER, "Invalid Topic");
461         try {
462           topicSession = topicConnection.createTopicSession(cond,Session.CLIENT_ACKNOWLEDGE);
463         }
464         catch (JMSException e) {
465           return handleExc(req, EXC_OTHER, "Cannot create TopicSession");
466         }
467         jmsObject = manager.newObject(req.owner, req.mobile,
468                                       type.TOPIC_(type.SESSION));
469         jmsObject.setObj(topicSession);
470         manager.addObject(jmsObject);
471         replyData = new Object[1];
472         replyData[0] = new Integer(jmsObject.id);
473         reply.setData(replyData);
474         reply.code = REP_TOPIC_SESSION;
475         return reply;
476 
477       case CREATE_QUEUE_SENDER:
478         objId = ( (Integer) req.getData()[0]).intValue();
479         queueSession = (QueueSession) (manager.findObject(
480             objId, req.owner, req.mobile)).getObj();
481         if (queueSession == null)
482           return handleExc(req, EXC_OTHER, "Invalid QueueSession");
483         objId = ( (Integer) req.getData()[1]).intValue();
484         queue = (Queue) (manager.findObject(
485             objId, req.owner, req.mobile)).getObj();
486         if (queue == null)
487           return handleExc(req, EXC_OTHER, "Invalid Queue");
488 
489         try {
490           queueSender = queueSession.createSender(queue);
491         }
492         catch (JMSException e) {
493           return handleExc(req, EXC_OTHER, "Cannot create queueSender");
494         }
495         jmsObject = manager.newObject(req.owner, req.mobile,
496                                       type.QUEUE_(type.PRODUCER));
497         jmsObject.setObj(queueSender);
498         manager.addObject(jmsObject);
499         replyData = new Object[1];
500         replyData[0] = new Integer(jmsObject.id);
501         reply.setData(replyData);
502         reply.code = REP_QUEUE_SENDER;
503         return reply;
504 
505       case CREATE_TOPIC_PUBLISHER:
506         objId = ( (Integer) req.getData()[0]).intValue();
507         topicSession = (TopicSession) (manager.findObject(
508             objId, req.owner, req.mobile)).getObj();
509         if (topicSession == null)
510           return handleExc(req, EXC_OTHER, "Invalid TopicSession");
511         objId = ( (Integer) req.getData()[1]).intValue();
512         topic = (Topic) (manager.findObject(
513             objId, req.owner, req.mobile)).getObj();
514         if (topic == null)
515           return handleExc(req, EXC_OTHER, "Invalid Topic");
516 
517         try {
518           topicPublisher = topicSession.createPublisher(topic);
519         }
520         catch (JMSException e) {
521           return handleExc(req, EXC_OTHER, "Cannot create topicPublisher");
522         }
523         jmsObject = manager.newObject(req.owner, req.mobile,
524                                       type.TOPIC_(type.PRODUCER));
525         jmsObject.setObj(topicPublisher);
526         manager.addObject(jmsObject);
527         replyData = new Object[1];
528         replyData[0] = new Integer(jmsObject.id);
529         reply.setData(replyData);
530         reply.code = REP_TOPIC_PUBLISHER;
531         return reply;
532 
533       case CREATE_QUEUE_RECEIVER:
534         objId = ( (Integer) req.getData()[0]).intValue();
535         queueSession = (QueueSession) (manager.findObject(
536             objId, req.owner, req.mobile)).getObj();
537         if (queueSession == null)
538           return handleExc(req, EXC_OTHER, "Invalid QueueSession");
539         objId = ( (Integer) req.getData()[1]).intValue();
540         queue = (Queue) (manager.findObject(
541             objId, req.owner, req.mobile)).getObj();
542         if (queue == null)
543           return handleExc(req, EXC_OTHER, "Invalid Queue");
544 
545         try {
546           queueReceiver = queueSession.createReceiver(queue);
547         }
548         catch (JMSException e) {
549           return handleExc(req, EXC_OTHER, "Cannot create queueReceiver");
550         }
551         jmsObject = manager.newObject(req.owner, req.mobile,
552                                       type.QUEUE_(type.CONSUMER));
553         jmsObject.setObj(queueReceiver);
554         manager.addObject(jmsObject);
555         replyData = new Object[1];
556         replyData[0] = new Integer(jmsObject.id);
557         reply.setData(replyData);
558         reply.code = REP_QUEUE_RECEIVER;
559         return reply;
560 
561       case CREATE_TOPIC_SUBSCRIBER:
562         objId = ( (Integer) req.getData()[0]).intValue();
563         topicSession = (TopicSession) (manager.findObject(
564             objId, req.owner, req.mobile)).getObj();
565         if (topicSession == null)
566           return handleExc(req, EXC_OTHER, "Invalid TopicSession");
567         objId = ( (Integer) req.getData()[1]).intValue();
568         topic = (Topic) (manager.findObject(
569             objId, req.owner, req.mobile)).getObj();
570         if (topic == null)
571           return handleExc(req, EXC_OTHER, "Invalid Topic");
572 
573         try {
574           topicSubscriber = topicSession.createSubscriber(topic);
575         }
576         catch (JMSException e) {
577           return handleExc(req, EXC_OTHER, "Cannot create topicSubscriber");
578         }
579         jmsObject = manager.newObject(req.owner, req.mobile,
580                                       type.TOPIC_(type.CONSUMER));
581         jmsObject.setObj(topicSubscriber);
582                 manager.addObject(jmsObject);
583         replyData = new Object[1];
584         replyData[0] = new Integer(jmsObject.id);
585         reply.setData(replyData);
586         reply.code = REP_TOPIC_SUBSCRIBER;
587         return reply;
588 
589       case SEND_TEXT_MESSAGE:
590         objId = ( (Integer) req.getData()[0]).intValue();
591         queueSession = (QueueSession) (manager.findObject(
592             objId, req.owner, req.mobile)).getObj();
593         if (queueSession == null)
594           return handleExc(req, EXC_OTHER, "Invalid QueueSession");
595         objId = ( (Integer) req.getData()[1]).intValue();
596         queueSender = (QueueSender) (manager.findObject(
597             objId, req.owner, req.mobile)).getObj();
598         if (queueSender == null)
599           return handleExc(req, EXC_OTHER, "Invalid QueueSender");
600         objId = ( (Integer) req.getData()[2]).intValue();
601         queue = (Queue) (manager.findObject(
602             objId, req.owner, req.mobile)).getObj();
603         if (queue == null)
604           return handleExc(req, EXC_OTHER, "Invalid Queue");
605 
606         objId = ( (Integer) req.getData()[7]).intValue();
607 
608           tempObj = (manager.findObject(
609               objId, req.owner, req.mobile));
610 
611         if(tempObj != null) replyTo = (Destination) tempObj;
612 
613 
614 
615         objId = ( (Integer) req.getData()[8]).intValue();
616         tempObj =  (manager.findObject(
617             objId, req.owner, req.mobile));
618       if(tempObj != null) dest = (Destination) tempObj;
619       /*if the destinations cannot be found, they will be set to null */
620 
621         try {
622          textMessage = queueSession.createTextMessage();
623         }
624         catch (JMSException e) {
625           return handleExc(req, EXC_OTHER, "Cannot create message");
626         }
627 
628         try{
629           data = req.getData();
630           textMessage.setText( ( (String) data[3]));
631           textMessage.setJMSMessageID( (String) data[4]);
632           textMessage.setJMSTimestamp(( (Long) data[5]).longValue());
633           textMessage.setJMSCorrelationID( (String) data[6]);
634           if(replyTo != null) textMessage.setJMSReplyTo(replyTo);
635           if(dest != null) textMessage.setJMSDestination(dest);
636           if(((Integer) data[9]).intValue() != 0) textMessage.setJMSDeliveryMode( Constant.get(((Integer) data[9]).intValue()));
637           textMessage.setJMSRedelivered( ((Boolean) data[10]).booleanValue());
638           textMessage.setJMSType( (String) data[11]);
639           textMessage.setJMSExpiration( ( (Long) data[12]).longValue());
640           textMessage.setJMSPriority( ( (Integer) data[13]).intValue());
641 
642           //properties
643           JMSMessage jmsMesg = new JMSMessage(req.owner, req.mobile);
644           jmsMesg.setObj(textMessage);
645           jmsMesg.applyProps((Vector) data[14]);
646           textMessage = (TextMessage) jmsMesg.getObj();
647 
648           queueSender.send(textMessage);
649         }
650         catch (InvalidDestinationException e) {
651           log("invalid destination exception while sending mesg: ");
652           e.printStackTrace();
653           return handleExc(req, EXC_INVALID_DESTINATION, "Cannot send message");
654         }
655         catch (JMSException e) {
656           log("JMS exception while sending mesg: ");
657           e.printStackTrace();
658           return handleExc(req, EXC_OTHER, "Cannot send message");
659         }
660         return null;
661 
662       case SEND_BYTES_MESSAGE:
663         objId = ( (Integer) req.getData()[0]).intValue();
664         queueSession = (QueueSession) (manager.findObject(
665             objId, req.owner, req.mobile)).getObj();
666         if (queueSession == null)
667           return handleExc(req, EXC_OTHER, "Invalid QueueSession");
668         objId = ( (Integer) req.getData()[1]).intValue();
669         queueSender = (QueueSender) (manager.findObject(
670             objId, req.owner, req.mobile)).getObj();
671         if (queueSender == null)
672           return handleExc(req, EXC_OTHER, "Invalid QueueSender");
673         objId = ( (Integer) req.getData()[2]).intValue();
674         queue = (Queue) (manager.findObject(
675             objId, req.owner, req.mobile)).getObj();
676         if (queue == null)
677           return handleExc(req, EXC_OTHER, "Invalid Queue");
678 
679 
680         objId = ( (Integer) req.getData()[7]).intValue();
681 
682             tempObj = (manager.findObject(
683                 objId, req.owner, req.mobile));
684 
685           if(tempObj != null) replyTo = (Destination) tempObj;
686 
687 
688 
689           objId = ( (Integer) req.getData()[8]).intValue();
690           tempObj =  (manager.findObject(
691               objId, req.owner, req.mobile));
692         if(tempObj != null) dest = (Destination) tempObj;
693         /*if the destinations cannot be found, they will be set to null */
694 
695 
696         try {
697           bytesMessage = queueSession.createBytesMessage();
698         }
699         catch (JMSException e) {
700           return handleExc(req, EXC_OTHER, "Cannot create message");
701         }
702         try {
703           data = req.getData();
704           bytesMessage.writeBytes( (byte[]) data[3]);
705           bytesMessage.setJMSMessageID( (String) data[4]);
706           bytesMessage.setJMSTimestamp( ( (Long) data[5]).longValue());
707           bytesMessage.setJMSCorrelationID( (String) data[6]);
708           if(replyTo != null) textMessage.setJMSReplyTo(replyTo);
709           if(dest != null) textMessage.setJMSDestination(dest);
710           bytesMessage.setJMSDeliveryMode( ( (Integer) data[9]).intValue());
711           bytesMessage.setJMSRedelivered( ((Boolean) data[10]).booleanValue());
712           bytesMessage.setJMSType( (String) data[11]);
713           bytesMessage.setJMSExpiration( ( (Long) data[12]).longValue());
714           bytesMessage.setJMSPriority( ( (Integer) data[13]).intValue());
715           //properties
716           JMSMessage jmsMesg = new JMSMessage(req.owner, req.mobile);
717           jmsMesg.setObj(bytesMessage);
718           jmsMesg.applyProps( (Vector) data[14]);
719           bytesMessage = (BytesMessage) jmsMesg.getObj();
720 
721           queueSender.send(bytesMessage);
722         }
723         catch (InvalidDestinationException e) {
724           log("invalid destination exception while sending mesg: ");
725           e.printStackTrace();
726           return handleExc(req, EXC_INVALID_DESTINATION,
727                            "Cannot send message");
728         }
729         catch (JMSException e) {
730           log("JMS exception while sending mesg: ");
731           e.printStackTrace();
732           return handleExc(req, EXC_OTHER, "Cannot send message");
733         }
734         return null;
735 
736       case PUBLISH_TEXT_MESSAGE:
737         log("poczatek handle :|>>>>>>>>>>>>>>>>>>>");
738         objId = ( (Integer) req.getData()[0]).intValue();
739         topicSession = (TopicSession) (manager.findObject(
740             objId, req.owner, req.mobile)).getObj();
741         if (topicSession == null)
742           return handleExc(req, EXC_OTHER, "Invalid TopicSession");
743         objId = ( (Integer) req.getData()[1]).intValue();
744         topicPublisher = (TopicPublisher) (manager.findObject(
745             objId, req.owner, req.mobile)).getObj();
746         if (topicPublisher == null)
747           return handleExc(req, EXC_OTHER, "Invalid TopicSender");
748         objId = ( (Integer) req.getData()[2]).intValue();
749         topic = (Topic) (manager.findObject(
750             objId, req.owner, req.mobile)).getObj();
751         if (topic == null)
752           return handleExc(req, EXC_OTHER, "Invalid Topic");
753 
754         log("srodek handle >>>>>>>>>>>>>>>>");
755         try{
756           objId = ( (Integer) req.getData()[7]).intValue();
757 
758           tempObj = (manager.findObject(
759               objId, req.owner, req.mobile));
760 
761           if (tempObj != null)
762             replyTo = (Destination) tempObj;
763 
764           objId = ( (Integer) req.getData()[8]).intValue();
765           tempObj = (manager.findObject(
766               objId, req.owner, req.mobile));
767           if (tempObj != null)
768             dest = (Destination) tempObj;
769         }catch (Exception e){
770           log("Tu jest ten wyjatek, ale to w sumie niewazne");
771         }
772           /*if the destinations cannot be found, they will be set to null */
773 
774 
775         try {
776          textMessage = topicSession.createTextMessage();
777         }
778         catch (JMSException e) {
779           return handleExc(req, EXC_OTHER, "Cannot create message");
780         }
781         try{
782           data = req.getData();
783           log("1");
784           textMessage.setText( ( (String) data[3]));
785                     log("2");
786           textMessage.setJMSMessageID( (String) data[4]);
787                     log("3");
788           textMessage.setJMSTimestamp(( (Long) data[5]).longValue());
789                     log("4");
790           textMessage.setJMSCorrelationID( (String) data[6]);
791                     log("5");
792           if(replyTo != null) textMessage.setJMSReplyTo(replyTo);
793                       log("6");
794           if(dest != null) textMessage.setJMSDestination(dest);
795                       log("7");
796           //textMessage.setJMSDeliveryMode( ((Integer) data[9]).intValue());
797           textMessage.setJMSDeliveryMode( javax.jms.DeliveryMode.NON_PERSISTENT);
798                     log("8");
799           textMessage.setJMSRedelivered( ((Boolean) data[10]).booleanValue());
800                     log("9");
801           textMessage.setJMSType( (String) data[11]);
802                     log("10");
803           textMessage.setJMSExpiration( ( (Long) data[12]).longValue());
804                     log("11");
805           textMessage.setJMSPriority( ( (Integer) data[13]).intValue());
806                     log("12");
807           //properties
808           JMSMessage jmsMesg = new JMSMessage(req.owner, req.mobile);
809                     log("13");
810           jmsMesg.setObj(textMessage);
811                     log("14");
812           jmsMesg.applyProps( (Vector) data[14]);
813                     log("15");
814           textMessage = (TextMessage) jmsMesg.getObj();
815                     log("16");
816           log("prawie koniec handle >>>>>>>>>>>>>>>>");
817           topicPublisher.publish(textMessage);
818         }
819         catch (InvalidDestinationException e) {
820           return handleExc(req, EXC_INVALID_DESTINATION, "Cannot publish message");
821         }
822         catch (JMSException e) {
823           return handleExc(req, EXC_OTHER, "Cannot publish message");
824         }
825         return null;
826 
827       case PUBLISH_BYTES_MESSAGE:
828        objId = ( (Integer) req.getData()[0]).intValue();
829        topicSession = (TopicSession) (manager.findObject(
830            objId, req.owner, req.mobile)).getObj();
831        if (topicSession == null)
832          return handleExc(req, EXC_OTHER, "Invalid TopicSession");
833        objId = ( (Integer) req.getData()[1]).intValue();
834        topicPublisher = (TopicPublisher) (manager.findObject(
835            objId, req.owner, req.mobile)).getObj();
836        if (topicPublisher == null)
837          return handleExc(req, EXC_OTHER, "Invalid TopicSender");
838        objId = ( (Integer) req.getData()[2]).intValue();
839        topic = (Topic) (manager.findObject(
840            objId, req.owner, req.mobile)).getObj();
841        if (topic == null)
842          return handleExc(req, EXC_OTHER, "Invalid Topic");
843 
844        objId = ( (Integer) req.getData()[7]).intValue();
845 
846           tempObj = (manager.findObject(
847               objId, req.owner, req.mobile));
848 
849         if(tempObj != null) replyTo = (Destination) tempObj;
850 
851 
852 
853         objId = ( (Integer) req.getData()[8]).intValue();
854         tempObj =  (manager.findObject(
855             objId, req.owner, req.mobile));
856       if(tempObj != null) dest = (Destination) tempObj;
857       /*if the destinations cannot be found, they will be set to null */
858 
859         try {
860           bytesMessage = queueSession.createBytesMessage();
861         }
862         catch (JMSException e) {
863           return handleExc(req, EXC_OTHER, "Cannot create message");
864         }
865         try {
866           data = req.getData();
867           bytesMessage.writeBytes( (byte[]) data[3]);
868           bytesMessage.setJMSMessageID( (String) data[4]);
869           bytesMessage.setJMSTimestamp( ( (Long) data[5]).longValue());
870           bytesMessage.setJMSCorrelationID( (String) data[6]);
871           if(replyTo != null) textMessage.setJMSReplyTo(replyTo);
872           if(dest != null) textMessage.setJMSDestination(dest);
873           bytesMessage.setJMSDeliveryMode( ( (Integer) data[9]).intValue());
874           bytesMessage.setJMSRedelivered( ((Boolean) data[10]).booleanValue());
875           bytesMessage.setJMSType( (String) data[11]);
876           bytesMessage.setJMSExpiration( ( (Long) data[12]).longValue());
877           bytesMessage.setJMSPriority( ( (Integer) data[13]).intValue());
878 
879           //properties
880           JMSMessage jmsMesg = new JMSMessage(req.owner, req.mobile);
881           jmsMesg.setObj(bytesMessage);
882           jmsMesg.applyProps( (Vector) data[14]);
883           bytesMessage = (BytesMessage) jmsMesg.getObj();
884 
885           topicPublisher.publish(bytesMessage);
886         }
887         catch (InvalidDestinationException e) {
888           return handleExc(req, EXC_INVALID_DESTINATION,
889                            "Cannot send message");
890         }
891         catch (JMSException e) {
892           return handleExc(req, EXC_OTHER, "Cannot send message");
893         }
894         return null;
895 
896 
897       case RECEIVE_MESSAGE_FROM_QUEUE:
898       case RECEIVE_MESSAGE_FROM_TOPIC:
899       objId = ( (Integer) req.getData()[0]).intValue();
900         messageConsumer = (MessageConsumer) (manager.findObject(
901               objId, req.owner, req.mobile)).getObj();
902         if (messageConsumer == null)
903           return handleExc(req, EXC_OTHER, "Invalid MessageConsumer");
904         objId = ( (Integer) req.getData()[2]).intValue(); /*Session.id*/
905         session = (Session) (manager.findObject(
906               objId, req.owner, req.mobile)).getObj();
907         if (session == null)
908           return handleExc(req, EXC_OTHER, "Invalid Session");
909 
910         Receiver receiver = new Receiver(this, messageConsumer,session, req);
911         return null;
912 
913 
914       case STOP_CONNECTION:
915         connection = (Connection) manager.findObject( ( (Integer) req.getData()[0]).
916             intValue(), req.owner, req.mobile).getObj();
917         if(connection == null) return null;
918         try{
919           connection.stop();
920         }
921         catch(JMSException e){
922           return null;
923         }
924         return null;
925       case START_CONNECTION:
926       connection = (Connection) manager.findObject( ( (Integer) req.getData()[0]).
927           intValue(), req.owner, req.mobile).getObj();
928       if(connection == null) return null;
929       try{
930         connection.start();
931       }
932       catch(JMSException e){
933           return null;
934         }
935 
936       return null;
937     case CLOSE_CONNECTION:
938       connection = (Connection) manager.findObject( ( (Integer) req.getData()[0]).
939           intValue(), req.owner, req.mobile).getObj();
940       if (connection == null)
941         return null;
942       try{
943         connection.close();
944       }catch(JMSException e){
945           return null;
946         }
947 
948       return null;
949     case COMMIT:
950       session = (Session) manager.findObject( ( (Integer) req.getData()[0]).
951                                              intValue(), req.owner,
952                                              req.mobile).getObj();
953       if (session == null)
954         return null;
955       try{
956         session.commit();
957       }
958       catch(JMSException e){
959           return null;
960       }
961 
962       return null;
963 
964     case ROLLBACK:
965       session = (Session) manager.findObject( ( (Integer) req.getData()[0]).
966                                              intValue(), req.owner,
967                                              req.mobile).getObj();
968       if (session == null)
969         return null;
970       try{
971         session.rollback();
972       }
973       catch(JMSException e){
974           return null;
975         }
976      return null;
977    case CLOSE_MESSAGE_PRODUCER:
978      producer = (MessageProducer) manager.findObject( ( (Integer) req.getData()[0]).
979                                             intValue(), req.owner,
980                                             req.mobile).getObj();
981      if (producer == null)
982        return null;
983      producer.close();
984       return null;
985 
986     case CLOSE_MESSAGE_CONSUMER:
987       messageConsumer = (MessageConsumer) manager.findObject( ( (Integer) req.getData()[0]).
988                                            intValue(), req.owner,
989                                            req.mobile).getObj();
990     if (messageConsumer == null)
991       return null;
992     messageConsumer.close();
993      return null;
994    case ACKNOWLEDGE_MESSAGE:
995      message = (Message) manager.findObject( ( (Integer) req.getData()[0]).
996                                           intValue(), req.owner,
997                                           req.mobile).getObj();
998    if (message == null)
999      return null;
1000   message.acknowledge();
1001    return null;
1002    /*DONE 24/28 */
1003    /*TODO - creating and deleting  temporary destinations, durable */
1004   /* case CREATE_TEMPORARY_QUEUE:
1005      queueSession = (QueueSession) manager.findObject( ( (Integer) data[0]).
1006                                             intValue(), req.owner,
1007                                             req.mobile).getObj();
1008      if (session == null)
1009        return null;
1010      jmsObject = manager.newObject(req.owner, req.mobile,
1011                                      type.QUEUE_(type.CONNECTION));
1012        jmsObject.setObj(queueConnection);
1013        replyData = new Object[1];
1014        replyData[0] = new Integer(jmsObject.id);
1015        reply.setData(replyData);
1016        reply.code = REP_QUEUE_CONNECTION;
1017        return reply;
1018
1019      return null;*/
1020
1021
1022
1023
1024
1025    } // end switch
1026    log("no catch statement matched in handleRequest- returning null");
1027    return null;
1028  } //end try
1029  catch (NamingException e) {
1030    log("oops! Naming exception:" + e);
1031
1032    return handleExc(req, EXC_NAMING, "Naming exception");
1033  }
1034  catch (Exception e) {
1035    log("oops! Exception:" + e);
1036    e.printStackTrace();
1037    return handleExc(req, EXC_OTHER, "Other exception");
1038  }
1039
1040}
1041
1042/**
1043 * <p>
1044 * The main thread. It takes the next request from the request queue
1045 *  and executes the action associated
1046 * with it. The action generates a reply, which is handled by the processReply
1047 * method.
1048 * </p>
1049 */
1050public void run() {
1051  Request nextRequest = null;
1052  Reply reply = null;
1053  while (true) {
1054    if (requestQueue.size() > 0) { /*queue not empty */
1055      synchronized (this) {
1056        nextRequest = (Request) requestQueue.getFirst();
1057      }
1058      reply = (Reply) handleRequest(nextRequest);
1059       // log("niemozliwe WCZESNIEJ");
1060
1061      //log("reply.code= " + reply.code);
1062      //if(reply.getData() == null) log(" w run niemozliwe");
1063      //      else log("w run jeszcze repData jest ok");
1064      synchronized (this) {
1065        nextRequest = (Request) requestQueue.removeFirst();
1066      }
1067      if (reply != null) processReply(reply);
1068    }
1069  }
1070 }
1071
1072  private class Replier extends Thread{
1073    ProxyRequestManager parent;
1074    public Replier(ProxyRequestManager par){
1075      parent = par;
1076      start();
1077    }
1078
1079    public void run(){
1080
1081      Reply nextReply;
1082      while(true){
1083  if(replyQueue.size() > 0){ /*queue not empty */
1084          synchronized(parent){
1085              nextReply = (Reply) replyQueue.removeFirst();
1086          }
1087         // if(nextReply == null) log("reply null");
1088         // if (nextReply.getData() == null) log("getdata null w replier");
1089         // log("replier: reply code " + nextReply.code + "object id " );
1090          talking.putNext(nextReply.code,  nextReply.owner,  nextReply.id,
1091                          nextReply.mobile, nextReply.serialize());
1092        }
1093      }
1094    }
1095  } //end replier
1096
1097  private class Receiver
1098      extends Thread {
1099    MessageConsumer consumer;
1100    Session session;
1101    Request request;
1102    ProxyRequestManager parent;
1103
1104
1105    public Receiver(ProxyRequestManager par, MessageConsumer cons, Session ses, Request req) {
1106
1107      parent = par;
1108      consumer = cons;
1109      session = ses;
1110      request = req;
1111      log("Receiver wystartowal ................");
1112      start();
1113    }
1114
1115    /**
1116     * <p>
1117     * Copies len bytes from source to dest beginning from offset in the dest and
1118     * from the begining in the source
1119     * </p><p>
1120     *
1121     * </p>
1122     */
1123
1124     private void copyBytes(byte[] source, byte[] dest, int offset, int len){
1125      int i = 0;
1126      for (i = 0; i < len; i++){
1127        dest[offset + i] = source[i];
1128      }
1129     }
1130
1131    public void run() {
1132      Reply reply = new Reply(request);
1133      Message mesg = null;
1134      BytesMessage bmesg = null;
1135      TextMessage tmesg = null;
1136      Object data[] = new Object[13];
1137      ProxyStoredObject jmsObject = null;
1138      ProxyStoredObject jmsMesgObject = null;
1139      ProxyStoredObjectType type = new ProxyStoredObjectType();
1140      boolean text = false; //if it's a text or bytes message
1141      try {
1142        session.recover();
1143        mesg = consumer.receive();
1144        try {
1145          bmesg = (BytesMessage) mesg;
1146        }
1147        catch (Exception e1) {
1148          try {
1149            tmesg = (TextMessage) mesg;
1150            text = true;
1151          }
1152          catch (Exception e2) {
1153            parent.handleExc(request, EXC_OTHER, "Unsupported message type");
1154            return;
1155          }
1156        }
1157
1158      }
1159      catch (JMSException e3){
1160        parent.handleExc(request, EXC_OTHER, "Message receive failed");
1161        return;
1162      }
1163
1164      try {
1165        jmsMesgObject = manager.newObject(request.owner, request.mobile,
1166                                      type.MESSAGE);
1167        jmsMesgObject.setObj(mesg);
1168        manager.addObject(jmsMesgObject);
1169        data[0] = new Integer(jmsMesgObject.id);
1170      }
1171      catch(Exception e) {
1172        parent.handleExc(request, EXC_OTHER, "Message receive failed");
1173      }
1174
1175
1176      try {
1177        if (text) {
1178          data[1] = tmesg.getText();
1179            reply.code = REP_TEXT_MESSAGE;
1180        }
1181        else {
1182          byte tmp[] = new byte[MAX_BYTES_LEN];
1183          int len = bmesg.readBytes( (byte[]) tmp);
1184          data[1] = new byte[len];
1185          copyBytes(tmp, (byte[]) data[1], 0, len);
1186          reply.code = REP_BYTES_MESSAGE;
1187        }
1188        data[2] = mesg.getJMSMessageID();
1189        data[3] = new Long(mesg.getJMSTimestamp());
1190        data[4] = mesg.getJMSCorrelationID();
1191        /*DESTINATIONS*/
1192        /*ReplyTo*/
1193        jmsObject = manager.newObject(request.owner, request.mobile,
1194                                      type.JNDIOBJECT);
1195
1196        jmsObject.setObj(mesg.getJMSReplyTo());
1197        manager.addObject(jmsObject);
1198        data[5] = new Integer(jmsObject.id);
1199        /*JMSDestination*/
1200        jmsObject = manager.newObject(request.owner, request.mobile,
1201                                      type.JNDIOBJECT);
1202
1203        jmsObject.setObj(mesg.getJMSDestination());
1204        manager.addObject(jmsObject);
1205        data[6] = new Integer(jmsObject.id);
1206
1207        data[7] = new Integer(mesg.getJMSDeliveryMode());
1208        data[8] = new Boolean(mesg.getJMSRedelivered());
1209        data[9] = mesg.getJMSType();
1210        data[10] = new Long(mesg.getJMSExpiration());
1211        data[11] = new Integer(mesg.getJMSPriority());
1212         log("!!!!!!!!!!!!!!!!!!!!przsed propsami ");
1213        data[12] = ((proxy.jmsapi.JMSMessage) jmsMesgObject).propsVector();
1214        if (data[12] == null) log("PROPSY SA NULLOWE ");
1215          else log("Jest " + ((Vector) data[12]).size() + " PROPSOW");
1216      }
1217      catch (JMSException e) {
1218        parent.handleExc(request, EXC_OTHER, "Error processing message");
1219        return;
1220      }
1221      catch(Exception e) {
1222        parent.handleExc(request, EXC_OTHER, "Message receive failed");
1223      }
1224      if (data[12] == null) log("PROPSY SA NULLOWE ");
1225          else log("Jest " + ((Vector) data[12]).size() + " PROPSOW");
1226
1227      reply.setData(data);
1228      parent.processReply(reply);
1229    }
1230  } //end receiver
1231
1232
1233
1234} // end RequestManager
1235
1236
1237
1238
1239