Source code: org/activemq/transport/udp/UdpTransportChannel.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq.transport.udp;
20
21 import java.io.IOException;
22 import java.net.DatagramPacket;
23 import java.net.DatagramSocket;
24 import java.net.InetAddress;
25 import java.net.SocketTimeoutException;
26 import java.net.URI;
27
28 import javax.jms.JMSException;
29
30 import org.activemq.io.WireFormat;
31 import org.activemq.message.Packet;
32 import org.activemq.transport.TransportChannelSupport;
33 import org.activemq.transport.TransportStatusEvent;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36
37 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
38
39 /**
40 * A UDP implementation of a TransportChannel
41 *
42 * @version $Revision: 1.1.1.1 $
43 */
44 public class UdpTransportChannel extends TransportChannelSupport implements Runnable {
45 private static final int SOCKET_BUFFER_SIZE = 32 * 1024;
46 private static final int SO_TIMEOUT = 5000;
47 private static final Log log = LogFactory.getLog(UdpTransportChannel.class);
48 protected DatagramSocket socket;
49 protected int port;
50 protected InetAddress inetAddress;
51 private WireFormat wireFormat;
52 private SynchronizedBoolean closed;
53 private SynchronizedBoolean started;
54 private Thread thread; //need to change this - and use a thread pool
55
56 /**
57 * Construct basic helpers
58 */
59 protected UdpTransportChannel(WireFormat wireFormat) {
60 this.wireFormat = wireFormat;
61 closed = new SynchronizedBoolean(false);
62 started = new SynchronizedBoolean(false);
63 }
64
65 public UdpTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
66 this(wireFormat, remoteLocation, remoteLocation.getPort());
67 }
68
69 public UdpTransportChannel(WireFormat wireFormat, URI remoteLocation, int port) throws JMSException {
70 this(wireFormat);
71 try {
72 this.port = port;
73 this.inetAddress = InetAddress.getByName(remoteLocation.getHost());
74 this.socket = createSocket(remoteLocation.getPort());
75 //log.info("Creating multicast socket on port: " + port + " on
76 // host: " + remoteLocation.getHost());
77 socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE);
78 socket.setSendBufferSize(SOCKET_BUFFER_SIZE);
79 connect();
80 // now lets update the port so that sends will go elsewhere
81 }
82 catch (Exception ioe) {
83 JMSException jmsEx = new JMSException("Initialization of TransportChannel failed: " + ioe);
84 jmsEx.setLinkedException(ioe);
85 throw jmsEx;
86 }
87 }
88
89 /**
90 * @param socket
91 * @throws JMSException
92 */
93 public UdpTransportChannel(WireFormat wireFormat, DatagramSocket socket) throws JMSException {
94 this(wireFormat);
95 this.socket = socket;
96 this.port = socket.getPort();
97 this.inetAddress = socket.getInetAddress();
98 try {
99 socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE);
100 socket.setSendBufferSize(SOCKET_BUFFER_SIZE);
101 }
102 catch (IOException ioe) {
103 JMSException jmsEx = new JMSException("Initialization of TransportChannel failed");
104 jmsEx.setLinkedException(ioe);
105 throw jmsEx;
106 }
107 }
108
109 public UdpTransportChannel(WireFormat wireFormat, DatagramSocket socket, int port) throws JMSException {
110 this(wireFormat, socket);
111 this.port = port;
112 }
113
114 /**
115 * close the channel
116 */
117 public void stop() {
118 if (closed.commit(false, true)) {
119 super.stop();
120 try {
121 socket.close();
122 }
123 catch (Exception e) {
124 log.trace(toString() + " now closed");
125 }
126 }
127 }
128
129 public void forceDisconnect() {
130 log.debug("Forcing disconnect");
131 if (socket != null && socket.isConnected()) {
132 socket.close();
133 }
134 setTransportConnected(false);
135 fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.DISCONNECTED));
136 }
137
138 /**
139 * start listeneing for events
140 *
141 * @throws JMSException if an error occurs
142 */
143 public void start() throws JMSException {
144 if (started.commit(false, true)) {
145 thread = new Thread(this, toString());
146 if (isServerSide()) {
147 thread.setDaemon(true);
148 }
149 thread.start();
150 }
151 }
152
153 /**
154 * Asynchronously send a Packet
155 *
156 * @param packet
157 * @throws JMSException
158 */
159 public void asyncSend(Packet packet) throws JMSException {
160 try {
161 if (log.isDebugEnabled()) {
162 log.debug("Sending packet: " + packet);
163 }
164 DatagramPacket dpacket = createDatagramPacket(packet);
165 // lets sync to avoid concurrent writes
166 //synchronized (lock) {
167 socket.send(dpacket);
168 //}
169 }
170 catch (IOException e) {
171 JMSException jmsEx = new JMSException("asyncSend failed " + e);
172 jmsEx.setLinkedException(e);
173 onAsyncException(jmsEx);
174 throw jmsEx;
175 }
176 }
177
178 public boolean isMulticast() {
179 return false;
180 }
181
182 /**
183 * reads packets from a Socket
184 */
185 public void run() {
186 DatagramPacket dpacket = createDatagramPacket();
187 while (!closed.get()) {
188 try {
189 socket.setSoTimeout(SO_TIMEOUT);
190 while (!socket.isClosed()) {
191 socket.setSoTimeout(0);
192 socket.receive(dpacket);
193 if (dpacket.getLength() > 0) {
194 Packet packet = wireFormat.readPacket(getClientID(), dpacket);
195 if (packet != null) {
196 doConsumePacket(packet);
197 }
198 }
199 }
200 log.trace("The socket peer is now closed");
201 doClose(new IOException("Socket peer is now closed"));
202 }
203 catch (SocketTimeoutException ste) {
204 //continue;
205 }
206 catch (IOException e) {
207 doClose(e);
208 }
209 }
210 }
211
212 /**
213 * Can this wireformat process packets of this version
214 *
215 * @param version the version number to test
216 * @return true if can accept the version
217 */
218 public boolean canProcessWireFormatVersion(int version) {
219 return wireFormat.canProcessWireFormatVersion(version);
220 }
221
222 /**
223 * @return the current version of this wire format
224 */
225 public int getCurrentWireFormatVersion() {
226 return wireFormat.getCurrentWireFormatVersion();
227 }
228
229 /**
230 * @return
231 */
232 protected DatagramPacket createDatagramPacket() {
233 DatagramPacket answer = new DatagramPacket(new byte[SOCKET_BUFFER_SIZE], SOCKET_BUFFER_SIZE);
234 if (port >= 0) {
235 answer.setPort(port);
236 }
237 answer.setAddress(inetAddress);
238 return answer;
239 }
240
241 protected DatagramPacket createDatagramPacket(Packet packet) throws IOException, JMSException {
242 /*
243 * if (packet instanceof ActiveMQMessage) { ActiveMQMessage message = (ActiveMQMessage) packet;
244 * System.out.println(">>> about to send message with clientID: " + message.getJMSClientID()); }
245 */
246 DatagramPacket answer = wireFormat.writePacket(getClientID(), packet);
247 if (port >= 0) {
248 answer.setPort(port);
249 }
250 answer.setAddress(inetAddress);
251 return answer;
252 }
253
254 private void doClose(Exception ex) {
255 if (!closed.get()) {
256 JMSException jmsEx = new JMSException("Error reading socket: " + ex.getMessage());
257 jmsEx.setLinkedException(ex);
258 onAsyncException(jmsEx);
259 stop();
260 }
261 }
262
263 protected void connect() throws IOException {
264 //socket.connect(inetAddress, port);
265 }
266
267 protected DatagramSocket createSocket(int port) throws IOException {
268 return new DatagramSocket(port, inetAddress);
269 }
270
271 /**
272 * pretty print for object
273 *
274 * @return String representation of this object
275 */
276 public String toString() {
277 return "UdpTransportChannel: " + socket;
278 }
279 }