Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: Freenet/ConnectionHandler.java


1   package Freenet;
2   import Freenet.support.*;
3   import java.net.*;
4   import java.io.*;
5   import java.io.EOFException;
6   /*
7     This code is part of the Java Adaptive Network Client by Ian Clarke. 
8     It is distributed under the GNU General Public Licence (GPL) 
9     version 2.  See http://www.gnu.org/ for further details of the GPL.
10   */
11  
12  /**
13   * Handles both sending and receiving messages on a connection.
14   *
15   * @author <A HREF="mailto:I.Clarke@strs.co.uk">Ian Clarke</A>
16   * @author <a href="mailto:blanu@uts.cc.utexas.edu">Brandon Wiley</a>
17   **/
18  
19  public class ConnectionHandler extends Thread
20  {
21    // Protected/Private Fields
22      private static long ids=0;
23      private long id;
24    private Connection c;
25    private MessageHandler mh;
26    private volatile Long pongs = new Long(0);
27      private boolean closed;
28      private boolean waiting = true;
29  
30      protected class ConnectionCB implements Callback {
31    public void callback() {
32        close();
33    }
34      }
35  
36    // Constructors
37    public ConnectionHandler(Connection c, MessageHandler mh) {
38  
39        this.c = c;
40        this.mh = mh;
41        this.id = ++ids;
42        Logger.log("ConnectionHandler.java#"+ id,"New connectionhandler with " + peer(),Logger.DEBUGGING);     
43    }
44  
45  
46      // Public Methods
47    public void run() {
48    
49      message: do {
50    RawMessage m=null;
51    Message msg = null;
52    try {
53        if(c==null) 
54      break message;
55  
56        while (!closed && c.in.available() == 0) {
57          try {
58              Thread.sleep(50);
59          } catch (InterruptedException e) {
60          }
61        }
62        if (closed) break message;
63  
64        m = new RawMessage(c.in);
65        closed = !m.isKeepAlive();
66  
67        Logger.log("ConnectionHandler.java#"+ id,"Rawmessage:\n"+m,Logger.MINOR);
68        msg = MessageFactory.toMessage(m);
69  
70    } catch (InvalidMessageException e) {
71        Logger.log("ConnectionHandler.java#" + id,"Invalid message: " + e.toString(),Logger.MINOR);
72        continue message; 
73    } catch (EOFException e) {
74        break message; // this stream is over
75    } catch (IOException e) {
76        break message;
77    }
78  
79    String logstr=m.messageType+" <- "+c.getPeerAddress();
80    if(Logger.verbosity>0) logstr=Long.toHexString(msg.id)+" - "+logstr;
81    Logger.log("ConnectionHandler.java#"+id,logstr,Logger.NORMAL);
82    Logger.log("ConnectionHandler.java#"+id,"Message:\n"+msg,Logger.DEBUG);
83  
84    // set the receivedAt, source, and receivedWith fields
85    Address source = null;
86    try {
87        source = new Address(m.readField("Source"));
88    } catch (Exception e) {}
89  
90    Logger.log("ConnectionHandler.java#"+id,"Got on " + c.getMyAddress() + " from " + (source != null ? c.getPeerAddress(source.listenPart()) : c.getPeerAddress()), Logger.DEBUGGING);
91  
92    msg.initSources(c.getMyAddress(),source == null ? null : c.getPeerAddress(source.listenPart()), m.isKeepAlive() ? this : null);
93  
94    // Handle
95    mh.handle(msg);
96  
97    Logger.log("ConnectionHandler.java#"+id,"Finished with message",Logger.DEBUGGING);
98  
99    if (m.trailingFieldLength() > 0) {
100       Logger.log("ConnectionHandler.java#"+id,"Waiting on trailing field",Logger.DEBUGGING);
101       // wait() lets go of the input so the trailing field can be read by the handling code. The matching notify is (rather deviously) called from the Conduit class. 
102       synchronized(m.trailingFieldStream) {
103     try {
104         m.trailingFieldStream.wait();
105     } catch (InterruptedException e) {}
106     Logger.log("ConnectionHandler.java#"+id,"I got notified!",Logger.DEBUGGING);
107       }
108   }
109     } while (!closed);
110 
111     Logger.log("ConnectionHandler.java#"+id,"Finished with connection - closing",Logger.DEBUGGING);
112     synchronized(c.out) {
113   Logger.log("ConnectionHandler.java#"+id,"Finished with connection - closing",Logger.DEBUGGING);
114   c.close();
115     }
116     }
117 
118      /**
119      * Sends a message using this connection
120      * @param m            the message to send
121      */   
122     public void sendMessage(Message m) throws SendFailedException {
123   sendMessage(m, null, null);
124     }
125 
126     /**
127      * Sends a message using this connection
128      * @param m            the message to send
129      * @param datatunnel   a stream to tunnel the data through (presumably so it can be copied to a file as well)
130      * @param ByteCounter  a way to count how much data is sent
131      */
132 
133     public void sendMessage(Message m, SplitOutputStream datatunnel, ByteCounter count) throws SendFailedException {
134   Logger.log("ConnectionHandler.java#"+id,"Sending a message",Logger.DEBUGGING);
135   RawMessage raw=m.toRawMessage();
136         String logstr=raw.messageType+" -> "+c.getPeerAddress();
137         if(Logger.verbosity>0) logstr=Long.toHexString(m.id)+" - "+logstr;
138   Logger.log("ConnectionHandler.java#"+id,logstr,Logger.NORMAL);
139   synchronized(c.out) {
140       try {
141     raw.writeMessage(c.out);
142       } catch (IOException e) {
143     throw new SendFailedException(c.getPeerAddress());
144       }
145       InputStream data = raw.getTrailing();
146       if (data != null) {
147     ConnectionCB ccb;
148     if (!raw.isKeepAlive())
149         ccb = new ConnectionCB();
150     else
151         ccb = null;
152     
153     if (datatunnel!=null) {
154         datatunnel.addOutput(c.out,ccb);
155         (new Conduit(data, datatunnel, count)).asyncFeed(datatunnel,null,raw.trailingFieldLength()); // the null should be a callback that is called when it gets interupted before finnishing the data - at least now nothing happens, so the data will never get finalized (see Data.java and DataCallback)
156     } else {
157         (new Conduit(data, c.out, count)).asyncFeed(ccb, null, raw.trailingFieldLength());
158     }
159       } else if (!raw.isKeepAlive()) {
160     this.close();
161       }
162   } // the conduit is synchronized on the output stream too, so it should not start until now - there is a small danger here that another sendMessage would win the race instead of the conduit and start instead...
163     }
164     
165     //I'm letting this force close now, maybe it should synchronize somehow (it can't sync to c.in anyways, since that is always waiting on a read in the RawMessage constructor).
166     public void close() {
167   closed = true;
168   this.interrupt();
169     }
170 
171     /**
172      * forces this connection to close. Bad things will happen
173      * if you use this carelessly.
174      **/
175     public void forceClose() {
176   closed = true;
177   c.close();
178     }
179 
180     /**
181      * Checks whether the connection is alive
182      **/
183 
184     public boolean isOpen() {
185   return isAlive() && !closed;
186     }
187 
188     public Address peer() {
189   return c.getPeerAddress();
190     }
191 
192     public Address peer(ListeningAddress laddr) {
193   return c.getPeerAddress(laddr);
194     }
195 
196     public Address local() {
197   return c.getMyAddress();
198     }
199 
200     public Address local(ListeningAddress laddr) {
201   return c.getMyAddress(laddr);
202     }
203 }