Source code: com/act365/net/tcp/TCPJSocketImpl.java
1 /*
2 * JSocket Wrench
3 *
4 * Copyright (C) act365.com October 2003
5 *
6 * Web site: http://www.act365.com/wrench
7 * E-mail: developers@act365.com
8 *
9 * The JSocket Wrench library adds support for low-level Internet protocols
10 * to the Java programming language.
11 *
12 * This program is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the Free
14 * Software Foundation; either version 2 of the License, or (at your option)
15 * any later version.
16 *
17 * This program is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
20 * Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License along with
23 * this program; if not, write to the Free Software Foundation, Inc.,
24 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
25 */
26
27 package com.act365.net.tcp ;
28
29 import com.act365.net.*;
30 import com.act365.net.ip.*;
31
32 import java.beans.* ;
33 import java.io.* ;
34 import java.net.* ;
35 import java.util.*;
36
37 /**
38 * Implements the TCPJ protocol, which is a clone of TCP that would typically be
39 * used with a non-standard IP protocol setting.
40 */
41
42 class TCPJSocketImpl extends SocketImpl implements PropertyChangeListener {
43
44 final static long msltimeout = 10000 ;
45
46 final static int transmissionlimit = 3 ,
47 maxwindowsize = 32767 ,
48 minEphemeralPort = 1024 ,
49 maxEphemeralPort = 5000 ,
50 protocol = SocketUtils.getProtocol();
51
52 final static boolean debug = false ,
53 closeserver = true ,
54 includeipheader = SocketUtils.includeHeader() ,
55 implementackdelay = true ,
56 modelpacketloss = false ;
57
58 final static double alpha = 0.9 ,
59 beta = 2.0 ,
60 packetloss = 0.1 ;
61
62 static int nextEphemeralPort = 1024 ;
63
64 DatagramSocket socket ;
65
66 InetAddress localhost ;
67
68 int state ,
69 previousstate ,
70 localseqnum ,
71 destseqnum ,
72 acknowledgedseqnum ,
73 windowsize ,
74 destwindowsize ,
75 readoffset ,
76 readcount ;
77
78 long rto ,
79 rtt ,
80 sendtime ,
81 receivetime ;
82
83 byte[] readbuffer ;
84
85 TCPAcknowledger acknowledger ;
86
87 TCPMSLTimer msltimer ;
88
89 Random random ;
90
91 /**
92 * Default constructor
93 * @throws SocketException if the underlying <code>DatagramSocket</code> object cannot be opened
94 */
95
96 public TCPJSocketImpl() throws SocketException {
97
98 resetSocket();
99
100 socket = new DatagramSocket();
101
102 acknowledger = new TCPAcknowledger( this , 200 );
103
104 msltimer = new TCPMSLTimer( this , msltimeout );
105
106 TCPJListener.getInstance().addPropertyChangeListener( this );
107
108 if( modelpacketloss ){
109 random = new Random();
110 }
111 }
112
113 /**
114 Sends a TCP message to the destination but doesn't await acknowledgement.
115 Checks whether the receiver is ready to receive.
116 */
117
118 void send( int flags , TCPOptions options , byte[] buffer , int offset , int count , boolean retransmission ) throws IOException {
119
120 int sendsize ;
121
122 if( ( flags & TCP.PSH ) > 0 ){
123 sendsize = localseqnum - acknowledgedseqnum ;
124 while( count > 0 ){
125 while( sendsize >= destwindowsize ){
126 try {
127 wait(); // Persist timer
128 } catch( InterruptedException ie ) {
129 }
130 }
131 sendsize = Math.min( count , acknowledgedseqnum + destwindowsize - localseqnum );
132 transmit( flags , options , buffer , offset , sendsize , retransmission );
133 offset += sendsize ;
134 count -= sendsize ;
135 }
136 } else {
137 transmit( flags , options , buffer , offset , count , retransmission );
138 }
139 }
140
141 /**
142 Updates the Retransmission Timeout Value (RTO).
143 */
144
145 void updateRTO(){
146
147 if( sendtime > 0 && receivetime > 0 ){
148 rtt = (long)( alpha * rtt + ( 1 - alpha )*( receivetime - sendtime ) );
149 rto = (long)( beta * rtt );
150 sendtime = 0 ;
151 receivetime = 0 ;
152
153 if( debug ){
154 System.err.println("Updated rto: " + rto );
155 }
156 }
157 }
158
159 /**
160 Sends a message and awaits acknowledgement. Updates the estimate
161 of the connection round-trip time.
162 */
163
164 synchronized void sendAndAwaitACK( int flags , TCPOptions options , byte[] buffer , int offset , int count ) throws IOException {
165
166 int sendsize = 1 ,
167 counter = 1 ;
168
169 long delay = rto ;
170
171 send( flags , options , buffer , offset , count , false );
172
173 try {
174 wait( delay );
175 delay *= 2 ;
176 sendsize = acknowledgedseqnum == 0 ? 0 : localseqnum - acknowledgedseqnum ;
177 while( ( acknowledgedseqnum == 0 || sendsize > 0 ) && counter ++ < transmissionlimit ){
178 send( flags , options , buffer , Math.max( offset - sendsize , 0 ) , Math.min( sendsize , buffer.length ) , true );
179 wait( delay );
180 delay *= 2 ;
181 sendsize = acknowledgedseqnum == 0 ? 0 : localseqnum - acknowledgedseqnum ;
182 }
183 } catch ( Exception e ) {
184 System.err.println("Exception: " + e.getMessage() );
185 }
186
187 if( sendsize > 0 ){
188 throw new IOException("Connection reset");
189 }
190
191 updateRTO();
192 }
193
194 /**
195 Sends a message with no data but doesn't await acknowledgement.
196 */
197
198 void send( int flags ) throws IOException {
199 send( flags , new TCPOptions() , new byte[0] , 0 , 0 , false );
200 }
201
202 /**
203 Sends a message with no data and awaits acknowledgement.
204 */
205
206 void sendAndAwaitACK( int flags ) throws IOException {
207 sendAndAwaitACK( flags , new TCPOptions() , new byte[0] , 0 , 0 );
208 }
209
210 /**
211 Sends a message with no data but some options and awaits acknowledgement.
212 */
213
214 void sendAndAwaitACK( int flags , TCPOptions options ) throws IOException {
215 sendAndAwaitACK( flags , options , new byte[0] , 0 , 0 );
216 }
217
218 /**
219 Transmits a TCP message to the destination. No check is made beforehand
220 to see whether the receiver is ready to receive.
221 */
222
223 void transmit( int flags , TCPOptions options , byte[] buffer , int offset , int count , boolean retransmit ) throws IOException {
224
225 if( localhost == null ){
226 throw new IOException("Local address not yet set");
227 } else if( address == null ){
228 throw new IOException("Destination address not yet set");
229 } else if( port == 0 ){
230 throw new IOException("Destination port not yet set");
231 }
232
233 if( localport == 0 ){
234 localport = nextEphemeralPort ++ ;
235 if( nextEphemeralPort == maxEphemeralPort ){
236 nextEphemeralPort = minEphemeralPort ;
237 }
238 }
239
240 if( acknowledger.isAlive() ){
241 acknowledger.interrupt();
242 flags |= TCP.ACK ;
243 } else if( destseqnum != 0 ) {
244 flags |= TCP.ACK ;
245 }
246
247 if( retransmit ){
248 if( ( flags & TCP.SYN ) > 0 || ( flags & TCP.FIN ) > 0 ){
249 -- localseqnum ;
250 } else if( ( flags & TCP.PSH ) > 0 ) {
251 localseqnum -= count ;
252 }
253 sendtime = 0 ;
254 } else {
255 sendtime = new Date().getTime();
256 }
257
258 byte[] sendbuffer = TCPWriter.write( localhost.getAddress() ,
259 (short) localport ,
260 address.getAddress() ,
261 (short) port ,
262 localseqnum ,
263 destseqnum ,
264 ( flags & TCP.ACK ) > 0 ,
265 ( flags & TCP.RST ) > 0 ,
266 ( flags & TCP.SYN ) > 0 ,
267 ( flags & TCP.FIN ) > 0 ,
268 ( flags & TCP.PSH ) > 0 ,
269 (short) windowsize ,
270 options ,
271 buffer ,
272 offset ,
273 count );
274
275 if( includeipheader ){
276
277 sendbuffer = IP4Writer.write( IP4.TOS_COMMAND ,
278 (short) 255 ,
279 (byte) protocol ,
280 localhost.getAddress() ,
281 address.getAddress() ,
282 sendbuffer );
283 }
284
285 if( debug ){
286 System.err.println("SEND:");
287 SocketUtils.dump( System.err , sendbuffer , 0 , sendbuffer.length );
288 }
289
290 if( ( flags & TCP.SYN ) > 0 || ( flags & TCP.FIN ) > 0 ){
291 ++ localseqnum ;
292 } else if( ( flags & TCP.PSH ) > 0 ) {
293 localseqnum += count ;
294 }
295
296 socket.send( new DatagramPacket( sendbuffer , sendbuffer.length , address , port ) );
297 }
298
299 /**
300 * Acknowledges a received TCP message.
301 */
302
303 public void acknowledge() throws IOException {
304
305 if( ! implementackdelay ){
306 send( TCP.ACK );
307 } else if( acknowledger.isAlive() ) {
308 acknowledger.interrupt();
309 send( TCP.ACK );
310 } else {
311 acknowledger.start();
312 }
313 }
314
315 /**
316 * Resets a socket to its default values.
317 */
318
319 void resetSocket(){
320
321 address = null ;
322 port = 0 ;
323
324 try {
325 localport = 0 ;
326 if( localhost == null ){
327 localhost = InetAddress.getLocalHost();
328 }
329 } catch ( UnknownHostException uhe ) {
330 localhost = null ;
331 }
332
333 state = previousstate = TCP.CLOSED ;
334 localseqnum = ISNCounter.getCounter();
335 destseqnum = 0 ;
336 acknowledgedseqnum = 0 ;
337 windowsize = maxwindowsize ;
338 destwindowsize = 0 ;
339
340 readbuffer = new byte[ maxwindowsize ];
341 readoffset = 0 ;
342 readcount = 0 ;
343
344 rtt = 1000 ;
345 rto = (long)( beta * rtt );
346 sendtime = 0 ;
347 receivetime = 0 ;
348 }
349
350 /**
351 * Handles an received TCPMessage
352 * @throws IOException message is illegal
353 */
354
355 synchronized void receive( TCPMessage message ) throws IOException {
356
357 if( modelpacketloss && random.nextFloat() < packetloss ){
358 return ;
359 }
360
361 receivetime = new Date().getTime();
362
363 if( message.ack ){
364 acknowledgedseqnum = message.acknowledgementnumber ;
365 }
366
367 if( message.syn || message.fin ){
368 destseqnum = message.sequencenumber + 1 ;
369 } else if( message.psh ) {
370 destseqnum = message.sequencenumber + message.data.length ;
371 }
372
373 destwindowsize = message.windowsize ;
374
375 switch( state ){
376
377 case TCP.CLOSED:
378 break;
379 case TCP.LISTEN:
380 if( message.syn ){
381 send( TCP.SYN | TCP.ACK );
382 previousstate = state ;
383 state = TCP.SYN_RCVD ;
384 notifyAll();
385 }
386 return ;
387 case TCP.SYN_RCVD:
388 if( message.ack ){
389 previousstate = state ;
390 state = TCP.ESTABLISHED ;
391 notifyAll();
392 return;
393 } else if( message.rst && previousstate == TCP.LISTEN ) {
394 previousstate = state ;
395 state = TCP.LISTEN ;
396 notifyAll();
397 return ;
398 }
399 break;
400 case TCP.SYN_SENT:
401 if( message.syn && message.ack ){
402 send( TCP.ACK );
403 previousstate = state ;
404 state = TCP.ESTABLISHED ;
405 notifyAll();
406 return ;
407 } else if( message.syn ){
408 send( TCP.SYN | TCP.ACK );
409 previousstate = state ;
410 state = TCP.SYN_RCVD ;
411 notifyAll();
412 return;
413 }
414 break;
415 case TCP.ESTABLISHED:
416 if( message.fin ){
417 if( closeserver ){
418 send( TCP.FIN | TCP.ACK );
419 previousstate = state ;
420 state = TCP.LAST_ACK ;
421 notifyAll();
422 return;
423 } else {
424 send( TCP.ACK );
425 previousstate = state ;
426 state = TCP.CLOSE_WAIT ;
427 notifyAll();
428 return ;
429 }
430 } else if( message.psh ){
431 {
432 int i = - 1 ;
433 while( ++ i < message.data.length ){
434 readbuffer[( readoffset + readcount + i ) % maxwindowsize ] = message.data[ i ];
435 }
436 }
437 readcount += message.data.length ;
438 windowsize -= message.data.length ;
439 acknowledge();
440 notifyAll();
441 return ;
442 } else if( message.ack ){
443 notifyAll();
444 return ;
445 }
446 break;
447 case TCP.CLOSE_WAIT :
448 return;
449 case TCP.FIN_WAIT_1 :
450 if( message.fin && message.ack ){
451 send( TCP.ACK );
452 timeWait();
453 return ;
454 } else if( message.fin ){
455 send( TCP.ACK );
456 previousstate = state ;
457 state = TCP.CLOSING ;
458 notifyAll();
459 return ;
460 } else if( message.ack ){
461 previousstate = state ;
462 state = TCP.FIN_WAIT_2 ;
463 notifyAll();
464 return ;
465 }
466 break;
467 case TCP.CLOSING :
468 if( message.ack ){
469 timeWait();
470 return;
471 }
472 break;
473 case TCP.LAST_ACK :
474 if( message.ack ){
475 resetSocket();
476 notifyAll();
477 return;
478 }
479 break;
480 case TCP.FIN_WAIT_2 :
481 if( message.fin ){
482 send( TCP.ACK );
483 timeWait();
484 return ;
485 }
486 break;
487 case TCP.TIME_WAIT :
488 if( message.fin ){
489 send( TCP.ACK );
490 notifyAll();
491 return ;
492 }
493 break;
494 }
495
496 if( ! message.rst ){
497 System.err.println("RST from state: " + state );
498 send( TCP.RST );
499 }
500
501 resetSocket();
502
503 throw new IOException("Connection reset by peer");
504 }
505
506 void activeOpen( InetAddress address , short port ) throws IOException {
507
508 if( state != TCP.CLOSED ){
509 throw new IOException("Active Open only permitted from CLOSED state");
510 }
511
512 this.address = address ;
513 this.port = port ;
514
515 localseqnum = ISNCounter.getCounter();
516 destseqnum = 0 ;
517
518 TCPOptions options = new TCPOptions();
519
520 options.setMaxSegmentSize( (short) 1440 );
521
522 state = TCP.SYN_SENT ;
523 }
524
525 void passiveOpen() throws IOException {
526
527 if( state != TCP.CLOSED ){
528 throw new IOException("Passive Open only permitted from CLOSED state");
529 }
530
531 state = TCP.LISTEN ;
532 }
533
534 synchronized void tcpjClose() throws IOException {
535
536 switch( state ){
537
538 case TCP.ESTABLISHED:
539 case TCP.SYN_RCVD:
540 state = TCP.FIN_WAIT_1 ;
541 sendAndAwaitACK( TCP.FIN );
542 break;
543 case TCP.SYN_SENT:
544 resetSocket();
545 break;
546 case TCP.CLOSE_WAIT:
547 state = TCP.LAST_ACK ;
548 sendAndAwaitACK( TCP.FIN );
549 break;
550 default:
551 // Will reach here if an exception has been thrown during socket set-up
552 }
553
554 while( state != TCP.CLOSED ){
555 try {
556 wait();
557 } catch( InterruptedException ie ){
558 }
559 }
560
561 msltimer.interrupt();
562
563 if( debug ){
564 System.err.println("Connection closed");
565 }
566
567 resetSocket();
568 }
569
570 synchronized void setState( int state ){
571 this.state = state ;
572 notifyAll();
573 }
574
575 synchronized void timeWait() throws IOException {
576
577 state = TCP.TIME_WAIT ;
578
579 msltimer.start();
580 }
581
582 /**
583 * Writes a buffer and awaits acknowledgement.
584 */
585
586 public synchronized void write( byte[] buffer , int offset , int count ) throws IOException {
587
588 if( state != TCP.ESTABLISHED ){
589 throw new IOException("Write not possible from current state");
590 }
591
592 sendAndAwaitACK( TCP.PSH , new TCPOptions() , buffer , offset , count );
593 }
594
595 /**
596 * Reads a buffer.
597 */
598
599 public synchronized int read( byte[] buffer , int offset , int count ) throws IOException {
600
601 if( state != TCP.ESTABLISHED ){
602 return 0 ;
603 }
604
605 final int initialcount = count ,
606 initialoffset = offset ;
607
608 while( count > 0 ){
609 while( readcount == 0 ){
610 try {
611 wait();
612 } catch( InterruptedException ie ) {
613 }
614 }
615 windowsize += Math.min( count , readcount );
616 while( count > 0 && readcount > 0 ){
617 buffer[ offset ] = readbuffer[ readoffset % maxwindowsize ];
618 ++ offset ;
619 ++ readoffset ;
620 -- readcount ;
621 -- count ;
622 }
623 }
624
625 return initialcount ;
626 }
627
628 /**
629 * Called by <code>TCPJListener</code> if a message is received.
630 * @see TCPJListener
631 */
632
633 public void propertyChange( PropertyChangeEvent evt ){
634
635 IP4Message ipmessage = (IP4Message) evt.getOldValue();
636
637 if( address instanceof InetAddress ){
638
639 byte[] destinationaddress = address.getAddress();
640
641 if( destinationaddress[ 0 ] != ipmessage.source[ 0 ] ||
642 destinationaddress[ 1 ] != ipmessage.source[ 1 ] ||
643 destinationaddress[ 2 ] != ipmessage.source[ 2 ] ||
644 destinationaddress[ 3 ] != ipmessage.source[ 3 ] ){
645 return ;
646 }
647 }
648
649 if( localhost instanceof InetAddress ){
650
651 byte[] localaddress = localhost.getAddress();
652
653 if( localaddress[ 0 ] != ipmessage.destination[ 0 ] ||
654 localaddress[ 1 ] != ipmessage.destination[ 1 ] ||
655 localaddress[ 2 ] != ipmessage.destination[ 2 ] ||
656 localaddress[ 3 ] != ipmessage.destination[ 3 ] ){
657 return ;
658 }
659 }
660
661 TCPMessage tcpmessage = (TCPMessage) evt.getNewValue();
662
663 if( port != 0 && port != tcpmessage.sourceport || localport != tcpmessage.destinationport ){
664 return ;
665 }
666
667 if( debug ){
668 System.err.println("RECEIVE:");
669 SocketUtils.dump( System.err , ipmessage.data , 0 , ipmessage.data.length );
670 }
671
672 try {
673 if( address == null ){
674 address = InetAddress.getByName( ipmessage.source[ 0 ] + "." +
675 ipmessage.source[ 1 ] + "." +
676 ipmessage.source[ 2 ] + "." +
677 ipmessage.source[ 3 ] );
678 }
679 if( port == 0 ){
680 port = tcpmessage.sourceport ;
681 }
682 receive( tcpmessage );
683 } catch( Exception e ) {
684 }
685 }
686
687 /**
688 * Creates a new TCPJ socket.
689 */
690
691 public void create( boolean stream ) throws IOException {
692
693 if( ! stream ){
694 throw new IOException("TCP/J does not support datagram sockets");
695 }
696
697 resetSocket();
698 }
699
700 /**
701 * Binds a socket to the given address and port.
702 */
703
704 public void bind( InetAddress inetAddress , int port ) throws IOException {
705
706 localhost = inetAddress ;
707
708 if( port != (short) port ){
709 throw new IOException("Illegal port number");
710 } else if( port == 0 ) {
711 throw new IOException("Use of port 0 is prohibited");
712 } else if( port >= minEphemeralPort && port <= maxEphemeralPort ) {
713 throw new IOException("Ports " + minEphemeralPort + " to " + maxEphemeralPort + " are reserved for ephemeral use");
714 } else {
715 localport = (short) port ;
716 }
717 }
718
719 /**
720 * Connects to a remote address.
721 */
722
723 public void connect( String hostname , int port ) throws IOException {
724
725 connect( InetAddress.getByName( hostname ) , port );
726 }
727
728 /**
729 * Connects to a socket address.
730 */
731
732 public void connect( SocketAddress address , int timeout ) throws IOException {
733 throw new IOException("SocketAddress not supported");
734 }
735
736 /**
737 * Connects to a remote address.
738 */
739
740 public synchronized void connect( InetAddress dst , int remotePort ) throws IOException {
741
742 if( remotePort != (short) remotePort ){
743 throw new IOException("Illegal port number");
744 }
745
746 TCPOptions options = new TCPOptions();
747
748 options.setMaxSegmentSize( (short) 1440 );
749
750 activeOpen( dst , (short) remotePort );
751
752 sendAndAwaitACK( TCP.SYN , options );
753
754 while( state != TCP.ESTABLISHED ){
755 try {
756 wait();
757 } catch( InterruptedException ie ) {
758 }
759 }
760 }
761
762 /**
763 * Accepts a server connection.
764 */
765
766 public synchronized void accept( SocketImpl newSocket ) {
767
768 while( state != TCP.ESTABLISHED ){
769 try {
770 wait();
771 } catch ( InterruptedException ie ) {
772 }
773 }
774
775 TCPJSocketImpl tcpjsocket = (TCPJSocketImpl) newSocket ;
776
777 tcpjsocket.resetSocket();
778
779 tcpjsocket.localport = localport ;
780 tcpjsocket.port = port ;
781 tcpjsocket.state = state ;
782 tcpjsocket.previousstate = previousstate ;
783 tcpjsocket.localseqnum = localseqnum ;
784 tcpjsocket.destseqnum = destseqnum ;
785 tcpjsocket.acknowledgedseqnum = acknowledgedseqnum ;
786 tcpjsocket.windowsize = windowsize ;
787 tcpjsocket.destwindowsize = destwindowsize ;
788
789 if( address instanceof InetAddress ){
790 try {
791 tcpjsocket.address = InetAddress.getByName( address.getHostName() );
792 } catch( UnknownHostException e ){
793 tcpjsocket.address = null ;
794 }
795 } else {
796 tcpjsocket.address = null ;
797 }
798
799 if( localhost instanceof InetAddress ){
800 try {
801 tcpjsocket.localhost = InetAddress.getByName( localhost.getHostName() );
802 } catch( UnknownHostException e ){
803 tcpjsocket.localhost = null ;
804 }
805 } else {
806 tcpjsocket.localhost = null ;
807 }
808
809 port = 0 ;
810 address = null ;
811 state = TCP.LISTEN ;
812 }
813
814 /**
815 * Closes a connection.
816 */
817
818 public void close() throws IOException {
819
820 tcpjClose();
821 }
822
823 /**
824 * Sets a server to listen for a client connection.
825 * Note that the backlog argument is ignored - currently,
826 * TCPJ cannot stack connection requests.
827 */
828
829 public void listen( int backlog ) throws IOException {
830
831 /*
832 if( backlog != 1 ){
833 throw new IOException("TCP/J only stacks a single connection request");
834 }
835 */
836 passiveOpen();
837 }
838
839 /**
840 * Sets a TCP option.
841 */
842
843 public void setOption( int optID , Object value ) throws SocketException {
844 throw new SocketException("No options supported");
845 }
846
847 /**
848 * Gets a TCP option.
849 */
850
851 public Object getOption( int optID ) throws SocketException {
852 throw new SocketException("No options supported");
853 }
854
855 /**
856 * Returns the number of bytes available to be read without polling.
857 */
858
859 public int available() {
860 return readcount ;
861 }
862
863 /**
864 * Gets an input stream to read.
865 */
866
867 public InputStream getInputStream() throws IOException {
868 return new TCPJInputStream( this );
869 }
870
871 /**
872 * Gets an output stream to write.
873 */
874
875 public OutputStream getOutputStream() throws IOException {
876 return new TCPJOutputStream( this );
877 }
878
879 /**
880 * Urgent data is not supported.
881 */
882
883 public void sendUrgentData(int data) throws IOException {
884 throw new IOException("Urgent data not supported");
885 }
886 }