Source code: rcs/utils/CorrectedPipedInputStream.java
1 package rcs.utils;
2
3
4 import java.io.InputStream;
5 import java.io.IOException;
6
7 import rcs.utils.CorrectedPipedOutputStream;
8
9
10
11 /**
12 * This class provides the same interface as java.io.PipedInputStream
13 * except that it corrects the problem that when java.io.PipedOutputStream
14 * has 1k or more written to it, it blocks until the some of the data is read
15 * from the input pipe before more can be written.
16 * CorrectedPipedInputStream/CorrectedPipedOutputStream only block
17 * for mutual exclusion but will allow any amount of data(atleast
18 * until you run out of memory) to be written to the pipe without waiting
19 * for a read.
20 *
21 * <pre>
22 * Related Documentation:
23 * <A HREF="http://isd.cme.nist.gov/proj/rcs_lib">RCS Library</a>, <A HREF="http://isd.cme.nist.gov/proj/rcs_lib/NMLjava.html">NML Programmers Guide (Java Version)</a>
24 *
25 * Source Code:
26 * <A HREF="CorrectedPipedInputStream.java">CorrectedPipedInputStream.java</a>
27 *
28 * </pre>
29 *
30 * @see rcs.utils.CorrectedPipedOutputStream
31 *
32 * @author Will Shackleford -- <A HREF="mailto:shackle@cme.nist.gov">shackle@cme.nist.gov</a>
33 */
34 public class CorrectedPipedInputStream extends InputStream
35 {
36 CorrectedPipedOutputStream out_stream = null;
37 CorrectedPipeData pipe_data = null;
38 static final public boolean debug_on = false;
39
40 // Constructors
41 public CorrectedPipedInputStream()
42 {
43 out_stream = null;
44 pipe_data = new CorrectedPipeData();
45 }
46
47 public CorrectedPipedInputStream(CorrectedPipedOutputStream out)
48 {
49 out_stream = out;
50 pipe_data = out_stream.pipe_data;
51 out_stream.in_stream = this;
52 }
53 // Methods
54 public int available()
55 {
56 if(null == pipe_data)
57 {
58 return 0;
59 }
60 if(null == pipe_data.buffer)
61 {
62 return 0;
63 }
64 if( pipe_data.length > pipe_data.buffer.length)
65 {
66 return 0;
67 }
68 if(pipe_data.offset < pipe_data.length)
69 {
70 return 0;
71 }
72 return pipe_data.length - pipe_data.offset;
73
74 }
75
76 public void close()
77 {
78 }
79
80 public int read() throws IOException
81 {
82 if(null == pipe_data)
83 {
84 throw new IOException();
85 }
86 while(true)
87 {
88 if(null != pipe_data.buffer)
89 {
90 if(pipe_data.buffer.length > 0 &&
91 pipe_data.offset < pipe_data.length)
92 {
93 break;
94 }
95 }
96 try
97 {
98 pipe_data.WaitForData();
99 }
100 catch(Exception e)
101 {
102 e.printStackTrace();
103 throw new IOException();
104 }
105 }
106 if(pipe_data.offset < 0)
107 {
108 throw new IOException();
109 }
110 if(pipe_data.offset > pipe_data.length)
111 {
112 throw new IOException();
113 }
114 if(pipe_data.length > pipe_data.buffer.length)
115 {
116 throw new IOException();
117 }
118 int retval = 0;
119 synchronized(pipe_data)
120 {
121 retval = (int) pipe_data.buffer[pipe_data.offset];
122 if(retval < 0)
123 {
124 retval += 256;
125 }
126 pipe_data.offset++;
127 }
128 boolean data_posted = false;
129 pipe_data.PostNewData();
130 if(debug_on)
131 {
132 System.out.println("CorrectedPipedInputStream.read() returning "+retval+". (off = "+pipe_data.offset+", len = "+pipe_data.length+")");
133 }
134 return retval;
135 }
136
137 public int read(byte b[]) throws IOException
138 {
139 return read(b,0,b.length);
140 }
141
142 public int read(byte b[], int off, int len) throws IOException
143 {
144 int bytes_read = 0;
145 if(debug_on)
146 {
147 System.out.println("CorrectedPipedInputStream: pipe_data.buffer = "+new String(pipe_data.buffer,off,len) +", pipe_data.offset = "+pipe_data.offset+", pipe_data.length = "+pipe_data.length);
148 }
149 if(null == pipe_data)
150 {
151 throw new IOException();
152 }
153 while(true)
154 {
155 if(null != pipe_data.buffer)
156 {
157 if(pipe_data.buffer.length > 0 &&
158 pipe_data.offset < pipe_data.length)
159 {
160 break;
161 }
162 }
163 try
164 {
165 pipe_data.WaitForData();
166 }
167 catch(Exception e)
168 {
169 e.printStackTrace();
170 throw new IOException();
171 }
172 }
173 if(pipe_data.offset < 0)
174 {
175 throw new IOException();
176 }
177 if(pipe_data.offset > pipe_data.length)
178 {
179 throw new IOException();
180 }
181 if(pipe_data.length > pipe_data.buffer.length)
182 {
183 throw new IOException();
184 }
185 synchronized(pipe_data)
186 {
187 while(bytes_read < len && pipe_data.offset < pipe_data.length)
188 {
189 b[bytes_read+off] = pipe_data.buffer[pipe_data.offset];
190 pipe_data.offset++;
191 bytes_read++;
192 }
193 }
194 pipe_data.PostNewData();
195 if(debug_on)
196 {
197 System.out.println("CorrectedPipedInputStream: b = "+new String(b,off,len) +", off = "+off+", len = "+len+", bytes_read = "+bytes_read);
198 }
199
200 return bytes_read;
201 }
202 }
203
204
205
206
207
208
209