Home » openjdk-7 » java » io » [javadoc | source]

    1   /*
    2    * Copyright (c) 1995, 2006, Oracle and/or its affiliates. All rights reserved.
    3    * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
    4    *
    5    * This code is free software; you can redistribute it and/or modify it
    6    * under the terms of the GNU General Public License version 2 only, as
    7    * published by the Free Software Foundation.  Oracle designates this
    8    * particular file as subject to the "Classpath" exception as provided
    9    * by Oracle in the LICENSE file that accompanied this code.
   10    *
   11    * This code is distributed in the hope that it will be useful, but WITHOUT
   12    * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
   13    * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
   14    * version 2 for more details (a copy is included in the LICENSE file that
   15    * accompanied this code).
   16    *
   17    * You should have received a copy of the GNU General Public License version
   18    * 2 along with this work; if not, write to the Free Software Foundation,
   19    * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
   20    *
   21    * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
   22    * or visit www.oracle.com if you need additional information or have any
   23    * questions.
   24    */
   25   
   26   package java.io;
   27   
   28   /**
   29    * A piped input stream should be connected
   30    * to a piped output stream; the piped  input
   31    * stream then provides whatever data bytes
   32    * are written to the piped output  stream.
   33    * Typically, data is read from a <code>PipedInputStream</code>
   34    * object by one thread  and data is written
   35    * to the corresponding <code>PipedOutputStream</code>
   36    * by some  other thread. Attempting to use
   37    * both objects from a single thread is not
   38    * recommended, as it may deadlock the thread.
   39    * The piped input stream contains a buffer,
   40    * decoupling read operations from write operations,
   41    * within limits.
   42    * A pipe is said to be <a name=BROKEN> <i>broken</i> </a> if a
   43    * thread that was providing data bytes to the connected
   44    * piped output stream is no longer alive.
   45    *
   46    * @author  James Gosling
   47    * @see     java.io.PipedOutputStream
   48    * @since   JDK1.0
   49    */
   50   public class PipedInputStream extends InputStream {
   51       boolean closedByWriter = false;
   52       volatile boolean closedByReader = false;
   53       boolean connected = false;
   54   
   55           /* REMIND: identification of the read and write sides needs to be
   56              more sophisticated.  Either using thread groups (but what about
   57              pipes within a thread?) or using finalization (but it may be a
   58              long time until the next GC). */
   59       Thread readSide;
   60       Thread writeSide;
   61   
   62       private static final int DEFAULT_PIPE_SIZE = 1024;
   63   
   64       /**
   65        * The default size of the pipe's circular input buffer.
   66        * @since   JDK1.1
   67        */
   68       // This used to be a constant before the pipe size was allowed
   69       // to change. This field will continue to be maintained
   70       // for backward compatibility.
   71       protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
   72   
   73       /**
   74        * The circular buffer into which incoming data is placed.
   75        * @since   JDK1.1
   76        */
   77       protected byte buffer[];
   78   
   79       /**
   80        * The index of the position in the circular buffer at which the
   81        * next byte of data will be stored when received from the connected
   82        * piped output stream. <code>in&lt;0</code> implies the buffer is empty,
   83        * <code>in==out</code> implies the buffer is full
   84        * @since   JDK1.1
   85        */
   86       protected int in = -1;
   87   
   88       /**
   89        * The index of the position in the circular buffer at which the next
   90        * byte of data will be read by this piped input stream.
   91        * @since   JDK1.1
   92        */
   93       protected int out = 0;
   94   
   95       /**
   96        * Creates a <code>PipedInputStream</code> so
   97        * that it is connected to the piped output
   98        * stream <code>src</code>. Data bytes written
   99        * to <code>src</code> will then be  available
  100        * as input from this stream.
  101        *
  102        * @param      src   the stream to connect to.
  103        * @exception  IOException  if an I/O error occurs.
  104        */
  105       public PipedInputStream(PipedOutputStream src) throws IOException {
  106           this(src, DEFAULT_PIPE_SIZE);
  107       }
  108   
  109       /**
  110        * Creates a <code>PipedInputStream</code> so that it is
  111        * connected to the piped output stream
  112        * <code>src</code> and uses the specified pipe size for
  113        * the pipe's buffer.
  114        * Data bytes written to <code>src</code> will then
  115        * be available as input from this stream.
  116        *
  117        * @param      src   the stream to connect to.
  118        * @param      pipeSize the size of the pipe's buffer.
  119        * @exception  IOException  if an I/O error occurs.
  120        * @exception  IllegalArgumentException if <code>pipeSize <= 0</code>.
  121        * @since      1.6
  122        */
  123       public PipedInputStream(PipedOutputStream src, int pipeSize)
  124               throws IOException {
  125            initPipe(pipeSize);
  126            connect(src);
  127       }
  128   
  129       /**
  130        * Creates a <code>PipedInputStream</code> so
  131        * that it is not yet {@linkplain #connect(java.io.PipedOutputStream)
  132        * connected}.
  133        * It must be {@linkplain java.io.PipedOutputStream#connect(
  134        * java.io.PipedInputStream) connected} to a
  135        * <code>PipedOutputStream</code> before being used.
  136        */
  137       public PipedInputStream() {
  138           initPipe(DEFAULT_PIPE_SIZE);
  139       }
  140   
  141       /**
  142        * Creates a <code>PipedInputStream</code> so that it is not yet
  143        * {@linkplain #connect(java.io.PipedOutputStream) connected} and
  144        * uses the specified pipe size for the pipe's buffer.
  145        * It must be {@linkplain java.io.PipedOutputStream#connect(
  146        * java.io.PipedInputStream)
  147        * connected} to a <code>PipedOutputStream</code> before being used.
  148        *
  149        * @param      pipeSize the size of the pipe's buffer.
  150        * @exception  IllegalArgumentException if <code>pipeSize <= 0</code>.
  151        * @since      1.6
  152        */
  153       public PipedInputStream(int pipeSize) {
  154           initPipe(pipeSize);
  155       }
  156   
  157       private void initPipe(int pipeSize) {
  158            if (pipeSize <= 0) {
  159               throw new IllegalArgumentException("Pipe Size <= 0");
  160            }
  161            buffer = new byte[pipeSize];
  162       }
  163   
  164       /**
  165        * Causes this piped input stream to be connected
  166        * to the piped  output stream <code>src</code>.
  167        * If this object is already connected to some
  168        * other piped output  stream, an <code>IOException</code>
  169        * is thrown.
  170        * <p>
  171        * If <code>src</code> is an
  172        * unconnected piped output stream and <code>snk</code>
  173        * is an unconnected piped input stream, they
  174        * may be connected by either the call:
  175        * <p>
  176        * <pre><code>snk.connect(src)</code> </pre>
  177        * <p>
  178        * or the call:
  179        * <p>
  180        * <pre><code>src.connect(snk)</code> </pre>
  181        * <p>
  182        * The two
  183        * calls have the same effect.
  184        *
  185        * @param      src   The piped output stream to connect to.
  186        * @exception  IOException  if an I/O error occurs.
  187        */
  188       public void connect(PipedOutputStream src) throws IOException {
  189           src.connect(this);
  190       }
  191   
  192       /**
  193        * Receives a byte of data.  This method will block if no input is
  194        * available.
  195        * @param b the byte being received
  196        * @exception IOException If the pipe is <a href=#BROKEN> <code>broken</code></a>,
  197        *          {@link #connect(java.io.PipedOutputStream) unconnected},
  198        *          closed, or if an I/O error occurs.
  199        * @since     JDK1.1
  200        */
  201       protected synchronized void receive(int b) throws IOException {
  202           checkStateForReceive();
  203           writeSide = Thread.currentThread();
  204           if (in == out)
  205               awaitSpace();
  206           if (in < 0) {
  207               in = 0;
  208               out = 0;
  209           }
  210           buffer[in++] = (byte)(b & 0xFF);
  211           if (in >= buffer.length) {
  212               in = 0;
  213           }
  214       }
  215   
  216       /**
  217        * Receives data into an array of bytes.  This method will
  218        * block until some input is available.
  219        * @param b the buffer into which the data is received
  220        * @param off the start offset of the data
  221        * @param len the maximum number of bytes received
  222        * @exception IOException If the pipe is <a href=#BROKEN> broken</a>,
  223        *           {@link #connect(java.io.PipedOutputStream) unconnected},
  224        *           closed,or if an I/O error occurs.
  225        */
  226       synchronized void receive(byte b[], int off, int len)  throws IOException {
  227           checkStateForReceive();
  228           writeSide = Thread.currentThread();
  229           int bytesToTransfer = len;
  230           while (bytesToTransfer > 0) {
  231               if (in == out)
  232                   awaitSpace();
  233               int nextTransferAmount = 0;
  234               if (out < in) {
  235                   nextTransferAmount = buffer.length - in;
  236               } else if (in < out) {
  237                   if (in == -1) {
  238                       in = out = 0;
  239                       nextTransferAmount = buffer.length - in;
  240                   } else {
  241                       nextTransferAmount = out - in;
  242                   }
  243               }
  244               if (nextTransferAmount > bytesToTransfer)
  245                   nextTransferAmount = bytesToTransfer;
  246               assert(nextTransferAmount > 0);
  247               System.arraycopy(b, off, buffer, in, nextTransferAmount);
  248               bytesToTransfer -= nextTransferAmount;
  249               off += nextTransferAmount;
  250               in += nextTransferAmount;
  251               if (in >= buffer.length) {
  252                   in = 0;
  253               }
  254           }
  255       }
  256   
  257       private void checkStateForReceive() throws IOException {
  258           if (!connected) {
  259               throw new IOException("Pipe not connected");
  260           } else if (closedByWriter || closedByReader) {
  261               throw new IOException("Pipe closed");
  262           } else if (readSide != null && !readSide.isAlive()) {
  263               throw new IOException("Read end dead");
  264           }
  265       }
  266   
  267       private void awaitSpace() throws IOException {
  268           while (in == out) {
  269               checkStateForReceive();
  270   
  271               /* full: kick any waiting readers */
  272               notifyAll();
  273               try {
  274                   wait(1000);
  275               } catch (InterruptedException ex) {
  276                   throw new java.io.InterruptedIOException();
  277               }
  278           }
  279       }
  280   
  281       /**
  282        * Notifies all waiting threads that the last byte of data has been
  283        * received.
  284        */
  285       synchronized void receivedLast() {
  286           closedByWriter = true;
  287           notifyAll();
  288       }
  289   
  290       /**
  291        * Reads the next byte of data from this piped input stream. The
  292        * value byte is returned as an <code>int</code> in the range
  293        * <code>0</code> to <code>255</code>.
  294        * This method blocks until input data is available, the end of the
  295        * stream is detected, or an exception is thrown.
  296        *
  297        * @return     the next byte of data, or <code>-1</code> if the end of the
  298        *             stream is reached.
  299        * @exception  IOException  if the pipe is
  300        *           {@link #connect(java.io.PipedOutputStream) unconnected},
  301        *           <a href=#BROKEN> <code>broken</code></a>, closed,
  302        *           or if an I/O error occurs.
  303        */
  304       public synchronized int read()  throws IOException {
  305           if (!connected) {
  306               throw new IOException("Pipe not connected");
  307           } else if (closedByReader) {
  308               throw new IOException("Pipe closed");
  309           } else if (writeSide != null && !writeSide.isAlive()
  310                      && !closedByWriter && (in < 0)) {
  311               throw new IOException("Write end dead");
  312           }
  313   
  314           readSide = Thread.currentThread();
  315           int trials = 2;
  316           while (in < 0) {
  317               if (closedByWriter) {
  318                   /* closed by writer, return EOF */
  319                   return -1;
  320               }
  321               if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
  322                   throw new IOException("Pipe broken");
  323               }
  324               /* might be a writer waiting */
  325               notifyAll();
  326               try {
  327                   wait(1000);
  328               } catch (InterruptedException ex) {
  329                   throw new java.io.InterruptedIOException();
  330               }
  331           }
  332           int ret = buffer[out++] & 0xFF;
  333           if (out >= buffer.length) {
  334               out = 0;
  335           }
  336           if (in == out) {
  337               /* now empty */
  338               in = -1;
  339           }
  340   
  341           return ret;
  342       }
  343   
  344       /**
  345        * Reads up to <code>len</code> bytes of data from this piped input
  346        * stream into an array of bytes. Less than <code>len</code> bytes
  347        * will be read if the end of the data stream is reached or if
  348        * <code>len</code> exceeds the pipe's buffer size.
  349        * If <code>len </code> is zero, then no bytes are read and 0 is returned;
  350        * otherwise, the method blocks until at least 1 byte of input is
  351        * available, end of the stream has been detected, or an exception is
  352        * thrown.
  353        *
  354        * @param      b     the buffer into which the data is read.
  355        * @param      off   the start offset in the destination array <code>b</code>
  356        * @param      len   the maximum number of bytes read.
  357        * @return     the total number of bytes read into the buffer, or
  358        *             <code>-1</code> if there is no more data because the end of
  359        *             the stream has been reached.
  360        * @exception  NullPointerException If <code>b</code> is <code>null</code>.
  361        * @exception  IndexOutOfBoundsException If <code>off</code> is negative,
  362        * <code>len</code> is negative, or <code>len</code> is greater than
  363        * <code>b.length - off</code>
  364        * @exception  IOException if the pipe is <a href=#BROKEN> <code>broken</code></a>,
  365        *           {@link #connect(java.io.PipedOutputStream) unconnected},
  366        *           closed, or if an I/O error occurs.
  367        */
  368       public synchronized int read(byte b[], int off, int len)  throws IOException {
  369           if (b == null) {
  370               throw new NullPointerException();
  371           } else if (off < 0 || len < 0 || len > b.length - off) {
  372               throw new IndexOutOfBoundsException();
  373           } else if (len == 0) {
  374               return 0;
  375           }
  376   
  377           /* possibly wait on the first character */
  378           int c = read();
  379           if (c < 0) {
  380               return -1;
  381           }
  382           b[off] = (byte) c;
  383           int rlen = 1;
  384           while ((in >= 0) && (len > 1)) {
  385   
  386               int available;
  387   
  388               if (in > out) {
  389                   available = Math.min((buffer.length - out), (in - out));
  390               } else {
  391                   available = buffer.length - out;
  392               }
  393   
  394               // A byte is read beforehand outside the loop
  395               if (available > (len - 1)) {
  396                   available = len - 1;
  397               }
  398               System.arraycopy(buffer, out, b, off + rlen, available);
  399               out += available;
  400               rlen += available;
  401               len -= available;
  402   
  403               if (out >= buffer.length) {
  404                   out = 0;
  405               }
  406               if (in == out) {
  407                   /* now empty */
  408                   in = -1;
  409               }
  410           }
  411           return rlen;
  412       }
  413   
  414       /**
  415        * Returns the number of bytes that can be read from this input
  416        * stream without blocking.
  417        *
  418        * @return the number of bytes that can be read from this input stream
  419        *         without blocking, or {@code 0} if this input stream has been
  420        *         closed by invoking its {@link #close()} method, or if the pipe
  421        *         is {@link #connect(java.io.PipedOutputStream) unconnected}, or
  422        *          <a href=#BROKEN> <code>broken</code></a>.
  423        *
  424        * @exception  IOException  if an I/O error occurs.
  425        * @since   JDK1.0.2
  426        */
  427       public synchronized int available() throws IOException {
  428           if(in < 0)
  429               return 0;
  430           else if(in == out)
  431               return buffer.length;
  432           else if (in > out)
  433               return in - out;
  434           else
  435               return in + buffer.length - out;
  436       }
  437   
  438       /**
  439        * Closes this piped input stream and releases any system resources
  440        * associated with the stream.
  441        *
  442        * @exception  IOException  if an I/O error occurs.
  443        */
  444       public void close()  throws IOException {
  445           closedByReader = true;
  446           synchronized (this) {
  447               in = -1;
  448           }
  449       }
  450   }

Home » openjdk-7 » java » io » [javadoc | source]