Source code: edu/emory/mathcs/util/net/inproc/InProcServerSocket.java
1 /* ***** BEGIN LICENSE BLOCK *****
2 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
3 *
4 * The contents of this file are subject to the Mozilla Public License Version
5 * 1.1 (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 * http://www.mozilla.org/MPL/
8 *
9 * Software distributed under the License is distributed on an "AS IS" basis,
10 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
11 * for the specific language governing rights and limitations under the
12 * License.
13 *
14 * The Original Code is the Emory Utilities.
15 *
16 * The Initial Developer of the Original Code is
17 * The Distributed Computing Laboratory, Emory University.
18 * Portions created by the Initial Developer are Copyright (C) 2002
19 * the Initial Developer. All Rights Reserved.
20 *
21 * Alternatively, the contents of this file may be used under the terms of
22 * either the GNU General Public License Version 2 or later (the "GPL"), or
23 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
24 * in which case the provisions of the GPL or the LGPL are applicable instead
25 * of those above. If you wish to allow use of your version of this file only
26 * under the terms of either the GPL or the LGPL, and not to allow others to
27 * use your version of this file under the terms of the MPL, indicate your
28 * decision by deleting the provisions above and replace them with the notice
29 * and other provisions required by the GPL or the LGPL. If you do not delete
30 * the provisions above, a recipient may use your version of this file under
31 * the terms of any one of the MPL, the GPL or the LGPL.
32 *
33 * ***** END LICENSE BLOCK ***** */
34
35 package edu.emory.mathcs.util.net.inproc;
36
37 import java.io.*;
38 import java.net.*;
39 import java.util.*;
40
41 import edu.emory.mathcs.util.concurrent.*;
42 import edu.emory.mathcs.util.io.*;
43
44 /**
45 * Abstraction of a server socket which can be accessed only from within a
46 * process. While this
47 * class fully adheres to the socket API, it is a server socket that accepts
48 * connections only from appropriate {@link InProcSocket "sockets"} within the
49 * same process. The class can be used to create local in-process bindings
50 * within APIs that assume remote access. For instance, when used as an RMI
51 * transport, in-process sockets can interconnect local objects while
52 * maintaining remote invocation semantics (pass-by-value etc.) yet avoiding
53 * security risks associated with network sockets and offering a bit better
54 * performance than a loopback network interface.
55 *
56 * @see InProcSocket
57 *
58 * @author Dawid Kurzyniec
59 * @version 1.0
60 */
61 public abstract class InProcServerSocket extends ServerSocket {
62
63 final static Map bindings = new HashMap();
64 final static Random random = new Random();
65
66 boolean bound;
67 volatile boolean closed;
68 int localPort;
69 int soTimeout = 0;
70
71 BlockingQueue connectionQueue;
72
73 public InProcServerSocket() throws IOException {
74 super();
75 }
76
77 public InProcServerSocket(int port) throws IOException {
78 this(port, 50);
79 }
80
81 public InProcServerSocket(int port, int backlog) throws IOException {
82 super();
83 bind(new InProcSocketAddress(port), backlog);
84 }
85
86 public synchronized void bind(SocketAddress endpoint, int backlog) throws IOException {
87 ensureNotClosed();
88 if (isBound()) throw new SocketException("Already bound");
89 if (endpoint == null) endpoint = new InProcSocketAddress(0);
90 if (backlog <= 0) throw new IllegalArgumentException("Backlog must be positive");
91 if (!(endpoint instanceof InProcSocketAddress))
92 throw new IllegalArgumentException("Unsupported address type");
93 int port = ((InProcSocketAddress)endpoint).port;
94 try {
95 /** @todo security (checkListen) check? */
96 synchronized (bindings) {
97 if (port != 0 && bindings.containsKey(new Integer(port))) {
98 throw new BindException("InProc port " + port + " already in use");
99 }
100 else {
101 boolean ok = false;
102 for (int i=0; i<100; i++) {
103 port = random.nextInt() & 0x7FFFFFFF;
104 // try to fit into the 2-byte range
105 if (i<4) port = port & 0x3FFF;
106 else if (i<8) port = port & 0xFFFF;
107 if (port > 1024 && !bindings.containsKey(new Integer(port))) {
108 ok = true;
109 break;
110 }
111 }
112 if (!ok) {
113 throw new BindException("InProc: Could not find available port to " +
114 "listen on");
115 }
116 }
117
118 // at this point, we have established an available port number
119 bindings.put(new Integer(port), this);
120 }
121 localPort = port;
122 bound = true;
123 connectionQueue = new DynamicArrayBlockingQueue(4, backlog);
124 }
125 catch (SecurityException e) {
126 bound = false;
127 throw e;
128 }
129 catch (IOException e) {
130 bound = false;
131 throw e;
132 }
133 }
134
135 public boolean isClosed() {
136 return closed;
137 }
138
139 public boolean isBound() {
140 return bound;
141 }
142
143 public InetAddress getInetAddress() {
144 // not an Inet socket, but for security managers it is better to report
145 // that it is a "local" socket
146 return InProcSocket.inprocInetAddr;
147 }
148
149 public int getLocalPort() {
150 if (!isBound()) return -1;
151 return localPort;
152 }
153
154 public SocketAddress getLocalSocketAddress() {
155 if (!isBound()) return null;
156 return new InProcSocketAddress(getLocalPort());
157 }
158 //
159 // public Socket accept() throws IOException {
160 // Socket socket = delegate.accept();
161 // return wrapAcceptedSocket(socket);
162 // }
163 //
164
165 public Socket accept() throws IOException {
166 ensureNotClosed();
167 if (!isBound()) throw new SocketException("Socket is not bound yet");
168 int soTimeout = this.soTimeout;
169 ConnReq req;
170 InProcSocket.Channel ch;
171 do {
172 try {
173 if (soTimeout > 0) {
174 req = (ConnReq)connectionQueue.poll(soTimeout, TimeUnit.MILLISECONDS);
175 }
176 else {
177 req = (ConnReq)connectionQueue.take();
178 }
179 }
180 catch (InterruptedException e) {
181 throw new InterruptedIOException(e.toString());
182 }
183 if (req == null) {
184 throw new SocketTimeoutException("Timeout on InProcServerSocket.accept");
185 }
186 if (req == TERMINATOR) {
187 throw new SocketException("Socket closed");
188 }
189 ch = req.accept();
190 }
191 while (ch == null);
192 return new InProcSocket(ch, localPort);
193 }
194
195
196 private static final ConnReq TERMINATOR = new ConnReq();
197
198 public synchronized void close() throws IOException {
199 closed = true;
200 while (true) {
201 ConnReq connReq = (ConnReq)connectionQueue.poll();
202 if (connReq == null) break;
203 connReq.refuse();
204 }
205 // abort possibly blocked accept
206 try {
207 connectionQueue.put(TERMINATOR);
208 }
209 catch (InterruptedException e) {
210 throw new RuntimeException("FATAL: Blocked when putting into empty queue");
211 }
212 if (isBound()) {
213 synchronized (bindings) {
214 bindings.remove(new Integer(localPort));
215 }
216 }
217 }
218
219 // public ServerSocketChannel getChannel() {
220 // return null;
221 // }
222 //
223 public void setSoTimeout(int timeout) throws SocketException {
224 if (timeout < 0) throw new IllegalArgumentException("Timeout must be non-negative");
225 ensureNotClosed();
226 soTimeout = timeout;
227 }
228
229 public int getSoTimeout() throws IOException {
230 ensureNotClosed();
231 return soTimeout;
232 }
233
234 public void setReuseAddress(boolean on) throws SocketException {
235 ensureNotClosed();
236 // no op
237 }
238
239 public boolean getReuseAddress() throws SocketException {
240 ensureNotClosed();
241 return true;
242 }
243
244 public String toString() {
245 if (!isBound()) return "InProcServerSocket[unbound]";
246 return "InProcServerSocket[port=" + localPort + "]";
247 }
248
249 // public void setReceiveBufferSize (int size) throws SocketException {
250 // delegate.setReceiveBufferSize(size);
251 // }
252 //
253 // public int getReceiveBufferSize() throws SocketException{
254 // return delegate.getReceiveBufferSize();
255 // }
256 //
257
258 private void ensureNotClosed() throws SocketException {
259 if (isClosed()) throw new SocketException("Socket is closed");
260 }
261
262 static InProcSocket.Channel connect(int port, int timeout) throws IOException {
263 InProcServerSocket srvsock;
264 synchronized (bindings) {
265 srvsock = (InProcServerSocket)bindings.get(new Integer(port));
266 }
267 if (srvsock == null) {
268 throw new IOException("Connection refused (server not listening)");
269 }
270 boolean success;
271 ConnReq connreq = new ConnReq();
272 synchronized (srvsock) {
273 if (srvsock.isClosed()) throw new IOException("Connection refused (server closed)");
274 success = srvsock.connectionQueue.offer(connreq);
275 }
276 if (!success) {
277 throw new IOException("Connection refused (queue full, try again later)");
278 }
279 return connreq.awaitOrCancel(timeout);
280 }
281
282 private static class ConnReq {
283 boolean accepted = false;
284 boolean cancelled = false;
285 boolean refused = false;
286 InProcSocket.Channel srvChannel;
287 InProcSocket.Channel cliChannel;
288
289 ConnReq() {}
290 public synchronized boolean cancel() {
291 if (accepted || refused) return false;
292 cancelled = true;
293 notifyAll();
294 return true;
295 }
296 public synchronized InProcSocket.Channel accept() {
297 if (cancelled) return null;
298 if (accepted || refused) throw new IllegalStateException("Already responsed");
299 accepted = true;
300 /** @todo bufsize */
301 BufferedPipe upstream = new BufferedPipe();
302 BufferedPipe downstream = new BufferedPipe();
303 srvChannel = new InProcSocket.Channel((TimedInput)upstream.sink(),
304 downstream.source());
305 cliChannel = new InProcSocket.Channel((TimedInput)downstream.sink(),
306 upstream.source());
307 notifyAll();
308 return srvChannel;
309 }
310 public synchronized void refuse() {
311 if (cancelled) return;
312 if (accepted || refused) throw new IllegalStateException("Already responsed");
313 refused = true;
314 notifyAll();
315 }
316 public synchronized InProcSocket.Channel awaitOrCancel(int timeout) throws IOException {
317 try {
318 if (timeout == 0) {
319 while (!accepted && !cancelled && !refused) {
320 wait();
321 }
322 }
323 else {
324 long endtime = timeout + System.currentTimeMillis();
325 while (!accepted && !cancelled && !refused) {
326 long todo = endtime - System.currentTimeMillis();
327 if (todo > 0) {
328 wait(todo);
329 }
330 else {
331 cancel();
332 break;
333 }
334 }
335 }
336
337 if (cancelled) throw new SocketTimeoutException("Connection timed out");
338 if (refused) throw new IOException("Connection refused (server closing)");
339 // otherwise must be accepted
340 return cliChannel;
341 }
342 catch (InterruptedException e) {
343 if (accepted) return cliChannel;
344 cancel();
345 throw new InterruptedIOException("Connect interrupted");
346 }
347 }
348 }
349 }
350