Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: edu/emory/mathcs/util/net/inproc/InProcServerSocket.java


1   /* ***** BEGIN LICENSE BLOCK *****
2    * Version: MPL 1.1/GPL 2.0/LGPL 2.1
3    *
4    * The contents of this file are subject to the Mozilla Public License Version
5    * 1.1 (the "License"); you may not use this file except in compliance with
6    * the License. You may obtain a copy of the License at
7    * http://www.mozilla.org/MPL/
8    *
9    * Software distributed under the License is distributed on an "AS IS" basis,
10   * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
11   * for the specific language governing rights and limitations under the
12   * License.
13   *
14   * The Original Code is the Emory Utilities.
15   *
16   * The Initial Developer of the Original Code is
17   * The Distributed Computing Laboratory, Emory University.
18   * Portions created by the Initial Developer are Copyright (C) 2002
19   * the Initial Developer. All Rights Reserved.
20   *
21   * Alternatively, the contents of this file may be used under the terms of
22   * either the GNU General Public License Version 2 or later (the "GPL"), or
23   * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
24   * in which case the provisions of the GPL or the LGPL are applicable instead
25   * of those above. If you wish to allow use of your version of this file only
26   * under the terms of either the GPL or the LGPL, and not to allow others to
27   * use your version of this file under the terms of the MPL, indicate your
28   * decision by deleting the provisions above and replace them with the notice
29   * and other provisions required by the GPL or the LGPL. If you do not delete
30   * the provisions above, a recipient may use your version of this file under
31   * the terms of any one of the MPL, the GPL or the LGPL.
32   *
33   * ***** END LICENSE BLOCK ***** */
34  
35  package edu.emory.mathcs.util.net.inproc;
36  
37  import java.io.*;
38  import java.net.*;
39  import java.util.*;
40  
41  import edu.emory.mathcs.util.concurrent.*;
42  import edu.emory.mathcs.util.io.*;
43  
44  /**
45   * Abstraction of a server socket which can be accessed only from within a
46   * process. While this
47   * class fully adheres to the socket API, it is a server socket that accepts
48   * connections only from appropriate {@link InProcSocket "sockets"} within the
49   * same process. The class can be used to create local in-process bindings
50   * within APIs that assume remote access. For instance, when used as an RMI
51   * transport, in-process sockets can interconnect local objects while
52   * maintaining remote invocation semantics (pass-by-value etc.) yet avoiding
53   * security risks associated with network sockets and offering a bit better
54   * performance than a loopback network interface.
55   *
56   * @see InProcSocket
57   *
58   * @author Dawid Kurzyniec
59   * @version 1.0
60   */
61  public abstract class InProcServerSocket extends ServerSocket {
62  
63      final static Map bindings = new HashMap();
64      final static Random random = new Random();
65  
66      boolean bound;
67      volatile boolean closed;
68      int localPort;
69      int soTimeout = 0;
70  
71      BlockingQueue connectionQueue;
72  
73      public InProcServerSocket() throws IOException {
74          super();
75      }
76  
77      public InProcServerSocket(int port) throws IOException {
78          this(port, 50);
79      }
80  
81      public InProcServerSocket(int port, int backlog) throws IOException {
82          super();
83          bind(new InProcSocketAddress(port), backlog);
84      }
85  
86      public synchronized void bind(SocketAddress endpoint, int backlog) throws IOException {
87          ensureNotClosed();
88          if (isBound()) throw new SocketException("Already bound");
89          if (endpoint == null) endpoint = new InProcSocketAddress(0);
90          if (backlog <= 0) throw new IllegalArgumentException("Backlog must be positive");
91          if (!(endpoint instanceof InProcSocketAddress))
92              throw new IllegalArgumentException("Unsupported address type");
93          int port = ((InProcSocketAddress)endpoint).port;
94          try {
95              /** @todo security (checkListen) check? */
96              synchronized (bindings) {
97                  if (port != 0 && bindings.containsKey(new Integer(port))) {
98                      throw new BindException("InProc port " + port + " already in use");
99                  }
100                 else {
101                     boolean ok = false;
102                     for (int i=0; i<100; i++) {
103                         port = random.nextInt() & 0x7FFFFFFF;
104                         // try to fit into the 2-byte range
105                         if (i<4) port = port & 0x3FFF;
106                         else if (i<8) port = port & 0xFFFF;
107                         if (port > 1024 && !bindings.containsKey(new Integer(port))) {
108                             ok = true;
109                             break;
110                         }
111                     }
112                     if (!ok) {
113                         throw new BindException("InProc: Could not find available port to " +
114                                                 "listen on");
115                     }
116                 }
117 
118                 // at this point, we have established an available port number
119                 bindings.put(new Integer(port), this);
120             }
121             localPort = port;
122             bound = true;
123             connectionQueue = new DynamicArrayBlockingQueue(4, backlog);
124         }
125         catch (SecurityException e) {
126             bound = false;
127             throw e;
128         }
129         catch (IOException e) {
130             bound = false;
131             throw e;
132         }
133     }
134 
135     public boolean isClosed() {
136         return closed;
137     }
138 
139     public boolean isBound() {
140         return bound;
141     }
142 
143     public InetAddress getInetAddress() {
144         // not an Inet socket, but for security managers it is better to report
145         // that it is a "local" socket
146         return InProcSocket.inprocInetAddr;
147     }
148 
149     public int getLocalPort() {
150         if (!isBound()) return -1;
151         return localPort;
152     }
153 
154     public SocketAddress getLocalSocketAddress() {
155         if (!isBound()) return null;
156         return new InProcSocketAddress(getLocalPort());
157     }
158 //
159 //    public Socket accept() throws IOException {
160 //        Socket socket = delegate.accept();
161 //        return wrapAcceptedSocket(socket);
162 //    }
163 //
164 
165     public Socket accept() throws IOException {
166         ensureNotClosed();
167         if (!isBound()) throw new SocketException("Socket is not bound yet");
168         int soTimeout = this.soTimeout;
169         ConnReq req;
170         InProcSocket.Channel ch;
171         do {
172             try {
173                 if (soTimeout > 0) {
174                     req = (ConnReq)connectionQueue.poll(soTimeout, TimeUnit.MILLISECONDS);
175                 }
176                 else {
177                     req = (ConnReq)connectionQueue.take();
178                 }
179             }
180             catch (InterruptedException e) {
181                 throw new InterruptedIOException(e.toString());
182             }
183             if (req == null) {
184                 throw new SocketTimeoutException("Timeout on InProcServerSocket.accept");
185             }
186             if (req == TERMINATOR) {
187                 throw new SocketException("Socket closed");
188             }
189             ch = req.accept();
190         }
191         while (ch == null);
192         return new InProcSocket(ch, localPort);
193     }
194 
195 
196     private static final ConnReq TERMINATOR = new ConnReq();
197 
198     public synchronized void close() throws IOException {
199         closed = true;
200         while (true) {
201             ConnReq connReq = (ConnReq)connectionQueue.poll();
202             if (connReq == null) break;
203             connReq.refuse();
204         }
205         // abort possibly blocked accept
206         try {
207             connectionQueue.put(TERMINATOR);
208         }
209         catch (InterruptedException e) {
210             throw new RuntimeException("FATAL: Blocked when putting into empty queue");
211         }
212         if (isBound()) {
213             synchronized (bindings) {
214                 bindings.remove(new Integer(localPort));
215             }
216         }
217     }
218 
219 //    public ServerSocketChannel getChannel() {
220 //        return null;
221 //    }
222 //
223     public void setSoTimeout(int timeout) throws SocketException {
224         if (timeout < 0) throw new IllegalArgumentException("Timeout must be non-negative");
225         ensureNotClosed();
226         soTimeout = timeout;
227     }
228 
229     public int getSoTimeout() throws IOException {
230         ensureNotClosed();
231         return soTimeout;
232     }
233 
234     public void setReuseAddress(boolean on) throws SocketException {
235         ensureNotClosed();
236         // no op
237     }
238 
239     public boolean getReuseAddress() throws SocketException {
240         ensureNotClosed();
241         return true;
242     }
243 
244     public String toString() {
245         if (!isBound()) return "InProcServerSocket[unbound]";
246         return "InProcServerSocket[port=" + localPort + "]";
247     }
248 
249 //    public void setReceiveBufferSize (int size) throws SocketException {
250 //        delegate.setReceiveBufferSize(size);
251 //    }
252 //
253 //    public int getReceiveBufferSize() throws SocketException{
254 //        return delegate.getReceiveBufferSize();
255 //    }
256 //
257 
258     private void ensureNotClosed() throws SocketException {
259         if (isClosed()) throw new SocketException("Socket is closed");
260     }
261 
262     static InProcSocket.Channel connect(int port, int timeout) throws IOException {
263         InProcServerSocket srvsock;
264         synchronized (bindings) {
265             srvsock = (InProcServerSocket)bindings.get(new Integer(port));
266         }
267         if (srvsock == null) {
268             throw new IOException("Connection refused (server not listening)");
269         }
270         boolean success;
271         ConnReq connreq = new ConnReq();
272         synchronized (srvsock) {
273             if (srvsock.isClosed()) throw new IOException("Connection refused (server closed)");
274             success = srvsock.connectionQueue.offer(connreq);
275         }
276         if (!success) {
277             throw new IOException("Connection refused (queue full, try again later)");
278         }
279         return connreq.awaitOrCancel(timeout);
280     }
281 
282     private static class ConnReq {
283         boolean accepted  = false;
284         boolean cancelled = false;
285         boolean refused = false;
286         InProcSocket.Channel srvChannel;
287         InProcSocket.Channel cliChannel;
288 
289         ConnReq() {}
290         public synchronized boolean cancel() {
291             if (accepted || refused) return false;
292             cancelled = true;
293             notifyAll();
294             return true;
295         }
296         public synchronized InProcSocket.Channel accept() {
297             if (cancelled) return null;
298             if (accepted || refused) throw new IllegalStateException("Already responsed");
299             accepted = true;
300             /** @todo bufsize */
301             BufferedPipe upstream = new BufferedPipe();
302             BufferedPipe downstream = new BufferedPipe();
303             srvChannel = new InProcSocket.Channel((TimedInput)upstream.sink(),
304                                                  downstream.source());
305             cliChannel = new InProcSocket.Channel((TimedInput)downstream.sink(),
306                                                  upstream.source());
307             notifyAll();
308             return srvChannel;
309         }
310         public synchronized void refuse() {
311             if (cancelled) return;
312             if (accepted || refused) throw new IllegalStateException("Already responsed");
313             refused = true;
314             notifyAll();
315         }
316         public synchronized InProcSocket.Channel awaitOrCancel(int timeout) throws IOException {
317             try {
318                 if (timeout == 0) {
319                     while (!accepted && !cancelled && !refused) {
320                         wait();
321                     }
322                 }
323                 else {
324                     long endtime = timeout + System.currentTimeMillis();
325                     while (!accepted && !cancelled && !refused) {
326                         long todo = endtime - System.currentTimeMillis();
327                         if (todo > 0) {
328                             wait(todo);
329                         }
330                         else {
331                             cancel();
332                             break;
333                         }
334                     }
335                 }
336 
337                 if (cancelled) throw new SocketTimeoutException("Connection timed out");
338                 if (refused) throw new IOException("Connection refused (server closing)");
339                 // otherwise must be accepted
340                 return cliChannel;
341             }
342             catch (InterruptedException e) {
343                 if (accepted) return cliChannel;
344                 cancel();
345                 throw new InterruptedIOException("Connect interrupted");
346             }
347         }
348     }
349 }
350