Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: com/act365/net/tcp/TCPJSocketImpl.java


1   /*
2     * JSocket Wrench
3     * 
4     * Copyright (C) act365.com October 2003
5     * 
6     * Web site: http://www.act365.com/wrench
7     * E-mail: developers@act365.com
8     * 
9     * The JSocket Wrench library adds support for low-level Internet protocols
10    * to the Java programming language.
11    * 
12    * This program is free software; you can redistribute it and/or modify it 
13    * under the terms of the GNU General Public License as published by the Free 
14    * Software Foundation; either version 2 of the License, or (at your option) 
15    * any later version.
16    *  
17    * This program is distributed in the hope that it will be useful, 
18    * but WITHOUT ANY WARRANTY; without even the implied warranty of 
19    * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General 
20    * Public License for more details.
21    * 
22    * You should have received a copy of the GNU General Public License along with 
23    * this program; if not, write to the Free Software Foundation, Inc., 
24    * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25    */
26  
27  package com.act365.net.tcp ;
28  
29  import com.act365.net.*;
30  import com.act365.net.ip.*;
31  
32  import java.beans.* ;
33  import java.io.* ;
34  import java.net.* ;
35  import java.util.*;
36  
37  /**
38   * Implements the TCPJ protocol, which is a clone of TCP that would typically be
39   * used with a non-standard IP protocol setting.
40   */
41  
42  class TCPJSocketImpl extends SocketImpl implements PropertyChangeListener {
43    
44    final static long msltimeout = 10000 ;
45  
46    final static int transmissionlimit = 3 ,
47                     maxwindowsize = 32767 ,
48                     minEphemeralPort = 1024 ,
49                     maxEphemeralPort = 5000 ,
50                     protocol = SocketUtils.getProtocol();
51  
52    final static boolean debug = false ,
53                         closeserver = true ,
54                         includeipheader = SocketUtils.includeHeader() ,
55                         implementackdelay = true ,
56                         modelpacketloss = false ;
57  
58    final static double alpha = 0.9 ,
59                        beta = 2.0 ,
60                        packetloss = 0.1 ;
61  
62    static int nextEphemeralPort = 1024 ;
63   
64    DatagramSocket socket ;
65  
66    InetAddress localhost ;
67  
68    int state ,
69        previousstate ,
70        localseqnum ,
71        destseqnum ,
72        acknowledgedseqnum ,
73        windowsize ,
74        destwindowsize ,
75        readoffset ,
76        readcount ;
77    
78    long rto ,
79         rtt ,
80         sendtime ,
81         receivetime ;
82  
83    byte[] readbuffer ;
84  
85    TCPAcknowledger acknowledger ;
86  
87    TCPMSLTimer msltimer ;
88  
89    Random random ;
90  
91    /**
92     * Default constructor
93     * @throws SocketException if the underlying <code>DatagramSocket</code> object cannot be opened
94     */
95    
96    public TCPJSocketImpl() throws SocketException {
97  
98      resetSocket();
99   
100     socket = new DatagramSocket();
101 
102     acknowledger = new TCPAcknowledger( this , 200 );
103  
104     msltimer = new TCPMSLTimer( this , msltimeout );
105 
106     TCPJListener.getInstance().addPropertyChangeListener( this );
107 
108     if( modelpacketloss ){
109       random = new Random();
110     }
111   }
112   
113   /** 
114    Sends a TCP message to the destination but doesn't await acknowledgement. 
115    Checks whether the receiver is ready to receive.
116   */ 
117 
118   void send( int flags , TCPOptions options , byte[] buffer , int offset , int count , boolean retransmission ) throws IOException {
119 
120     int sendsize ;
121 
122     if( ( flags & TCP.PSH ) > 0 ){
123       sendsize = localseqnum - acknowledgedseqnum ;
124       while( count > 0 ){
125         while( sendsize >= destwindowsize ){
126           try {
127             wait(); // Persist timer
128           } catch( InterruptedException ie ) {
129           }
130         }
131         sendsize = Math.min( count , acknowledgedseqnum + destwindowsize - localseqnum );
132         transmit( flags , options , buffer , offset , sendsize , retransmission );
133         offset += sendsize ;
134         count -= sendsize ; 
135       } 
136     } else {
137       transmit( flags , options , buffer , offset , count , retransmission );
138     }
139   }
140 
141   /**
142    Updates the Retransmission Timeout Value (RTO).
143   */
144 
145   void updateRTO(){
146   
147     if( sendtime > 0 && receivetime > 0 ){
148       rtt = (long)( alpha * rtt + ( 1 - alpha )*( receivetime - sendtime ) );
149       rto = (long)( beta * rtt );
150       sendtime = 0 ;
151       receivetime = 0 ;
152 
153       if( debug ){
154         System.err.println("Updated rto: " + rto );
155       }
156     }
157   }
158 
159   /**
160    Sends a message and awaits acknowledgement. Updates the estimate 
161    of the connection round-trip time.
162   */
163    
164   synchronized void sendAndAwaitACK( int flags , TCPOptions options , byte[] buffer , int offset , int count ) throws IOException {
165 
166     int sendsize = 1 , 
167         counter = 1 ;
168 
169     long delay = rto ;
170 
171     send( flags , options , buffer , offset , count , false );
172 
173     try {
174       wait( delay );
175       delay *= 2 ;
176       sendsize = acknowledgedseqnum == 0 ? 0 : localseqnum - acknowledgedseqnum ;
177       while( ( acknowledgedseqnum == 0 || sendsize > 0  ) && counter ++ < transmissionlimit ){
178         send( flags , options , buffer , Math.max( offset - sendsize , 0 ) , Math.min( sendsize , buffer.length ) , true );
179         wait( delay );
180         delay *= 2 ;
181         sendsize = acknowledgedseqnum == 0 ? 0 : localseqnum - acknowledgedseqnum ;
182       } 
183     } catch ( Exception e ) {
184       System.err.println("Exception: " + e.getMessage() );
185     }
186   
187     if( sendsize > 0 ){
188       throw new IOException("Connection reset");
189     }
190 
191     updateRTO();
192   }
193 
194   /**
195    Sends a message with no data but doesn't await acknowledgement.
196   */
197 
198   void send( int flags ) throws IOException {
199     send( flags , new TCPOptions() , new byte[0] , 0 , 0 , false );
200   }
201 
202   /**
203    Sends a message with no data and awaits acknowledgement.
204   */
205 
206   void sendAndAwaitACK( int flags ) throws IOException {
207     sendAndAwaitACK( flags , new TCPOptions() , new byte[0] , 0 , 0 );
208   }
209 
210   /**
211    Sends a message with no data but some options and awaits acknowledgement.
212   */
213 
214   void sendAndAwaitACK( int flags , TCPOptions options ) throws IOException {
215     sendAndAwaitACK( flags , options , new byte[0] , 0 , 0 );
216   }
217 
218   /** 
219    Transmits a TCP message to the destination. No check is made beforehand
220    to see whether the receiver is ready to receive.
221   */ 
222 
223   void transmit( int flags , TCPOptions options , byte[] buffer , int offset , int count , boolean retransmit ) throws IOException {
224 
225     if( localhost == null ){
226       throw new IOException("Local address not yet set");
227     } else if( address == null ){
228       throw new IOException("Destination address not yet set");
229     } else if( port == 0 ){
230       throw new IOException("Destination port not yet set");
231     }
232 
233     if( localport == 0 ){
234       localport = nextEphemeralPort ++ ;
235       if( nextEphemeralPort == maxEphemeralPort ){
236         nextEphemeralPort = minEphemeralPort ;
237       }
238     } 
239 
240     if( acknowledger.isAlive() ){
241       acknowledger.interrupt();
242       flags |= TCP.ACK ;
243     } else if( destseqnum != 0 ) {
244       flags |= TCP.ACK ;
245     }
246 
247     if( retransmit ){
248       if( ( flags & TCP.SYN ) > 0 || ( flags & TCP.FIN ) > 0 ){
249         -- localseqnum ;
250       } else if( ( flags & TCP.PSH ) > 0 ) {
251         localseqnum -= count ;
252       }
253       sendtime = 0 ;
254     } else {
255       sendtime = new Date().getTime();
256     }
257 
258     byte[] sendbuffer = TCPWriter.write( localhost.getAddress() ,
259                                          (short) localport ,
260                                          address.getAddress() ,
261                                          (short) port ,
262                                          localseqnum ,
263                                          destseqnum ,
264                                          ( flags & TCP.ACK ) > 0 ,
265                                          ( flags & TCP.RST ) > 0 ,
266                                          ( flags & TCP.SYN ) > 0 ,
267                                          ( flags & TCP.FIN ) > 0 ,
268                                          ( flags & TCP.PSH ) > 0 ,
269                                          (short) windowsize ,
270                                          options ,
271                                          buffer ,  
272                                          offset ,
273                                          count  );
274 
275     if( includeipheader ){
276 
277       sendbuffer = IP4Writer.write( IP4.TOS_COMMAND ,
278                                     (short) 255 ,
279                                     (byte) protocol ,
280                                     localhost.getAddress() ,
281                                     address.getAddress() , 
282                                     sendbuffer );
283     }
284 
285     if( debug ){
286       System.err.println("SEND:");
287       SocketUtils.dump( System.err , sendbuffer , 0 , sendbuffer.length );
288     }
289 
290     if( ( flags & TCP.SYN ) > 0 || ( flags & TCP.FIN ) > 0 ){
291       ++ localseqnum ;
292     } else if( ( flags & TCP.PSH ) > 0 ) {
293       localseqnum += count ;
294     }
295 
296     socket.send( new DatagramPacket( sendbuffer , sendbuffer.length , address , port ) );
297   }
298 
299   /**
300    * Acknowledges a received TCP message.
301    */
302   
303   public void acknowledge() throws IOException {
304 
305     if( ! implementackdelay ){
306       send( TCP.ACK );
307     } else if( acknowledger.isAlive() ) {
308       acknowledger.interrupt();
309       send( TCP.ACK );
310     } else {
311       acknowledger.start();
312     }
313   }
314 
315   /**
316    * Resets a socket to its default values.
317    */
318   
319   void resetSocket(){
320 
321     address = null ;
322     port = 0 ;
323     
324     try {
325       localport = 0 ;
326       if( localhost == null ){
327         localhost = InetAddress.getLocalHost();
328       }
329     } catch ( UnknownHostException uhe ) {
330       localhost = null ;
331     }
332      
333     state = previousstate = TCP.CLOSED ; 
334     localseqnum = ISNCounter.getCounter();
335     destseqnum = 0 ;
336     acknowledgedseqnum = 0 ;
337     windowsize = maxwindowsize ;
338     destwindowsize = 0 ;
339 
340     readbuffer = new byte[ maxwindowsize ];
341     readoffset = 0 ;
342     readcount  = 0 ;
343  
344     rtt = 1000 ;
345     rto = (long)( beta * rtt );
346     sendtime = 0 ;
347     receivetime = 0 ;
348   }
349 
350   /**
351    * Handles an received TCPMessage
352    * @throws IOException message is illegal
353    */
354 
355   synchronized void receive( TCPMessage message ) throws IOException {
356 
357     if( modelpacketloss && random.nextFloat() < packetloss ){
358       return ;
359     }
360 
361     receivetime = new Date().getTime();
362 
363     if( message.ack ){
364       acknowledgedseqnum = message.acknowledgementnumber ;
365     }
366 
367     if( message.syn || message.fin ){
368       destseqnum = message.sequencenumber + 1 ;
369     } else if( message.psh ) {
370       destseqnum = message.sequencenumber + message.data.length ;
371     }
372       
373     destwindowsize = message.windowsize ;
374 
375     switch( state ){
376 
377     case TCP.CLOSED:
378       break;
379     case TCP.LISTEN:
380       if( message.syn ){
381         send( TCP.SYN | TCP.ACK );
382         previousstate = state ;
383         state = TCP.SYN_RCVD ;
384         notifyAll();
385       } 
386       return ;
387     case TCP.SYN_RCVD:
388       if( message.ack ){
389         previousstate = state ;
390         state = TCP.ESTABLISHED ;
391         notifyAll();
392         return;
393       } else if( message.rst && previousstate == TCP.LISTEN ) {
394         previousstate = state ;
395         state = TCP.LISTEN ;
396         notifyAll();
397         return ;
398       }
399       break;
400     case TCP.SYN_SENT:
401       if( message.syn && message.ack ){
402         send( TCP.ACK );
403         previousstate = state ;
404         state = TCP.ESTABLISHED ;
405         notifyAll();
406         return ;
407       } else if( message.syn ){
408         send( TCP.SYN | TCP.ACK );
409         previousstate = state ;
410         state = TCP.SYN_RCVD ;
411         notifyAll();
412         return;
413       }
414       break;
415     case TCP.ESTABLISHED:
416       if( message.fin ){
417         if( closeserver ){
418           send( TCP.FIN | TCP.ACK );
419           previousstate = state ;
420           state = TCP.LAST_ACK ;
421           notifyAll();
422           return;
423         } else {
424           send( TCP.ACK );
425           previousstate = state ;
426           state = TCP.CLOSE_WAIT ;
427           notifyAll();
428           return ;
429         }
430       } else if( message.psh ){
431         {
432           int i = - 1 ;
433           while( ++ i < message.data.length ){
434             readbuffer[( readoffset + readcount + i ) % maxwindowsize ] = message.data[ i ];
435           }
436         }
437         readcount += message.data.length ;
438         windowsize -= message.data.length ;
439         acknowledge();
440         notifyAll();
441         return ;
442       } else if( message.ack ){
443         notifyAll();
444         return ;
445       }
446       break;
447     case TCP.CLOSE_WAIT :
448       return;
449     case TCP.FIN_WAIT_1 :
450       if( message.fin && message.ack ){
451         send( TCP.ACK );
452         timeWait();
453         return ;
454       } else if( message.fin ){
455         send( TCP.ACK );
456         previousstate = state ;
457         state = TCP.CLOSING ;
458         notifyAll();
459         return ;
460       } else if( message.ack ){
461         previousstate = state ;
462         state = TCP.FIN_WAIT_2 ;
463         notifyAll();
464         return ;
465       }
466       break;
467     case TCP.CLOSING :
468       if( message.ack ){
469         timeWait();
470         return;
471       }
472       break;
473     case TCP.LAST_ACK :
474       if( message.ack ){
475         resetSocket();
476         notifyAll();
477         return;
478       }
479       break;
480     case TCP.FIN_WAIT_2 :
481       if( message.fin ){
482         send( TCP.ACK );
483         timeWait();
484         return ;
485       }
486       break;
487     case TCP.TIME_WAIT :
488       if( message.fin ){
489         send( TCP.ACK );
490         notifyAll();
491         return ;
492       }
493       break;
494     }
495   
496     if( ! message.rst ){
497       System.err.println("RST from state: " + state );
498       send( TCP.RST );
499     }
500 
501     resetSocket();
502 
503     throw new IOException("Connection reset by peer");
504   }
505   
506   void activeOpen( InetAddress address , short port ) throws IOException {
507 
508     if( state != TCP.CLOSED ){
509       throw new IOException("Active Open only permitted from CLOSED state");
510     }
511 
512     this.address = address ;
513     this.port = port ;
514 
515     localseqnum = ISNCounter.getCounter();
516     destseqnum = 0 ;
517 
518     TCPOptions options = new TCPOptions();
519 
520     options.setMaxSegmentSize( (short) 1440 );
521 
522     state = TCP.SYN_SENT ; 
523   }
524 
525   void passiveOpen() throws IOException {
526    
527     if( state != TCP.CLOSED ){
528       throw new IOException("Passive Open only permitted from CLOSED state");
529     }
530 
531     state = TCP.LISTEN ;
532   }
533 
534   synchronized void tcpjClose() throws IOException {
535 
536     switch( state ){
537 
538       case TCP.ESTABLISHED:
539       case TCP.SYN_RCVD:
540         state = TCP.FIN_WAIT_1 ;
541         sendAndAwaitACK( TCP.FIN );
542         break;
543       case TCP.SYN_SENT:
544         resetSocket();
545         break;  
546       case TCP.CLOSE_WAIT:
547         state = TCP.LAST_ACK ;
548         sendAndAwaitACK( TCP.FIN );
549         break;
550       default:
551         // Will reach here if an exception has been thrown during socket set-up
552     }
553 
554     while( state != TCP.CLOSED ){
555       try {
556         wait();
557       } catch( InterruptedException ie ){
558       }
559     }
560 
561     msltimer.interrupt();
562 
563     if( debug ){
564       System.err.println("Connection closed");
565     }
566 
567     resetSocket();
568   }
569       
570   synchronized void setState( int state ){
571     this.state = state ;
572     notifyAll();
573   }
574 
575   synchronized void timeWait() throws IOException {
576     
577     state = TCP.TIME_WAIT ; 
578 
579     msltimer.start();
580   }
581 
582   /**
583    * Writes a buffer and awaits acknowledgement.
584    */
585 
586   public synchronized void write( byte[] buffer , int offset , int count ) throws IOException {
587 
588     if( state != TCP.ESTABLISHED ){
589       throw new IOException("Write not possible from current state");
590     }
591 
592     sendAndAwaitACK( TCP.PSH , new TCPOptions() , buffer , offset , count );
593   }
594 
595   /**
596    * Reads a buffer.
597    */
598   
599   public synchronized int read( byte[] buffer , int offset , int count ) throws IOException {
600 
601     if( state != TCP.ESTABLISHED ){
602       return 0 ;
603     }
604 
605     final int initialcount = count ,
606               initialoffset = offset ;
607 
608     while( count > 0 ){
609       while( readcount == 0 ){
610         try {
611           wait();
612         } catch( InterruptedException ie ) {
613         }
614       }
615       windowsize += Math.min( count , readcount );
616       while( count > 0 && readcount > 0 ){
617         buffer[ offset ] = readbuffer[ readoffset % maxwindowsize ];
618         ++ offset ;
619         ++ readoffset ;
620         -- readcount ;
621         -- count ;
622       }
623     }
624         
625     return initialcount ;
626   }
627 
628   /**
629    * Called by <code>TCPJListener</code> if a message is received.
630    * @see TCPJListener
631    */
632   
633   public void propertyChange( PropertyChangeEvent evt ){
634 
635     IP4Message ipmessage = (IP4Message) evt.getOldValue();
636 
637     if( address instanceof InetAddress ){
638 
639         byte[] destinationaddress = address.getAddress();
640 
641         if( destinationaddress[ 0 ] != ipmessage.source[ 0 ] || 
642             destinationaddress[ 1 ] != ipmessage.source[ 1 ] ||
643             destinationaddress[ 2 ] != ipmessage.source[ 2 ] ||
644             destinationaddress[ 3 ] != ipmessage.source[ 3 ] ){
645             return ;
646         }
647     }
648 
649     if( localhost instanceof InetAddress ){
650  
651         byte[] localaddress = localhost.getAddress();
652 
653         if( localaddress[ 0 ] != ipmessage.destination[ 0 ] ||
654             localaddress[ 1 ] != ipmessage.destination[ 1 ] ||
655             localaddress[ 2 ] != ipmessage.destination[ 2 ] ||
656             localaddress[ 3 ] != ipmessage.destination[ 3 ] ){
657             return ;
658         }
659     }
660 
661     TCPMessage tcpmessage = (TCPMessage) evt.getNewValue();
662 
663     if( port != 0 && port != tcpmessage.sourceport || localport != tcpmessage.destinationport ){
664       return ;
665     }
666 
667     if( debug ){
668       System.err.println("RECEIVE:");
669       SocketUtils.dump( System.err , ipmessage.data , 0 , ipmessage.data.length );
670     }
671 
672     try {
673       if( address == null ){
674         address = InetAddress.getByName( ipmessage.source[ 0 ] + "." +
675                                          ipmessage.source[ 1 ] + "." +
676                                          ipmessage.source[ 2 ] + "." +
677                                          ipmessage.source[ 3 ] );
678       }
679       if( port == 0 ){
680         port = tcpmessage.sourceport ;
681       }
682       receive( tcpmessage );
683     } catch( Exception e ) {
684     }
685   }
686 
687   /**
688    * Creates a new TCPJ socket.
689    */
690   
691   public void create( boolean stream ) throws IOException {
692 
693     if( ! stream ){
694       throw new IOException("TCP/J does not support datagram sockets");
695     }
696 
697     resetSocket();
698   }
699 
700   /**
701    * Binds a socket to the given address and port.
702    */
703   
704   public void bind( InetAddress inetAddress , int port ) throws IOException {
705 
706     localhost = inetAddress ;
707 
708     if( port != (short) port ){
709       throw new IOException("Illegal port number");
710     } else if( port == 0 ) {
711       throw new IOException("Use of port 0 is prohibited");
712     } else if( port >= minEphemeralPort && port <= maxEphemeralPort ) {
713       throw new IOException("Ports " + minEphemeralPort + " to " + maxEphemeralPort + " are reserved for ephemeral use");
714     } else {
715       localport = (short) port ;
716     }
717   }
718 
719   /**
720    * Connects to a remote address.
721    */
722   
723   public void connect( String hostname , int port ) throws IOException {
724 
725     connect( InetAddress.getByName( hostname ) , port );
726   }
727 
728   /**
729    * Connects to a socket address.
730    */
731   
732   public void connect( SocketAddress address , int timeout ) throws IOException {
733     throw new IOException("SocketAddress not supported");
734   }
735   
736   /**
737    * Connects to a remote address.
738    */
739   
740   public synchronized void connect( InetAddress dst , int remotePort ) throws IOException {
741 
742     if( remotePort != (short) remotePort ){
743       throw new IOException("Illegal port number");
744     }
745 
746     TCPOptions options = new TCPOptions();
747 
748     options.setMaxSegmentSize( (short) 1440 );
749 
750     activeOpen( dst , (short) remotePort );
751 
752     sendAndAwaitACK( TCP.SYN , options );
753 
754     while( state != TCP.ESTABLISHED ){
755       try {
756         wait();
757       } catch( InterruptedException ie ) {
758       }
759     }
760   }
761 
762   /**
763    * Accepts a server connection.
764    */
765   
766   public synchronized void accept( SocketImpl newSocket ) {
767 
768     while( state != TCP.ESTABLISHED ){
769       try {
770         wait();
771       } catch ( InterruptedException ie ) {
772       }
773     }
774 
775     TCPJSocketImpl tcpjsocket = (TCPJSocketImpl) newSocket ;
776 
777     tcpjsocket.resetSocket();
778 
779     tcpjsocket.localport = localport ;
780     tcpjsocket.port = port ;
781     tcpjsocket.state = state ;
782     tcpjsocket.previousstate = previousstate ;
783     tcpjsocket.localseqnum = localseqnum ;
784     tcpjsocket.destseqnum = destseqnum ;
785     tcpjsocket.acknowledgedseqnum = acknowledgedseqnum ;
786     tcpjsocket.windowsize = windowsize ;
787     tcpjsocket.destwindowsize = destwindowsize ;
788 
789     if( address instanceof InetAddress ){
790       try {
791         tcpjsocket.address = InetAddress.getByName( address.getHostName() );
792       } catch( UnknownHostException e ){
793         tcpjsocket.address = null ;
794       }
795     } else {
796       tcpjsocket.address = null ;
797     }
798 
799     if( localhost instanceof InetAddress ){
800       try {
801         tcpjsocket.localhost = InetAddress.getByName( localhost.getHostName() );
802       } catch( UnknownHostException e ){
803         tcpjsocket.localhost = null ;
804       }
805     } else {
806       tcpjsocket.localhost = null ;
807     }
808 
809     port  = 0 ;
810     address = null ;
811     state = TCP.LISTEN ;
812   }
813 
814   /**
815    * Closes a connection.
816    */
817   
818   public void close() throws IOException {
819 
820     tcpjClose();
821   }
822 
823   /**
824    * Sets a server to listen for a client connection. 
825    * Note that the backlog argument is ignored - currently,
826    * TCPJ cannot stack connection requests.
827    */
828   
829   public void listen( int backlog ) throws IOException {
830 
831 /*
832     if( backlog != 1 ){
833       throw new IOException("TCP/J only stacks a single connection request");
834     }
835 */
836     passiveOpen();
837   }
838 
839   /**
840    * Sets a TCP option.
841    */
842   
843   public void setOption( int optID , Object value ) throws SocketException {
844     throw new SocketException("No options supported");
845   }
846 
847   /**
848    * Gets a TCP option.
849    */
850   
851   public Object getOption( int optID ) throws SocketException {
852     throw new SocketException("No options supported");
853   }
854 
855   /**
856    * Returns the number of bytes available to be read without polling.
857    */
858   
859   public int available() {
860     return readcount ;
861   }
862 
863   /**
864    * Gets an input stream to read.
865    */
866   
867   public InputStream getInputStream() throws IOException {
868     return new TCPJInputStream( this );
869   }
870 
871   /**
872    * Gets an output stream to write.
873    */
874   
875   public OutputStream getOutputStream() throws IOException {
876     return new TCPJOutputStream( this );
877   }
878   
879   /**
880    * Urgent data is not supported.
881    */
882   
883   public void sendUrgentData(int data) throws IOException {
884   throw new IOException("Urgent data not supported");
885   }  
886 }