Source code: net/jxta/socket/JxtaMulticastSocket.java
1 /*
2 * $Id: JxtaMulticastSocket.java,v 1.7 2004/10/10 00:27:44 bondolo Exp $
3 *
4 * Copyright (c) 2001 Sun Microsystems, Inc. All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 *
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 *
18 * 3. The end-user documentation included with the redistribution,
19 * if any, must include the following acknowledgment:
20 * "This product includes software developed by the
21 * Sun Microsystems, Inc. for Project JXTA."
22 * Alternately, this acknowledgment may appear in the software itself,
23 * if and wherever such third-party acknowledgments normally appear.
24 *
25 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
26 * not be used to endorse or promote products derived from this
27 * software without prior written permission. For written
28 * permission, please contact Project JXTA at http://www.jxta.org.
29 *
30 * 5. Products derived from this software may not be called "JXTA",
31 * nor may "JXTA" appear in their name, without prior written
32 * permission of Sun.
33 *
34 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
35 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
36 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
37 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
38 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
39 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
40 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
41 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
42 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
43 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
44 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
45 * SUCH DAMAGE.
46 * ====================================================================
47 *
48 * This software consists of voluntary contributions made by many
49 * individuals on behalf of Project JXTA. For more
50 * information on Project JXTA, please see
51 * <http://www.jxta.org/>.
52 *
53 * This license is based on the BSD license adopted by the Apache Foundation.
54 *
55 */
56 package net.jxta.socket;
57
58 import java.io.ByteArrayInputStream;
59 import java.io.InputStream;
60 import java.io.IOException;
61 import java.net.DatagramPacket;
62 import java.net.InetAddress;
63 import java.net.InetSocketAddress;
64 import java.net.MulticastSocket;
65 import java.net.SocketAddress;
66 import java.net.SocketException;
67 import java.net.URI;
68 import java.util.Enumeration;
69 import java.util.Collections;
70
71 import net.jxta.credential.Credential;
72 import net.jxta.document.MimeMediaType;
73 import net.jxta.document.StructuredDocument;
74 import net.jxta.endpoint.ByteArrayMessageElement;
75 import net.jxta.endpoint.InputStreamMessageElement;
76 import net.jxta.endpoint.Message;
77 import net.jxta.endpoint.MessageElement;
78 import net.jxta.endpoint.StringMessageElement;
79 import net.jxta.id.IDFactory;
80 import net.jxta.impl.util.ProducerBiasedQueue;
81 import net.jxta.membership.MembershipService;
82 import net.jxta.peer.PeerID;
83 import net.jxta.peergroup.PeerGroup;
84 import net.jxta.pipe.InputPipe;
85 import net.jxta.pipe.OutputPipe;
86 import net.jxta.pipe.PipeMsgEvent;
87 import net.jxta.pipe.PipeMsgListener;
88 import net.jxta.pipe.PipeService;
89 import net.jxta.protocol.PipeAdvertisement;
90
91 import org.apache.log4j.Level;
92 import org.apache.log4j.Logger;
93
94 /**
95 * The JxtaMulticastSocket class is useful for sending and receiving
96 * JXTA multicast packets. A JxtaMulticastSocket is a (UDP) DatagramSocket,
97 * with additional capabilities for joining "groups" of other multicast hosts
98 * on the internet.
99 * A multicast group is specified within the context of PeerGroup and a propagate
100 * pipe advertisement.
101 * One would join a multicast group by first creating a MulticastSocket
102 * with the desired peer group and pipe advertisement :
103 *
104 * // join a Multicast group and send the group salutations
105 * ...
106 * String msg = "Hello";
107 * InetAddress group = InetAddress.getByName("228.5.6.7");
108 * MulticastSocket s = new JxtaMulticastSocket(peergroup, propPipeAdv);
109 * //We are joined at this point
110 * DatagramPacket hi = new DatagramPacket(msg.getBytes(), msg.length());
111 * s.send(hi);
112 * // get their responses!
113 * byte[] buf = new byte[1000];
114 * DatagramPacket recv = new DatagramPacket(buf, buf.length);
115 * s.receive(recv);
116 * ...
117 * // OK, I'm done talking - leave the group...
118 * s.close();
119 *
120 * One can also respond only to the sender of the datagram as follows
121 * DatagramPacket res = new DatagramPacket(response.getBytes(), response.length());
122 * res.setAddress(recv.getAddress());
123 * s.send(res);
124 *
125 * When one sends a message to a multicast group, all subscribing recipients to
126 * that peergroup and pipe receive the message (including themselves)
127 * When a socket subscribes to a multicast group/port, it receives datagrams
128 * sent by other hosts to the group/pipe, as do all other members of the group
129 * and pipe. A socket relinquishes membership in a group by the
130 * close() method. Multiple MulticastSocket's may
131 * subscribe to a multicast group and pipe concurrently, and they will all receive
132 * group datagrams.
133
134 * When a datagram is sent it carries along with the peerid of the sender.
135 * The PeerID is represented as a InetAddress in the form of host/ipadress
136 * where host name is the peerid, and ip address is always represented as 0.0.0.0
137 * since it is meaningless in the context of JXTA.
138 * e.g of InetAddress resembles the following:
139 *
140 * uuid-59616261646162614A787461503250339C6014B0F21A49DBBDF2ADBDDBCB314703/0.0.0.0
141 *
142 */
143
144 public class JxtaMulticastSocket extends MulticastSocket implements PipeMsgListener {
145 private final static Logger LOG = Logger.getLogger(JxtaMulticastSocket.class.getName());
146 public static final String NAMESPACE = "JXTAMCAST";
147 public static final String DATATAG = "DATAGRAM";
148 public static final String SRCIDTAG = "SRCID";
149 protected PipeAdvertisement pipeAdv;
150 protected PipeService pipeSvc;
151 protected InputPipe in;
152 protected PeerGroup group;
153 protected SocketAddress socketAddress;
154 protected InetAddress localAddress;
155 protected OutputPipe outputPipe;
156 protected boolean closed = false;
157 protected boolean bound = false;
158 protected ProducerBiasedQueue queue = new ProducerBiasedQueue();
159 protected Credential credential = null;
160 protected StructuredDocument credentialDoc = null;
161 private int timeout = 60000;
162 private byte [] fauxip = new byte[4];
163 private boolean jxtamode = false;
164 private MessageElement srcElement = null;
165
166 /**
167 * Create a multicast socket and bind it to a specific pipe within specified
168 * peer group
169 *
170 *@param group group context
171 *@param pipeAd PipeAdvertisement
172 *@exception IOException if an io error occurs
173 */
174 public JxtaMulticastSocket(PeerGroup group,
175 PipeAdvertisement pipeAd) throws IOException {
176 super();
177 joinGroup(group, pipeAd);
178 }
179
180 /**
181 * joins MutlicastSocket to specified pipe within the context of group
182 *
183 *@param group group context
184 *@param pipeAd PipeAdvertisement
185 *@exception IOException if an io error occurs
186 */
187 public void joinGroup(PeerGroup group, PipeAdvertisement pipeAd) throws IOException {
188
189 if (pipeAd.getType() != null && !pipeAd.getType().equals(PipeService.PropagateType)) {
190 throw new IOException("Only propagate pipe advertisements are supported");
191 }
192 if (pipeAd.getPipeID() == null) {
193 throw new IOException("Invalid pipe advertisement");
194 }
195
196 this.group = group;
197 this.pipeAdv = pipeAd;
198 pipeSvc = group.getPipeService();
199 this.in = pipeSvc.createInputPipe(pipeAd, this);
200 this.credentialDoc = getCredDoc(group);
201 outputPipe = pipeSvc.createOutputPipe(pipeAd, 1);
202 String id = group.getPeerID().getUniqueValue().toString();
203 srcElement = new StringMessageElement(SRCIDTAG, id, null);
204 if (LOG.isEnabledFor(Level.INFO)) {
205 LOG.info("Statring JxtaMulticastSocket on pipe id :"+pipeAdv.getID());
206 }
207 String pipeStr = pipeAd.getPipeID().getUniqueValue().toString();
208 localAddress = InetAddress.getByAddress(pipeStr, fauxip);
209 socketAddress = new InetSocketAddress(localAddress, 0);
210 bound = true;
211 }
212
213 /**
214 * Obtain the credential doc from the group object
215 *
216 *@param group group context
217 *@return The credDoc value
218 */
219 protected static StructuredDocument getCredDoc(PeerGroup group) {
220 try {
221 MembershipService membership = group.getMembershipService();
222 Enumeration each = membership.getCurrentCredentials();
223 if (each.hasMoreElements()) {
224 // get the only credential "nobody"
225 Credential credential = (Credential) each.nextElement();
226 return credential.getDocument(MimeMediaType.XMLUTF8);
227 }
228 } catch (Exception e) {
229 if (LOG.isEnabledFor(Level.WARN)) {
230 LOG.warn("failed to get credential", e);
231 }
232 }
233 return null;
234 }
235
236 /**
237 * Returns the binding state of the MutlicastSocket.
238 *
239 * @return true if the MutlicastSocket successfully bound to an address
240 */
241 public boolean isBound() {
242 return bound;
243 }
244
245 /**
246 * Closes this MutlicastSocket.
247 *
248 *@exception IOException if an I/O error occurs when closing this
249 * socket.
250 */
251 public synchronized void close() {
252 if (closed) {
253 return;
254 }
255 bound = false;
256 closed = true;
257 in.close();
258 outputPipe.close();
259 queue.close();
260 in = null;
261 }
262
263 /**
264 * {@inheritDoc}
265 */
266 public void pipeMsgEvent(PipeMsgEvent event) {
267
268 Message message = event.getMessage();
269 if (message == null) {
270 return;
271 }
272
273 MessageElement element = null;
274 // does the message contain any data
275 element = (MessageElement)
276 message.getMessageElement(NAMESPACE, DATATAG);
277 if (element == null) {
278 return;
279 }
280 try {
281 if (LOG.isEnabledFor(Level.DEBUG)) {
282 LOG.debug("Pushing a message onto queue");
283 }
284 queue.push(message, -1);
285 } catch (InterruptedException e) {
286 if (LOG.isEnabledFor(Level.DEBUG)) {
287 LOG.debug("Interrupted", e);
288 }
289 }
290 }
291
292 /**
293 * Gets the Timeout attribute of the JxtaMulticastSocket
294 *
295 * @return The soTimeout value
296 * @exception IOException if an I/O error occurs
297 */
298 public synchronized int getSoTimeout() {
299 return timeout;
300 }
301
302 /**
303 * Sets the Timeout attribute of the JxtaMulticastSocket
304 * a timeout of 0 blocks forever, by default this Socket's
305 * timeout is set to 0
306 *
307 * @param timeout The new soTimeout value
308 * @exception IOException if an I/O error occurs
309 */
310 public synchronized void setSoTimeout(int timeout)
311 throws SocketException {
312 checkState();
313 this.timeout = timeout;
314 }
315
316 /**
317 * Returns the closed state of the JxtaMulticastSocket.
318 *
319 * @return true if the socket has been closed
320 */
321 public synchronized boolean isClosed() {
322 return closed;
323 }
324
325 /**
326 * Throws a SocketException if closed or not bound
327 */
328 private void checkState() throws SocketException {
329 if (isClosed()) {
330 throw new SocketException("MulticastSocket is closed");
331 } else
332 if (!isBound()) {
333 throw new SocketException("MulticastSocket not bound");
334 }
335 }
336
337 /**
338 * {@inheritDoc}
339 */
340 public void send(DatagramPacket packet) throws IOException {
341 checkState();
342 byte [] data = packet.getData();
343 InputStream bais = new ByteArrayInputStream(data);
344
345 Message msg = new Message();
346 msg.addMessageElement(NAMESPACE,
347 srcElement);
348 msg.addMessageElement(NAMESPACE,
349 new InputStreamMessageElement(DATATAG,
350 MimeMediaType.AOS,
351 bais,
352 null));
353 if (LOG.isEnabledFor(Level.DEBUG)) {
354 LOG.debug("Sending a data packet");
355 }
356 InetAddress address = packet.getAddress();
357 PeerID pid = null;
358 if (address != null) {
359 String pidStr = address.getHostName();
360 try {
361 pid =(PeerID) IDFactory.fromURI(new URI(pidStr));
362 } catch (Exception ex) {
363 //not a valid jxta peerid we multicast, otherwise unicast
364 }
365 }
366 if (pid != null) {
367 // Unicast datagram
368 // create a op pipe to the destination peer
369 OutputPipe op = pipeSvc.createOutputPipe(pipeAdv, Collections.singleton(pid), 1);
370 op.send(msg);
371 op.close();
372 } else {
373 // multicast
374 outputPipe.send(msg);
375 }
376 }
377
378 /**
379 * {@inheritDoc}
380 */
381 public void receive(DatagramPacket packet) throws IOException {
382 checkState();
383 Message msg = null;
384 //data
385 MessageElement del = null;
386 //src
387 MessageElement sel = null;
388 try {
389 msg = (Message) queue.pop(timeout);
390 if(msg == null) {
391 throw new SocketException("Socket timeout reached");
392 }
393 del = msg.getMessageElement(NAMESPACE, DATATAG);
394 sel = msg.getMessageElement(NAMESPACE, SRCIDTAG);
395 if (del == null || sel == null) {
396 if (LOG.isEnabledFor(Level.DEBUG)) {
397 LOG.debug("Message contains no data element, returning");
398 }
399 return;
400 } else {
401 if (LOG.isEnabledFor(Level.DEBUG)) {
402 LOG.debug("Popped a message off the queue");
403 }
404 }
405 } catch (InterruptedException e) {
406 if (LOG.isEnabledFor(Level.DEBUG)) {
407 LOG.debug("Exception occured", e);
408 }
409 throw new IOException(e.toString());
410 }
411 if (del.getByteLength() > packet.getLength()) {
412 throw new IOException("Datagram can not accomodate message of size :"+ del.getByteLength());
413 }
414 String addrStr = new String(sel.getBytes(false),
415 0,
416 (int) sel.getByteLength(),
417 "UTF8");
418 if (LOG.isEnabledFor(Level.DEBUG)) {
419 LOG.debug("Src Address :"+addrStr);
420 }
421 InetAddress address = InetAddress.getByAddress(addrStr, fauxip);
422 if (LOG.isEnabledFor(Level.DEBUG)) {
423 LOG.debug("Setting Data, and Src Address :"+address);
424 }
425 packet.setAddress(address);
426 packet.setData(del.getBytes(false));
427 }
428
429 /**
430 * {@inheritDoc}
431 */
432 public InetAddress getLocalAddress() {
433 if (isClosed()) {
434 return null;
435 }
436 return localAddress;
437 }
438
439 /**
440 * {@inheritDoc}
441 */
442 public SocketAddress getLocalSocketAddress() {
443 if (isClosed()) {
444 return null;
445 }
446 return socketAddress;
447 }
448
449 /**
450 * {@inheritDoc}
451 */
452 public void bind(SocketAddress addr) throws SocketException {
453 if (isBound()) {
454 throw new SocketException("Already bound");
455 }
456 }
457 }
458