Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: net/jxta/socket/JxtaMulticastSocket.java


1   /*
2    *  $Id: JxtaMulticastSocket.java,v 1.7 2004/10/10 00:27:44 bondolo Exp $
3    *
4    *  Copyright (c) 2001 Sun Microsystems, Inc.  All rights reserved.
5    *
6    *  Redistribution and use in source and binary forms, with or without
7    *  modification, are permitted provided that the following conditions
8    *  are met:
9    *
10   *  1. Redistributions of source code must retain the above copyright
11   *  notice, this list of conditions and the following disclaimer.
12   *
13   *  2. Redistributions in binary form must reproduce the above copyright
14   *  notice, this list of conditions and the following disclaimer in
15   *  the documentation and/or other materials provided with the
16   *  distribution.
17   *
18   *  3. The end-user documentation included with the redistribution,
19   *  if any, must include the following acknowledgment:
20   *  "This product includes software developed by the
21   *  Sun Microsystems, Inc. for Project JXTA."
22   *  Alternately, this acknowledgment may appear in the software itself,
23   *  if and wherever such third-party acknowledgments normally appear.
24   *
25   *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
26   *  not be used to endorse or promote products derived from this
27   *  software without prior written permission. For written
28   *  permission, please contact Project JXTA at http://www.jxta.org.
29   *
30   *  5. Products derived from this software may not be called "JXTA",
31   *  nor may "JXTA" appear in their name, without prior written
32   *  permission of Sun.
33   *
34   *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
35   *  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
36   *  OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
37   *  DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
38   *  ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
39   *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
40   *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
41   *  USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
42   *  ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
43   *  OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
44   *  OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
45   *  SUCH DAMAGE.
46   *  ====================================================================
47   *
48   *  This software consists of voluntary contributions made by many
49   *  individuals on behalf of Project JXTA.  For more
50   *  information on Project JXTA, please see
51   *  <http://www.jxta.org/>.
52   *
53   *  This license is based on the BSD license adopted by the Apache Foundation.
54   *
55   */
56  package net.jxta.socket;
57  
58  import java.io.ByteArrayInputStream;
59  import java.io.InputStream;
60  import java.io.IOException;
61  import java.net.DatagramPacket;
62  import java.net.InetAddress;
63  import java.net.InetSocketAddress;
64  import java.net.MulticastSocket;
65  import java.net.SocketAddress;
66  import java.net.SocketException;
67  import java.net.URI;
68  import java.util.Enumeration;
69  import java.util.Collections;
70  
71  import net.jxta.credential.Credential;
72  import net.jxta.document.MimeMediaType;
73  import net.jxta.document.StructuredDocument;
74  import net.jxta.endpoint.ByteArrayMessageElement;
75  import net.jxta.endpoint.InputStreamMessageElement;
76  import net.jxta.endpoint.Message;
77  import net.jxta.endpoint.MessageElement;
78  import net.jxta.endpoint.StringMessageElement;
79  import net.jxta.id.IDFactory;
80  import net.jxta.impl.util.ProducerBiasedQueue;
81  import net.jxta.membership.MembershipService;
82  import net.jxta.peer.PeerID;
83  import net.jxta.peergroup.PeerGroup;
84  import net.jxta.pipe.InputPipe;
85  import net.jxta.pipe.OutputPipe;
86  import net.jxta.pipe.PipeMsgEvent;
87  import net.jxta.pipe.PipeMsgListener;
88  import net.jxta.pipe.PipeService;
89  import net.jxta.protocol.PipeAdvertisement;
90  
91  import org.apache.log4j.Level;
92  import org.apache.log4j.Logger;
93  
94  /**
95   * The JxtaMulticastSocket class is useful for sending and receiving 
96   * JXTA multicast packets. A JxtaMulticastSocket is a (UDP) DatagramSocket, 
97   * with additional capabilities for joining "groups" of other multicast hosts
98   * on the internet.
99   * A multicast group is specified within the context of PeerGroup and a propagate
100  * pipe advertisement. 
101  * One would join a multicast group by first creating a MulticastSocket
102  * with the desired peer group and pipe advertisement :
103  *
104  *  // join a Multicast group and send the group salutations
105  *  ...
106  *  String msg = "Hello";
107  *  InetAddress group = InetAddress.getByName("228.5.6.7");
108  *  MulticastSocket s = new JxtaMulticastSocket(peergroup, propPipeAdv);
109  *  //We are joined at this point
110  *  DatagramPacket hi = new DatagramPacket(msg.getBytes(), msg.length());
111   * s.send(hi);
112  *  // get their responses!
113  *  byte[] buf = new byte[1000];
114  *  DatagramPacket recv = new DatagramPacket(buf, buf.length);
115  *  s.receive(recv);
116  *  ...
117  *  // OK, I'm done talking - leave the group...
118  *  s.close();
119  * 
120  *  One can also respond only to the sender of the datagram as follows
121  *  DatagramPacket res = new DatagramPacket(response.getBytes(), response.length());
122  *  res.setAddress(recv.getAddress());
123  *  s.send(res);
124  *
125  * When one sends a message to a multicast group, all subscribing recipients to
126  * that peergroup and pipe receive the message (including themselves)
127  * When a socket subscribes to a multicast group/port, it receives datagrams 
128  * sent by other hosts to the group/pipe, as do all other members of the group
129  * and pipe. A socket relinquishes membership in a group by the 
130  * close() method. Multiple MulticastSocket's may 
131  * subscribe to a multicast group and pipe concurrently, and they will all receive
132  * group datagrams.
133 
134  * When a datagram is sent it carries along with the peerid of the sender.
135  * The PeerID is represented as a InetAddress in the form of host/ipadress
136  * where host name is the peerid, and ip address is always represented as 0.0.0.0
137  * since it is meaningless in the context of JXTA.
138  * e.g of InetAddress resembles the following:
139  *
140  * uuid-59616261646162614A787461503250339C6014B0F21A49DBBDF2ADBDDBCB314703/0.0.0.0
141  *
142  */
143 
144 public class JxtaMulticastSocket extends MulticastSocket implements PipeMsgListener {
145     private final static Logger LOG = Logger.getLogger(JxtaMulticastSocket.class.getName());
146     public static final String  NAMESPACE = "JXTAMCAST";
147     public static final String    DATATAG = "DATAGRAM";
148     public static final String   SRCIDTAG = "SRCID";
149     protected PipeAdvertisement pipeAdv;
150     protected PipeService pipeSvc;
151     protected InputPipe in;
152     protected PeerGroup group;
153     protected SocketAddress socketAddress;
154     protected InetAddress localAddress;
155     protected OutputPipe outputPipe;
156     protected boolean closed = false;
157     protected boolean bound = false;
158     protected ProducerBiasedQueue queue = new ProducerBiasedQueue();
159     protected Credential credential = null;
160     protected StructuredDocument credentialDoc = null;
161     private int timeout = 60000;
162     private byte [] fauxip = new byte[4];
163     private boolean jxtamode = false;
164     private MessageElement srcElement = null;
165 
166     /**
167      * Create a multicast socket and bind it to a specific pipe within specified 
168      * peer group
169      *
170      *@param  group            group context
171      *@param  pipeAd             PipeAdvertisement
172      *@exception  IOException  if an io error occurs
173      */
174     public JxtaMulticastSocket(PeerGroup group,
175                                PipeAdvertisement pipeAd) throws IOException {
176         super();
177         joinGroup(group, pipeAd);
178     }
179 
180     /**
181      * joins MutlicastSocket to specified pipe within the context of group
182      *
183      *@param  group            group context
184      *@param  pipeAd           PipeAdvertisement
185      *@exception  IOException  if an io error occurs
186      */
187     public void joinGroup(PeerGroup group, PipeAdvertisement pipeAd) throws IOException {
188 
189         if (pipeAd.getType() != null && !pipeAd.getType().equals(PipeService.PropagateType)) {
190             throw new IOException("Only propagate pipe advertisements are supported");
191         }
192         if (pipeAd.getPipeID() == null) {
193             throw new IOException("Invalid pipe advertisement");
194         }
195 
196         this.group = group;
197         this.pipeAdv = pipeAd;
198         pipeSvc = group.getPipeService();
199         this.in = pipeSvc.createInputPipe(pipeAd, this);
200         this.credentialDoc = getCredDoc(group);
201         outputPipe = pipeSvc.createOutputPipe(pipeAd, 1);
202         String id = group.getPeerID().getUniqueValue().toString();
203         srcElement = new StringMessageElement(SRCIDTAG, id, null);
204         if (LOG.isEnabledFor(Level.INFO)) {
205             LOG.info("Statring JxtaMulticastSocket on pipe id :"+pipeAdv.getID());
206         }
207         String pipeStr = pipeAd.getPipeID().getUniqueValue().toString();
208         localAddress = InetAddress.getByAddress(pipeStr, fauxip);
209         socketAddress = new InetSocketAddress(localAddress, 0);
210         bound = true;
211     }
212 
213     /**
214      * Obtain the credential doc from the group object
215      *
216      *@param  group  group context
217      *@return        The credDoc value
218      */
219     protected static StructuredDocument getCredDoc(PeerGroup group) {
220         try {
221             MembershipService membership = group.getMembershipService();
222             Enumeration each = membership.getCurrentCredentials();
223             if (each.hasMoreElements()) {
224                 // get the only credential "nobody"
225                 Credential credential = (Credential) each.nextElement();
226                 return credential.getDocument(MimeMediaType.XMLUTF8);
227             }
228         } catch (Exception e) {
229             if (LOG.isEnabledFor(Level.WARN)) {
230                 LOG.warn("failed to get credential", e);
231             }
232         }
233         return null;
234     }
235 
236     /**
237      * Returns the binding state of the MutlicastSocket.
238      *
239      * @return    true if the MutlicastSocket successfully bound to an address
240      */
241     public boolean isBound() {
242         return bound;
243     }
244 
245     /**
246      * Closes this MutlicastSocket.
247      *
248      *@exception  IOException  if an I/O error occurs when closing this
249      *      socket.
250      */
251     public synchronized void close() {
252         if (closed) {
253             return;
254         }
255         bound = false;
256         closed = true;
257         in.close();
258         outputPipe.close();
259         queue.close();
260         in = null;
261     }
262 
263     /**
264      * {@inheritDoc}
265      */
266     public void pipeMsgEvent(PipeMsgEvent event) {
267 
268         Message message = event.getMessage();
269         if (message == null) {
270             return;
271         }
272 
273         MessageElement element = null;
274         // does the message contain any data
275         element = (MessageElement)
276                   message.getMessageElement(NAMESPACE, DATATAG);
277         if (element == null) {
278             return;
279         }
280         try {
281             if (LOG.isEnabledFor(Level.DEBUG)) {
282                 LOG.debug("Pushing a message onto queue");
283             }
284             queue.push(message, -1);
285         } catch (InterruptedException e) {
286             if (LOG.isEnabledFor(Level.DEBUG)) {
287                 LOG.debug("Interrupted", e);
288             }
289         }
290     }
291 
292     /**
293      *  Gets the Timeout attribute of the JxtaMulticastSocket
294      *
295      * @return                  The soTimeout value
296      * @exception  IOException  if an I/O error occurs
297      */
298     public synchronized int getSoTimeout() {
299         return timeout;
300     }
301 
302     /**
303      *  Sets the Timeout attribute of the JxtaMulticastSocket
304      *  a timeout of 0 blocks forever, by default this Socket's
305      *  timeout is set to 0
306      *
307      * @param  timeout              The new soTimeout value
308      * @exception  IOException  if an I/O error occurs
309      */
310     public synchronized void setSoTimeout(int timeout)
311     throws SocketException {
312         checkState();
313         this.timeout = timeout;
314     }
315 
316     /**
317      * Returns the closed state of the JxtaMulticastSocket.
318      *
319      * @return    true if the socket has been closed
320      */
321     public synchronized boolean isClosed() {
322         return closed;
323     }
324 
325     /**
326      * Throws a SocketException if closed or not bound
327      */
328     private void checkState() throws SocketException {
329         if (isClosed()) {
330             throw new SocketException("MulticastSocket is closed");
331         } else
332             if (!isBound()) {
333                 throw new SocketException("MulticastSocket not bound");
334             }
335     }
336 
337     /**
338      * {@inheritDoc}
339      */
340     public void send(DatagramPacket packet) throws IOException {
341         checkState();
342         byte [] data = packet.getData();
343         InputStream bais = new ByteArrayInputStream(data);
344 
345         Message msg = new Message();
346         msg.addMessageElement(NAMESPACE,
347                               srcElement);
348         msg.addMessageElement(NAMESPACE,
349                               new InputStreamMessageElement(DATATAG,
350                                                             MimeMediaType.AOS,
351                                                             bais,
352                                                             null));
353         if (LOG.isEnabledFor(Level.DEBUG)) {
354             LOG.debug("Sending a data packet");
355         }
356         InetAddress address = packet.getAddress();
357         PeerID pid = null;
358         if (address != null) {
359             String pidStr = address.getHostName();
360             try {
361                 pid =(PeerID) IDFactory.fromURI(new URI(pidStr));
362             } catch (Exception ex) {
363                 //not a valid jxta peerid we multicast, otherwise unicast
364             }
365         }
366         if (pid != null) {
367             // Unicast datagram
368             // create a op pipe to the destination peer
369             OutputPipe op = pipeSvc.createOutputPipe(pipeAdv, Collections.singleton(pid), 1);
370             op.send(msg);
371             op.close();
372         } else {
373             // multicast
374             outputPipe.send(msg);
375         }
376     }
377 
378     /**
379      * {@inheritDoc}
380      */
381     public void receive(DatagramPacket packet) throws IOException {
382         checkState();
383         Message msg = null;
384         //data
385         MessageElement del = null;
386         //src
387         MessageElement sel = null;
388         try {
389             msg = (Message) queue.pop(timeout);
390             if(msg == null) {
391                 throw new SocketException("Socket timeout reached");
392             }
393             del = msg.getMessageElement(NAMESPACE, DATATAG);
394             sel = msg.getMessageElement(NAMESPACE, SRCIDTAG);
395             if (del == null || sel == null) {
396                 if (LOG.isEnabledFor(Level.DEBUG)) {
397                     LOG.debug("Message contains no data element, returning");
398                 }
399                 return;
400             } else {
401                 if (LOG.isEnabledFor(Level.DEBUG)) {
402                     LOG.debug("Popped a message off the queue");
403                 }
404             }
405         } catch (InterruptedException e) {
406             if (LOG.isEnabledFor(Level.DEBUG)) {
407                 LOG.debug("Exception occured", e);
408             }
409             throw new IOException(e.toString());
410         }
411         if (del.getByteLength() > packet.getLength()) {
412             throw new IOException("Datagram can not accomodate message of size :"+ del.getByteLength());
413         }
414         String addrStr = new String(sel.getBytes(false),
415                                     0,
416                                     (int) sel.getByteLength(),
417                                     "UTF8");
418         if (LOG.isEnabledFor(Level.DEBUG)) {
419             LOG.debug("Src Address :"+addrStr);
420         }
421         InetAddress address = InetAddress.getByAddress(addrStr, fauxip);
422         if (LOG.isEnabledFor(Level.DEBUG)) {
423             LOG.debug("Setting Data, and Src Address :"+address);
424         }
425         packet.setAddress(address);
426         packet.setData(del.getBytes(false));
427     }
428 
429     /**
430      * {@inheritDoc}
431      */
432     public InetAddress getLocalAddress() {
433         if (isClosed()) {
434             return null;
435         }
436         return localAddress;
437     }
438 
439     /**
440      * {@inheritDoc}
441      */
442     public SocketAddress getLocalSocketAddress() {
443         if (isClosed()) {
444             return null;
445         }
446         return socketAddress;
447     }
448 
449     /**
450      * {@inheritDoc}
451      */
452     public void bind(SocketAddress addr) throws SocketException {
453         if (isBound()) {
454             throw new SocketException("Already bound");
455         }
456     }
457 }
458