Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/transport/udp/UdpTransportChannel.java


1   /** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  
19  package org.activemq.transport.udp;
20  
21  import java.io.IOException;
22  import java.net.DatagramPacket;
23  import java.net.DatagramSocket;
24  import java.net.InetAddress;
25  import java.net.SocketTimeoutException;
26  import java.net.URI;
27  
28  import javax.jms.JMSException;
29  
30  import org.activemq.io.WireFormat;
31  import org.activemq.message.Packet;
32  import org.activemq.transport.TransportChannelSupport;
33  import org.activemq.transport.TransportStatusEvent;
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  
37  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
38  
39  /**
40   * A UDP implementation of a TransportChannel
41   *
42   * @version $Revision: 1.1.1.1 $
43   */
44  public class UdpTransportChannel extends TransportChannelSupport implements Runnable {
45      private static final int SOCKET_BUFFER_SIZE = 32 * 1024;
46      private static final int SO_TIMEOUT = 5000;
47      private static final Log log = LogFactory.getLog(UdpTransportChannel.class);
48      protected DatagramSocket socket;
49      protected int port;
50      protected InetAddress inetAddress;
51      private WireFormat wireFormat;
52      private SynchronizedBoolean closed;
53      private SynchronizedBoolean started;
54      private Thread thread; //need to change this - and use a thread pool
55  
56      /**
57       * Construct basic helpers
58       */
59      protected UdpTransportChannel(WireFormat wireFormat) {
60          this.wireFormat = wireFormat;
61          closed = new SynchronizedBoolean(false);
62          started = new SynchronizedBoolean(false);
63      }
64  
65      public UdpTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
66          this(wireFormat, remoteLocation, remoteLocation.getPort());
67      }
68  
69      public UdpTransportChannel(WireFormat wireFormat, URI remoteLocation, int port) throws JMSException {
70          this(wireFormat);
71          try {
72              this.port = port;
73              this.inetAddress = InetAddress.getByName(remoteLocation.getHost());
74              this.socket = createSocket(remoteLocation.getPort());
75              //log.info("Creating multicast socket on port: " + port + " on
76              // host: " + remoteLocation.getHost());
77              socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE);
78              socket.setSendBufferSize(SOCKET_BUFFER_SIZE);
79              connect();
80              // now lets update the port so that sends will go elsewhere
81          }
82          catch (Exception ioe) {
83              JMSException jmsEx = new JMSException("Initialization of TransportChannel failed: " + ioe);
84              jmsEx.setLinkedException(ioe);
85              throw jmsEx;
86          }
87      }
88  
89      /**
90       * @param socket
91       * @throws JMSException
92       */
93      public UdpTransportChannel(WireFormat wireFormat, DatagramSocket socket) throws JMSException {
94          this(wireFormat);
95          this.socket = socket;
96          this.port = socket.getPort();
97          this.inetAddress = socket.getInetAddress();
98          try {
99              socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE);
100             socket.setSendBufferSize(SOCKET_BUFFER_SIZE);
101         }
102         catch (IOException ioe) {
103             JMSException jmsEx = new JMSException("Initialization of TransportChannel failed");
104             jmsEx.setLinkedException(ioe);
105             throw jmsEx;
106         }
107     }
108 
109     public UdpTransportChannel(WireFormat wireFormat, DatagramSocket socket, int port) throws JMSException {
110         this(wireFormat, socket);
111         this.port = port;
112     }
113 
114     /**
115      * close the channel
116      */
117     public void stop() {
118         if (closed.commit(false, true)) {
119             super.stop();
120             try {
121                 socket.close();
122             }
123             catch (Exception e) {
124                 log.trace(toString() + " now closed");
125             }
126         }
127     }
128 
129     public void forceDisconnect() {
130         log.debug("Forcing disconnect");
131         if (socket != null && socket.isConnected()) {
132             socket.close();
133         }
134         setTransportConnected(false);
135         fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.DISCONNECTED));
136     }
137 
138     /**
139      * start listeneing for events
140      *
141      * @throws JMSException if an error occurs
142      */
143     public void start() throws JMSException {
144         if (started.commit(false, true)) {
145             thread = new Thread(this, toString());
146             if (isServerSide()) {
147                 thread.setDaemon(true);
148             }
149             thread.start();
150         }
151     }
152 
153     /**
154      * Asynchronously send a Packet
155      *
156      * @param packet
157      * @throws JMSException
158      */
159     public void asyncSend(Packet packet) throws JMSException {
160         try {
161             if (log.isDebugEnabled()) {
162                 log.debug("Sending packet: " + packet);
163             }
164             DatagramPacket dpacket = createDatagramPacket(packet);
165             // lets sync to avoid concurrent writes
166             //synchronized (lock) {
167             socket.send(dpacket);
168             //}
169         }
170         catch (IOException e) {
171             JMSException jmsEx = new JMSException("asyncSend failed " + e);
172             jmsEx.setLinkedException(e);
173             onAsyncException(jmsEx);
174             throw jmsEx;
175         }
176     }
177 
178     public boolean isMulticast() {
179         return false;
180     }
181 
182     /**
183      * reads packets from a Socket
184      */
185     public void run() {
186         DatagramPacket dpacket = createDatagramPacket();
187         while (!closed.get()) {
188             try {
189                 socket.setSoTimeout(SO_TIMEOUT);
190                 while (!socket.isClosed()) {
191                     socket.setSoTimeout(0);
192                     socket.receive(dpacket);
193                     if (dpacket.getLength() > 0) {
194                         Packet packet = wireFormat.readPacket(getClientID(), dpacket);
195                         if (packet != null) {
196                             doConsumePacket(packet);
197                         }
198                     }
199                 }
200                 log.trace("The socket peer is now closed");
201                 doClose(new IOException("Socket peer is now closed"));
202             }
203             catch (SocketTimeoutException ste) {
204                 //continue;
205             }
206             catch (IOException e) {
207                 doClose(e);
208             }
209         }
210     }
211 
212     /**
213      * Can this wireformat process packets of this version
214      *
215      * @param version the version number to test
216      * @return true if can accept the version
217      */
218     public boolean canProcessWireFormatVersion(int version) {
219         return wireFormat.canProcessWireFormatVersion(version);
220     }
221 
222     /**
223      * @return the current version of this wire format
224      */
225     public int getCurrentWireFormatVersion() {
226         return wireFormat.getCurrentWireFormatVersion();
227     }
228 
229     /**
230      * @return
231      */
232     protected DatagramPacket createDatagramPacket() {
233         DatagramPacket answer = new DatagramPacket(new byte[SOCKET_BUFFER_SIZE], SOCKET_BUFFER_SIZE);
234         if (port >= 0) {
235             answer.setPort(port);
236         }
237         answer.setAddress(inetAddress);
238         return answer;
239     }
240 
241     protected DatagramPacket createDatagramPacket(Packet packet) throws IOException, JMSException {
242         /*
243          * if (packet instanceof ActiveMQMessage) { ActiveMQMessage message = (ActiveMQMessage) packet;
244          * System.out.println(">>> about to send message with clientID: " + message.getJMSClientID()); }
245          */
246         DatagramPacket answer = wireFormat.writePacket(getClientID(), packet);
247         if (port >= 0) {
248             answer.setPort(port);
249         }
250         answer.setAddress(inetAddress);
251         return answer;
252     }
253 
254     private void doClose(Exception ex) {
255         if (!closed.get()) {
256             JMSException jmsEx = new JMSException("Error reading socket: " + ex.getMessage());
257             jmsEx.setLinkedException(ex);
258             onAsyncException(jmsEx);
259             stop();
260         }
261     }
262 
263     protected void connect() throws IOException {
264         //socket.connect(inetAddress, port);
265     }
266 
267     protected DatagramSocket createSocket(int port) throws IOException {
268         return new DatagramSocket(port, inetAddress);
269     }
270 
271     /**
272      * pretty print for object
273      *
274      * @return String representation of this object
275      */
276     public String toString() {
277         return "UdpTransportChannel: " + socket;
278     }
279 }