Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/JmsTopicRequestorTest.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq;
19  import java.util.Iterator;
20  
21  import javax.jms.Connection;
22  import javax.jms.Destination;
23  import javax.jms.JMSException;
24  import javax.jms.Message;
25  import javax.jms.MessageConsumer;
26  import javax.jms.MessageListener;
27  import javax.jms.MessageProducer;
28  import javax.jms.Session;
29  import javax.jms.Topic;
30  import javax.jms.TopicConnection;
31  import javax.jms.TopicRequestor;
32  import javax.jms.TopicSession;
33  
34  import junit.framework.TestCase;
35  
36  import org.activemq.advisories.ConnectionAdvisor;
37  import org.activemq.message.ActiveMQTopic;
38  
39  
40  /**
41   * @version $Revision: 1.1.1.1 $
42   */
43  public class JmsTopicRequestorTest extends TestCase  implements MessageListener{
44      protected static final int MESSAGE_COUNT = 10;
45      protected Connection serverConnection;
46      protected TopicConnection requestorConnection;
47      protected MessageProducer serverProducer;
48      protected Topic destination;
49      
50      
51      
52      
53      protected void setUp() throws Exception {
54  
55          String root = System.getProperty("activemq.store.dir");
56          System.setProperty("activemq.store.dir", root + "_broker_fac1");
57  
58          ActiveMQConnectionFactory fac1 = new ActiveMQConnectionFactory("peer://" + getClass().getName() + "?brokerName=server");
59          serverConnection = fac1.createConnection();
60          serverConnection.setClientID("server");
61          serverConnection.start();
62  
63          System.setProperty("activemq.store.dir", root + "_broker_fac2");
64          
65          ActiveMQConnectionFactory fac2 = new ActiveMQConnectionFactory("peer://" + getClass().getName() + "?brokerName=requestor");
66          requestorConnection = fac2.createTopicConnection();
67          requestorConnection.setClientID("requestor");
68          String destinationName = getClass().getName();
69          
70         
71          destination = new ActiveMQTopic(destinationName);
72          requestorConnection.start();
73                 
74          
75          Session s = serverConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
76          MessageConsumer mc = s.createConsumer(destination);
77          mc.setMessageListener(this);
78          serverProducer = s.createProducer(null);
79          
80          
81          
82          
83          
84          
85      }
86      
87      protected void tearDown() throws Exception{
88          serverConnection.close();
89          requestorConnection.close();
90      }
91      
92      
93      
94      public void onMessage(Message msg){
95          
96          try {
97              Destination replyTo = msg.getJMSReplyTo();
98              serverProducer.send(replyTo,msg);
99              System.out.println("Server sent reply ...: " + msg);
100         }catch(JMSException jmsEx){
101             jmsEx.printStackTrace();
102         }
103     }
104     
105     
106     public void testRequests() throws Exception{
107         try {
108         TopicSession s = requestorConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
109         TopicRequestor requestor = new TopicRequestor(s,destination);
110         ConnectionAdvisor ca = new ConnectionAdvisor(requestorConnection);
111         ca.start();
112         //
113         // A peer:// when connected will have 1 + (2 * number of connections) connections
114         // As we have 2 peers - total connections == 6!!
115         ca.waitForActiveConnections(6, 30000);
116         for (Iterator i = ca.getConnections().iterator(); i.hasNext();){
117             System.out.println(i.next());
118         }
119         for (int i =0; i < MESSAGE_COUNT; i++){
120             Message msg = s.createTextMessage("test:" + i);
121             Message receipt = requestor.request(msg);
122             System.out.println("Got reply: " + receipt);
123             assertNotNull("receipt is null!!",receipt);
124         }
125         }catch(Throwable e){
126             e.printStackTrace();
127         }
128     }
129 }