1 /*
2 * Copyright 1995-2006 Sun Microsystems, Inc. All Rights Reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Sun designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Sun in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
22 * CA 95054 USA or visit www.sun.com if you need additional information or
23 * have any questions.
24 */
25
26 package java.io;
27
28 /**
29 * A piped input stream should be connected
30 * to a piped output stream; the piped input
31 * stream then provides whatever data bytes
32 * are written to the piped output stream.
33 * Typically, data is read from a <code>PipedInputStream</code>
34 * object by one thread and data is written
35 * to the corresponding <code>PipedOutputStream</code>
36 * by some other thread. Attempting to use
37 * both objects from a single thread is not
38 * recommended, as it may deadlock the thread.
39 * The piped input stream contains a buffer,
40 * decoupling read operations from write operations,
41 * within limits.
42 * A pipe is said to be <a name=BROKEN> <i>broken</i> </a> if a
43 * thread that was providing data bytes to the connected
44 * piped output stream is no longer alive.
45 *
46 * @author James Gosling
47 * @see java.io.PipedOutputStream
48 * @since JDK1.0
49 */
50 public class PipedInputStream extends InputStream {
51 boolean closedByWriter = false;
52 volatile boolean closedByReader = false;
53 boolean connected = false;
54
55 /* REMIND: identification of the read and write sides needs to be
56 more sophisticated. Either using thread groups (but what about
57 pipes within a thread?) or using finalization (but it may be a
58 long time until the next GC). */
59 Thread readSide;
60 Thread writeSide;
61
62 private static final int DEFAULT_PIPE_SIZE = 1024;
63
64 /**
65 * The default size of the pipe's circular input buffer.
66 * @since JDK1.1
67 */
68 // This used to be a constant before the pipe size was allowed
69 // to change. This field will continue to be maintained
70 // for backward compatibility.
71 protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
72
73 /**
74 * The circular buffer into which incoming data is placed.
75 * @since JDK1.1
76 */
77 protected byte buffer[];
78
79 /**
80 * The index of the position in the circular buffer at which the
81 * next byte of data will be stored when received from the connected
82 * piped output stream. <code>in<0</code> implies the buffer is empty,
83 * <code>in==out</code> implies the buffer is full
84 * @since JDK1.1
85 */
86 protected int in = -1;
87
88 /**
89 * The index of the position in the circular buffer at which the next
90 * byte of data will be read by this piped input stream.
91 * @since JDK1.1
92 */
93 protected int out = 0;
94
95 /**
96 * Creates a <code>PipedInputStream</code> so
97 * that it is connected to the piped output
98 * stream <code>src</code>. Data bytes written
99 * to <code>src</code> will then be available
100 * as input from this stream.
101 *
102 * @param src the stream to connect to.
103 * @exception IOException if an I/O error occurs.
104 */
105 public PipedInputStream(PipedOutputStream src) throws IOException {
106 this(src, DEFAULT_PIPE_SIZE);
107 }
108
109 /**
110 * Creates a <code>PipedInputStream</code> so that it is
111 * connected to the piped output stream
112 * <code>src</code> and uses the specified pipe size for
113 * the pipe's buffer.
114 * Data bytes written to <code>src</code> will then
115 * be available as input from this stream.
116 *
117 * @param src the stream to connect to.
118 * @param pipeSize the size of the pipe's buffer.
119 * @exception IOException if an I/O error occurs.
120 * @exception IllegalArgumentException if <code>pipeSize <= 0</code>.
121 * @since 1.6
122 */
123 public PipedInputStream(PipedOutputStream src, int pipeSize)
124 throws IOException {
125 initPipe(pipeSize);
126 connect(src);
127 }
128
129 /**
130 * Creates a <code>PipedInputStream</code> so
131 * that it is not yet {@linkplain #connect(java.io.PipedOutputStream)
132 * connected}.
133 * It must be {@linkplain java.io.PipedOutputStream#connect(
134 * java.io.PipedInputStream) connected} to a
135 * <code>PipedOutputStream</code> before being used.
136 */
137 public PipedInputStream() {
138 initPipe(DEFAULT_PIPE_SIZE);
139 }
140
141 /**
142 * Creates a <code>PipedInputStream</code> so that it is not yet
143 * {@linkplain #connect(java.io.PipedOutputStream) connected} and
144 * uses the specified pipe size for the pipe's buffer.
145 * It must be {@linkplain java.io.PipedOutputStream#connect(
146 * java.io.PipedInputStream)
147 * connected} to a <code>PipedOutputStream</code> before being used.
148 *
149 * @param pipeSize the size of the pipe's buffer.
150 * @exception IllegalArgumentException if <code>pipeSize <= 0</code>.
151 * @since 1.6
152 */
153 public PipedInputStream(int pipeSize) {
154 initPipe(pipeSize);
155 }
156
157 private void initPipe(int pipeSize) {
158 if (pipeSize <= 0) {
159 throw new IllegalArgumentException("Pipe Size <= 0");
160 }
161 buffer = new byte[pipeSize];
162 }
163
164 /**
165 * Causes this piped input stream to be connected
166 * to the piped output stream <code>src</code>.
167 * If this object is already connected to some
168 * other piped output stream, an <code>IOException</code>
169 * is thrown.
170 * <p>
171 * If <code>src</code> is an
172 * unconnected piped output stream and <code>snk</code>
173 * is an unconnected piped input stream, they
174 * may be connected by either the call:
175 * <p>
176 * <pre><code>snk.connect(src)</code> </pre>
177 * <p>
178 * or the call:
179 * <p>
180 * <pre><code>src.connect(snk)</code> </pre>
181 * <p>
182 * The two
183 * calls have the same effect.
184 *
185 * @param src The piped output stream to connect to.
186 * @exception IOException if an I/O error occurs.
187 */
188 public void connect(PipedOutputStream src) throws IOException {
189 src.connect(this);
190 }
191
192 /**
193 * Receives a byte of data. This method will block if no input is
194 * available.
195 * @param b the byte being received
196 * @exception IOException If the pipe is <a href=#BROKEN> <code>broken</code></a>,
197 * {@link #connect(java.io.PipedOutputStream) unconnected},
198 * closed, or if an I/O error occurs.
199 * @since JDK1.1
200 */
201 protected synchronized void receive(int b) throws IOException {
202 checkStateForReceive();
203 writeSide = Thread.currentThread();
204 if (in == out)
205 awaitSpace();
206 if (in < 0) {
207 in = 0;
208 out = 0;
209 }
210 buffer[in++] = (byte)(b & 0xFF);
211 if (in >= buffer.length) {
212 in = 0;
213 }
214 }
215
216 /**
217 * Receives data into an array of bytes. This method will
218 * block until some input is available.
219 * @param b the buffer into which the data is received
220 * @param off the start offset of the data
221 * @param len the maximum number of bytes received
222 * @exception IOException If the pipe is <a href=#BROKEN> broken</a>,
223 * {@link #connect(java.io.PipedOutputStream) unconnected},
224 * closed,or if an I/O error occurs.
225 */
226 synchronized void receive(byte b[], int off, int len) throws IOException {
227 checkStateForReceive();
228 writeSide = Thread.currentThread();
229 int bytesToTransfer = len;
230 while (bytesToTransfer > 0) {
231 if (in == out)
232 awaitSpace();
233 int nextTransferAmount = 0;
234 if (out < in) {
235 nextTransferAmount = buffer.length - in;
236 } else if (in < out) {
237 if (in == -1) {
238 in = out = 0;
239 nextTransferAmount = buffer.length - in;
240 } else {
241 nextTransferAmount = out - in;
242 }
243 }
244 if (nextTransferAmount > bytesToTransfer)
245 nextTransferAmount = bytesToTransfer;
246 assert(nextTransferAmount > 0);
247 System.arraycopy(b, off, buffer, in, nextTransferAmount);
248 bytesToTransfer -= nextTransferAmount;
249 off += nextTransferAmount;
250 in += nextTransferAmount;
251 if (in >= buffer.length) {
252 in = 0;
253 }
254 }
255 }
256
257 private void checkStateForReceive() throws IOException {
258 if (!connected) {
259 throw new IOException("Pipe not connected");
260 } else if (closedByWriter || closedByReader) {
261 throw new IOException("Pipe closed");
262 } else if (readSide != null && !readSide.isAlive()) {
263 throw new IOException("Read end dead");
264 }
265 }
266
267 private void awaitSpace() throws IOException {
268 while (in == out) {
269 checkStateForReceive();
270
271 /* full: kick any waiting readers */
272 notifyAll();
273 try {
274 wait(1000);
275 } catch (InterruptedException ex) {
276 throw new java.io.InterruptedIOException();
277 }
278 }
279 }
280
281 /**
282 * Notifies all waiting threads that the last byte of data has been
283 * received.
284 */
285 synchronized void receivedLast() {
286 closedByWriter = true;
287 notifyAll();
288 }
289
290 /**
291 * Reads the next byte of data from this piped input stream. The
292 * value byte is returned as an <code>int</code> in the range
293 * <code>0</code> to <code>255</code>.
294 * This method blocks until input data is available, the end of the
295 * stream is detected, or an exception is thrown.
296 *
297 * @return the next byte of data, or <code>-1</code> if the end of the
298 * stream is reached.
299 * @exception IOException if the pipe is
300 * {@link #connect(java.io.PipedOutputStream) unconnected},
301 * <a href=#BROKEN> <code>broken</code></a>, closed,
302 * or if an I/O error occurs.
303 */
304 public synchronized int read() throws IOException {
305 if (!connected) {
306 throw new IOException("Pipe not connected");
307 } else if (closedByReader) {
308 throw new IOException("Pipe closed");
309 } else if (writeSide != null && !writeSide.isAlive()
310 && !closedByWriter && (in < 0)) {
311 throw new IOException("Write end dead");
312 }
313
314 readSide = Thread.currentThread();
315 int trials = 2;
316 while (in < 0) {
317 if (closedByWriter) {
318 /* closed by writer, return EOF */
319 return -1;
320 }
321 if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
322 throw new IOException("Pipe broken");
323 }
324 /* might be a writer waiting */
325 notifyAll();
326 try {
327 wait(1000);
328 } catch (InterruptedException ex) {
329 throw new java.io.InterruptedIOException();
330 }
331 }
332 int ret = buffer[out++] & 0xFF;
333 if (out >= buffer.length) {
334 out = 0;
335 }
336 if (in == out) {
337 /* now empty */
338 in = -1;
339 }
340
341 return ret;
342 }
343
344 /**
345 * Reads up to <code>len</code> bytes of data from this piped input
346 * stream into an array of bytes. Less than <code>len</code> bytes
347 * will be read if the end of the data stream is reached or if
348 * <code>len</code> exceeds the pipe's buffer size.
349 * If <code>len </code> is zero, then no bytes are read and 0 is returned;
350 * otherwise, the method blocks until at least 1 byte of input is
351 * available, end of the stream has been detected, or an exception is
352 * thrown.
353 *
354 * @param b the buffer into which the data is read.
355 * @param off the start offset in the destination array <code>b</code>
356 * @param len the maximum number of bytes read.
357 * @return the total number of bytes read into the buffer, or
358 * <code>-1</code> if there is no more data because the end of
359 * the stream has been reached.
360 * @exception NullPointerException If <code>b</code> is <code>null</code>.
361 * @exception IndexOutOfBoundsException If <code>off</code> is negative,
362 * <code>len</code> is negative, or <code>len</code> is greater than
363 * <code>b.length - off</code>
364 * @exception IOException if the pipe is <a href=#BROKEN> <code>broken</code></a>,
365 * {@link #connect(java.io.PipedOutputStream) unconnected},
366 * closed, or if an I/O error occurs.
367 */
368 public synchronized int read(byte b[], int off, int len) throws IOException {
369 if (b == null) {
370 throw new NullPointerException();
371 } else if (off < 0 || len < 0 || len > b.length - off) {
372 throw new IndexOutOfBoundsException();
373 } else if (len == 0) {
374 return 0;
375 }
376
377 /* possibly wait on the first character */
378 int c = read();
379 if (c < 0) {
380 return -1;
381 }
382 b[off] = (byte) c;
383 int rlen = 1;
384 while ((in >= 0) && (len > 1)) {
385
386 int available;
387
388 if (in > out) {
389 available = Math.min((buffer.length - out), (in - out));
390 } else {
391 available = buffer.length - out;
392 }
393
394 // A byte is read beforehand outside the loop
395 if (available > (len - 1)) {
396 available = len - 1;
397 }
398 System.arraycopy(buffer, out, b, off + rlen, available);
399 out += available;
400 rlen += available;
401 len -= available;
402
403 if (out >= buffer.length) {
404 out = 0;
405 }
406 if (in == out) {
407 /* now empty */
408 in = -1;
409 }
410 }
411 return rlen;
412 }
413
414 /**
415 * Returns the number of bytes that can be read from this input
416 * stream without blocking.
417 *
418 * @return the number of bytes that can be read from this input stream
419 * without blocking, or {@code 0} if this input stream has been
420 * closed by invoking its {@link #close()} method, or if the pipe
421 * is {@link #connect(java.io.PipedOutputStream) unconnected}, or
422 * <a href=#BROKEN> <code>broken</code></a>.
423 *
424 * @exception IOException if an I/O error occurs.
425 * @since JDK1.0.2
426 */
427 public synchronized int available() throws IOException {
428 if(in < 0)
429 return 0;
430 else if(in == out)
431 return buffer.length;
432 else if (in > out)
433 return in - out;
434 else
435 return in + buffer.length - out;
436 }
437
438 /**
439 * Closes this piped input stream and releases any system resources
440 * associated with the stream.
441 *
442 * @exception IOException if an I/O error occurs.
443 */
444 public void close() throws IOException {
445 closedByReader = true;
446 synchronized (this) {
447 in = -1;
448 }
449 }
450 }