1 /*
2 * SSHTools - Java SSH2 API
3 *
4 * Copyright (C) 2002-2003 Lee David Painter and Contributors.
5 *
6 * Contributions made by:
7 *
8 * Brett Smith
9 * Richard Pernavas
10 * Erwin Bolwidt
11 *
12 * This program is free software; you can redistribute it and/or
13 * modify it under the terms of the GNU General Public License
14 * as published by the Free Software Foundation; either version 2
15 * of the License, or (at your option) any later version.
16 *
17 * This program is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with this program; if not, write to the Free Software
24 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
25 */
26 package com.sshtools.j2ssh.io;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30
31 import java.io.IOException;
32 import java.io.InputStream;
33 import java.io.InterruptedIOException;
34 import java.io.OutputStream;
35
36
37 /**
38 * <p>
39 * This class provides an alternative method of storing data, used within the
40 * API where Piped Streams could have been used. We found that Piped streams
41 * would lock if a thread attempted to read to data when the OutputStream attached
42 * was not being read; since we have no control over when the user will actually
43 * read the data, this behaviour led us to develop this dynamic buffer which
44 * will automatically grow if the buffer is full.
45 * </p>
46 * *
47 * @author Lee David Painter
48 * @version $Revision: 1.20 $
49 */
50 public class DynamicBuffer {
51 private static Log log = LogFactory.getLog(DynamicBuffer.class);
52
53 /** Buffer size when the dynamic buffer is opened */
54 protected static final int DEFAULT_BUFFER_SIZE = 32768;
55
56 /** The buffer */
57 protected byte[] buf;
58
59 /** The current write position */
60 protected int writepos = 0;
61
62 /** The current read position */
63 protected int readpos = 0;
64
65 /** This buffers InputStream */
66 protected InputStream in;
67
68 /** This buffers OutputStream */
69 protected OutputStream out;
70 private boolean closed = false;
71 private int interrupt = 5000;
72
73 /**
74 * Creates a new DynamicBuffer object.
75 */
76 public DynamicBuffer() {
77 buf = new byte[DEFAULT_BUFFER_SIZE];
78 in = new DynamicBufferInputStream();
79 out = new DynamicBufferOutputStream();
80 }
81
82 /**
83 * Get the InputStream of this buffer. Use the stream to read data from
84 * this buffer.
85 *
86 * @return
87 */
88 public InputStream getInputStream() {
89 return in;
90 }
91
92 /**
93 * Get the OutputStream of the buffer. Use this stream to write data to
94 * the buffer.
95 *
96 * @return
97 */
98 public OutputStream getOutputStream() {
99 return out;
100 }
101
102 private synchronized void verifyBufferSize(int count) {
103 // If there is not enough data in the buffer, then first attempt to
104 // move the unread data back to the beginning
105 if (count > (buf.length - writepos)) {
106 System.arraycopy(buf, readpos, buf, 0, writepos - readpos);
107 writepos -= readpos;
108 readpos = 0;
109 }
110
111 // Now double check and increase the buffer size if necersary
112 if (count > (buf.length - writepos)) {
113 byte[] tmp = new byte[buf.length + DEFAULT_BUFFER_SIZE];
114 System.arraycopy(buf, 0, tmp, 0, writepos - readpos);
115 buf = tmp;
116 }
117 }
118
119 /**
120 * Return the number of bytes of data available to be read from the buffer
121 * @return
122 */
123 protected synchronized int available() {
124 return writepos - readpos;
125 }
126
127 private synchronized void block() throws InterruptedException {
128 if (log.isDebugEnabled()) {
129 log.debug("Buffer size: " + String.valueOf(buf.length));
130 log.debug("Unread data: " + String.valueOf(writepos - readpos));
131 }
132
133 // Block and wait for more data
134 if (!closed) {
135 while ((readpos >= writepos) && !closed) {
136 wait(interrupt);
137 }
138 }
139 }
140
141 /**
142 * Closes the buffer
143 */
144 public synchronized void close() {
145 if (!closed) {
146 closed = true;
147 notifyAll();
148 }
149 }
150
151 /**
152 * Write a byte array to the buffer
153 *
154 * @param b
155 *
156 * @throws IOException
157 */
158 protected synchronized void write(int b) throws IOException {
159 if (closed) {
160 throw new IOException("The buffer is closed");
161 }
162
163 verifyBufferSize(1);
164 buf[writepos] = (byte) b;
165 writepos++;
166 notifyAll();
167 }
168
169 /**
170 *
171 *
172 * @param data
173 * @param offset
174 * @param len
175 *
176 * @throws IOException
177 */
178 protected synchronized void write(byte[] data, int offset, int len)
179 throws IOException {
180 if (closed) {
181 throw new IOException("The buffer is closed");
182 }
183
184 verifyBufferSize(len);
185 System.arraycopy(data, offset, buf, writepos, len);
186 writepos += len;
187 notifyAll();
188 }
189
190 public void setBlockInterrupt(int interrupt) {
191 this.interrupt = interrupt;
192 }
193
194 /**
195 * Read a byte from the buffer
196 *
197 * @return
198 *
199 * @throws IOException
200 * @throws InterruptedIOException
201 */
202 protected synchronized int read() throws IOException {
203 try {
204 block();
205 } catch (InterruptedException ex) {
206 throw new InterruptedIOException(
207 "The blocking operation was interrupted");
208 }
209
210 if (closed && (available() <= 0)) {
211 return -1;
212 }
213
214 return (int) buf[readpos++];
215 }
216
217 /**
218 * Read a byte array from the buffer
219 *
220 * @param data
221 * @param offset
222 * @param len
223 *
224 * @return
225 *
226 * @throws IOException
227 * @throws InterruptedIOException
228 */
229 protected synchronized int read(byte[] data, int offset, int len)
230 throws IOException {
231 try {
232 block();
233 } catch (InterruptedException ex) {
234 throw new InterruptedIOException(
235 "The blocking operation was interrupted");
236 }
237
238 if (closed && (available() <= 0)) {
239 return -1;
240 }
241
242 int read = (len > (writepos - readpos)) ? (writepos - readpos) : len;
243 System.arraycopy(buf, readpos, data, offset, read);
244 readpos += read;
245
246 return read;
247 }
248
249 /**
250 * Flush data
251 *
252 * @throws IOException
253 */
254 protected synchronized void flush() throws IOException {
255 notifyAll();
256 }
257
258 class DynamicBufferInputStream extends InputStream {
259 public int read() throws IOException {
260 return DynamicBuffer.this.read();
261 }
262
263 public int read(byte[] data, int offset, int len)
264 throws IOException {
265 return DynamicBuffer.this.read(data, offset, len);
266 }
267
268 public int available() {
269 return DynamicBuffer.this.available();
270 }
271
272 public void close() {
273 DynamicBuffer.this.close();
274 }
275 }
276
277 class DynamicBufferOutputStream extends OutputStream {
278 public void write(int b) throws IOException {
279 DynamicBuffer.this.write(b);
280 }
281
282 public void write(byte[] data, int offset, int len)
283 throws IOException {
284 DynamicBuffer.this.write(data, offset, len);
285 }
286
287 public void flush() throws IOException {
288 DynamicBuffer.this.flush();
289 }
290
291 public void close() {
292 DynamicBuffer.this.close();
293 }
294 }
295 }