Source code: Freenet/ConnectionHandler.java
1 package Freenet;
2 import Freenet.support.*;
3 import java.net.*;
4 import java.io.*;
5 import java.io.EOFException;
6 /*
7 This code is part of the Java Adaptive Network Client by Ian Clarke.
8 It is distributed under the GNU General Public Licence (GPL)
9 version 2. See http://www.gnu.org/ for further details of the GPL.
10 */
11
12 /**
13 * Handles both sending and receiving messages on a connection.
14 *
15 * @author <A HREF="mailto:I.Clarke@strs.co.uk">Ian Clarke</A>
16 * @author <a href="mailto:blanu@uts.cc.utexas.edu">Brandon Wiley</a>
17 **/
18
19 public class ConnectionHandler extends Thread
20 {
21 // Protected/Private Fields
22 private static long ids=0;
23 private long id;
24 private Connection c;
25 private MessageHandler mh;
26 private volatile Long pongs = new Long(0);
27 private boolean closed;
28 private boolean waiting = true;
29
30 protected class ConnectionCB implements Callback {
31 public void callback() {
32 close();
33 }
34 }
35
36 // Constructors
37 public ConnectionHandler(Connection c, MessageHandler mh) {
38
39 this.c = c;
40 this.mh = mh;
41 this.id = ++ids;
42 Logger.log("ConnectionHandler.java#"+ id,"New connectionhandler with " + peer(),Logger.DEBUGGING);
43 }
44
45
46 // Public Methods
47 public void run() {
48
49 message: do {
50 RawMessage m=null;
51 Message msg = null;
52 try {
53 if(c==null)
54 break message;
55
56 while (!closed && c.in.available() == 0) {
57 try {
58 Thread.sleep(50);
59 } catch (InterruptedException e) {
60 }
61 }
62 if (closed) break message;
63
64 m = new RawMessage(c.in);
65 closed = !m.isKeepAlive();
66
67 Logger.log("ConnectionHandler.java#"+ id,"Rawmessage:\n"+m,Logger.MINOR);
68 msg = MessageFactory.toMessage(m);
69
70 } catch (InvalidMessageException e) {
71 Logger.log("ConnectionHandler.java#" + id,"Invalid message: " + e.toString(),Logger.MINOR);
72 continue message;
73 } catch (EOFException e) {
74 break message; // this stream is over
75 } catch (IOException e) {
76 break message;
77 }
78
79 String logstr=m.messageType+" <- "+c.getPeerAddress();
80 if(Logger.verbosity>0) logstr=Long.toHexString(msg.id)+" - "+logstr;
81 Logger.log("ConnectionHandler.java#"+id,logstr,Logger.NORMAL);
82 Logger.log("ConnectionHandler.java#"+id,"Message:\n"+msg,Logger.DEBUG);
83
84 // set the receivedAt, source, and receivedWith fields
85 Address source = null;
86 try {
87 source = new Address(m.readField("Source"));
88 } catch (Exception e) {}
89
90 Logger.log("ConnectionHandler.java#"+id,"Got on " + c.getMyAddress() + " from " + (source != null ? c.getPeerAddress(source.listenPart()) : c.getPeerAddress()), Logger.DEBUGGING);
91
92 msg.initSources(c.getMyAddress(),source == null ? null : c.getPeerAddress(source.listenPart()), m.isKeepAlive() ? this : null);
93
94 // Handle
95 mh.handle(msg);
96
97 Logger.log("ConnectionHandler.java#"+id,"Finished with message",Logger.DEBUGGING);
98
99 if (m.trailingFieldLength() > 0) {
100 Logger.log("ConnectionHandler.java#"+id,"Waiting on trailing field",Logger.DEBUGGING);
101 // wait() lets go of the input so the trailing field can be read by the handling code. The matching notify is (rather deviously) called from the Conduit class.
102 synchronized(m.trailingFieldStream) {
103 try {
104 m.trailingFieldStream.wait();
105 } catch (InterruptedException e) {}
106 Logger.log("ConnectionHandler.java#"+id,"I got notified!",Logger.DEBUGGING);
107 }
108 }
109 } while (!closed);
110
111 Logger.log("ConnectionHandler.java#"+id,"Finished with connection - closing",Logger.DEBUGGING);
112 synchronized(c.out) {
113 Logger.log("ConnectionHandler.java#"+id,"Finished with connection - closing",Logger.DEBUGGING);
114 c.close();
115 }
116 }
117
118 /**
119 * Sends a message using this connection
120 * @param m the message to send
121 */
122 public void sendMessage(Message m) throws SendFailedException {
123 sendMessage(m, null, null);
124 }
125
126 /**
127 * Sends a message using this connection
128 * @param m the message to send
129 * @param datatunnel a stream to tunnel the data through (presumably so it can be copied to a file as well)
130 * @param ByteCounter a way to count how much data is sent
131 */
132
133 public void sendMessage(Message m, SplitOutputStream datatunnel, ByteCounter count) throws SendFailedException {
134 Logger.log("ConnectionHandler.java#"+id,"Sending a message",Logger.DEBUGGING);
135 RawMessage raw=m.toRawMessage();
136 String logstr=raw.messageType+" -> "+c.getPeerAddress();
137 if(Logger.verbosity>0) logstr=Long.toHexString(m.id)+" - "+logstr;
138 Logger.log("ConnectionHandler.java#"+id,logstr,Logger.NORMAL);
139 synchronized(c.out) {
140 try {
141 raw.writeMessage(c.out);
142 } catch (IOException e) {
143 throw new SendFailedException(c.getPeerAddress());
144 }
145 InputStream data = raw.getTrailing();
146 if (data != null) {
147 ConnectionCB ccb;
148 if (!raw.isKeepAlive())
149 ccb = new ConnectionCB();
150 else
151 ccb = null;
152
153 if (datatunnel!=null) {
154 datatunnel.addOutput(c.out,ccb);
155 (new Conduit(data, datatunnel, count)).asyncFeed(datatunnel,null,raw.trailingFieldLength()); // the null should be a callback that is called when it gets interupted before finnishing the data - at least now nothing happens, so the data will never get finalized (see Data.java and DataCallback)
156 } else {
157 (new Conduit(data, c.out, count)).asyncFeed(ccb, null, raw.trailingFieldLength());
158 }
159 } else if (!raw.isKeepAlive()) {
160 this.close();
161 }
162 } // the conduit is synchronized on the output stream too, so it should not start until now - there is a small danger here that another sendMessage would win the race instead of the conduit and start instead...
163 }
164
165 //I'm letting this force close now, maybe it should synchronize somehow (it can't sync to c.in anyways, since that is always waiting on a read in the RawMessage constructor).
166 public void close() {
167 closed = true;
168 this.interrupt();
169 }
170
171 /**
172 * forces this connection to close. Bad things will happen
173 * if you use this carelessly.
174 **/
175 public void forceClose() {
176 closed = true;
177 c.close();
178 }
179
180 /**
181 * Checks whether the connection is alive
182 **/
183
184 public boolean isOpen() {
185 return isAlive() && !closed;
186 }
187
188 public Address peer() {
189 return c.getPeerAddress();
190 }
191
192 public Address peer(ListeningAddress laddr) {
193 return c.getPeerAddress(laddr);
194 }
195
196 public Address local() {
197 return c.getMyAddress();
198 }
199
200 public Address local(ListeningAddress laddr) {
201 return c.getMyAddress(laddr);
202 }
203 }