Source code: org/apache/axis/attachments/DimeDelimitedInputStream.java
1 /*
2 * Copyright 2001-2004 The Apache Software Foundation.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.apache.axis.attachments;
18
19
20 import org.apache.axis.components.logger.LogFactory;
21 import org.apache.axis.utils.Messages;
22 import org.apache.commons.logging.Log;
23
24 import java.io.IOException;
25
26
/**
 * This class takes the input stream and turns it into multiple streams.
 DIME version 0 format
 <pre>
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+  ---
| VERSION |B|E|C| TYPE_T| OPT_T |         OPTIONS_LENGTH        |   A
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|           ID_LENGTH           |          TYPE_LENGTH          |   Always present 12 bytes
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+   even on chunked data.
|                          DATA_LENGTH                          |   V
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+  ---
|                                                               /
/                       OPTIONS + PADDING                       /
/                    (absent for version 0)                     |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                                                               /
/                         ID + PADDING                          /
/                                                               |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                                                               /
/                        TYPE + PADDING                         /
/                                                               |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                                                               /
/                        DATA + PADDING                         /
/                                                               |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 </pre>
 * This implementation of input stream does not support marking operations.
 *
 * @author Rick Rineholt
 */
61 public class DimeDelimitedInputStream extends java.io.FilterInputStream {
62 protected static Log log =
63 LogFactory.getLog(DimeDelimitedInputStream.class.getName());
64
65 java.io.InputStream is = null; //The source input stream.
66 volatile boolean closed = true; //The stream has been closed.
67 boolean theEnd = false; //There are no more streams left.
68 boolean moreChunks = false; //More chunks are a coming!
69 boolean MB = false; //First part of the stream. MUST be SOAP.
70 boolean ME = false; //Last part of stream.
71 DimeTypeNameFormat tnf = null;
72 String type = null;
73 String id = null;
74 long recordLength = 0L; //length of the record.
75 long bytesRead = 0; //How many bytes of the record have been read.
76 int dataPadLength = 0; //How many pad bytes there are.
77 private static byte[] trash = new byte[4];
78 protected int streamNo = 0;
79 protected IOException streamInError = null;
80
81 protected static int streamCount = 0; //number of streams produced.
82
83 protected static synchronized int newStreamNo() {
84 log.debug(Messages.getMessage("streamNo", "" + (streamCount + 1)));
85 return ++streamCount;
86 }
87
88 static boolean isDebugEnabled = false;
89
90 /**
91 * Gets the next stream. From the previous using new buffer reading size.
92 *
93 * @return the dime delmited stream, null if there are no more streams
94 * @throws IOException if there was an error loading the data for the next
95 * stream
96 */
97 synchronized DimeDelimitedInputStream getNextStream()
98 throws IOException
99 {
100 if (null != streamInError) throw streamInError;
101 if (theEnd) return null;
102 if (bytesRead < recordLength || moreChunks) //Stream must be read in succession
103 throw new RuntimeException(Messages.getMessage(
104 "attach.dimeReadFullyError"));
105 dataPadLength -= readPad(dataPadLength);
106
107 //Create an new dime stream that comes after this one.
108 return new DimeDelimitedInputStream(this.is);
109 }
110
111 /**
112 * Create a new dime stream.
113 *
114 * @param is the <code>InputStream</code> to wrap
115 * @throws IOException if anything goes wrong
116 */
117 DimeDelimitedInputStream(java.io.InputStream is) throws IOException {
118 super(null); //we handle everything so this is not necessary, don't won't to hang on to a reference.
119 isDebugEnabled = log.isDebugEnabled();
120 streamNo = newStreamNo();
121 closed = false;
122 this.is = is;
123 readHeader(false);
124 }
125
126 private final int readPad(final int size) throws IOException {
127 if (0 == size) return 0;
128 int read = readFromStream(trash, 0, size);
129
130 if (size != read) {
131 streamInError = new IOException(Messages.getMessage(
132 "attach.dimeNotPaddedCorrectly"));
133 throw streamInError;
134 }
135 return read;
136 }
137
138 private final int readFromStream(final byte[] b) throws IOException {
139 return readFromStream(b, 0, b.length);
140 }
141
142 private final int readFromStream(final byte[] b,
143 final int start, final int length)
144 throws IOException {
145 if (length == 0) return 0;
146
147 int br = 0;
148 int brTotal = 0;
149
150 do {
151 try {
152 br = is.read(b, brTotal + start, length - brTotal);
153 } catch (IOException e) {
154 streamInError = e;
155 throw e;
156 }
157 if (br > 0) brTotal += br;
158 }
159 while (br > -1 && brTotal < length);
160
161 return br > -1 ? brTotal : br;
162 }
163
164 /**
165 * Get the id for this stream part.
166 * @return the id;
167 */
168 public String getContentId() {
169 return id;
170 }
171
172 public DimeTypeNameFormat getDimeTypeNameFormat() {
173 return tnf;
174 }
175
176 /**
177 * Get the type, as read from the header.
178 *
179 * @return the type of this dime
180 */
181
182 public String getType() {
183 return type;
184 }
185
186 /**
187 * Read from the DIME stream.
188 *
189 * @param b is the array to read into.
190 * @param off is the offset
191 * @return the number of bytes read. -1 if endof stream
192 * @throws IOException if data could not be read from the stream
193 */
194 public synchronized int read(byte[] b, final int off,
195 final int len) throws IOException {
196
197 if (closed) {
198 dataPadLength -= readPad(dataPadLength);
199 throw new IOException(Messages.getMessage("streamClosed"));
200 }
201 return _read(b, off, len);
202 }
203
204 protected int _read(byte[] b, final int off, final int len)
205 throws IOException {
206 if (len < 0) throw new IllegalArgumentException
207 (Messages.getMessage("attach.readLengthError",
208 "" + len));
209
210 if (off < 0) throw new IllegalArgumentException
211 (Messages.getMessage("attach.readOffsetError",
212 "" + off));
213 if (b == null) throw new IllegalArgumentException
214 (Messages.getMessage("attach.readArrayNullError"));
215 if (b.length < off + len) throw new IllegalArgumentException
216 (Messages.getMessage("attach.readArraySizeError",
217 "" + b.length, "" + len, "" + off));
218
219 if (null != streamInError) throw streamInError;
220
221 if (0 == len) return 0; //quick.
222
223 if(recordLength == 0 && bytesRead == 0 && !moreChunks){
224 ++bytesRead; //odd case no data to read -- give back 0 next time -1;
225 if(ME){
226 finalClose();
227 }
228 return 0;
229 }
230 if (bytesRead >= recordLength && !moreChunks) {
231 dataPadLength -= readPad(dataPadLength);
232 if(ME){
233 finalClose();
234 }
235 return -1;
236 }
237
238 int totalbytesread = 0;
239 int bytes2read = 0;
240
241 do {
242 if (bytesRead >= recordLength && moreChunks)
243 readHeader(true);
244
245 bytes2read = (int) Math.min(recordLength - bytesRead,
246 (long) len - totalbytesread);
247 bytes2read = (int) Math.min(recordLength - bytesRead,
248 (long) len - totalbytesread);
249 try {
250 bytes2read = is.read(b, off + totalbytesread,
251 bytes2read);
252 } catch (IOException e) {
253 streamInError = e;
254 throw e;
255 }
256
257 if (0 < bytes2read) {
258 totalbytesread += bytes2read;
259 bytesRead += bytes2read;
260 }
261
262 }
263 while (bytes2read > -1 && totalbytesread < len &&
264 (bytesRead < recordLength || moreChunks));
265
266 if (0 > bytes2read) {
267 if (moreChunks) {
268 streamInError = new IOException(Messages.getMessage(
269 "attach.DimeStreamError0"));
270 throw streamInError;
271 }
272 if (bytesRead < recordLength) {
273 streamInError = new IOException(Messages.getMessage
274 ("attach.DimeStreamError1",
275 "" + (recordLength - bytesRead)));
276 throw streamInError;
277 }
278 if (!ME) {
279 streamInError = new IOException(Messages.getMessage(
280 "attach.DimeStreamError0"));
281 throw streamInError;
282 }
283 //in theory the last chunk of data should also have been padded, but lets be tolerant of that.
284 dataPadLength = 0;
285
286 } else if (bytesRead >= recordLength) {
287 //get rid of pading.
288 try {
289 dataPadLength -= readPad(dataPadLength);
290 } catch (IOException e) {
291 //in theory the last chunk of data should also have been padded, but lets be tolerant of that.
292 if (!ME) throw e;
293 else {
294 dataPadLength = 0;
295 streamInError = null;
296 }
297 }
298 }
299
300 if (bytesRead >= recordLength && ME) {
301 finalClose();
302 }
303
304 return totalbytesread >= 0 ? totalbytesread : -1;
305 }
306
307 void readHeader(boolean isChunk) throws IOException {
308
309 bytesRead = 0; //How many bytes of the record have been read.
310 if (isChunk) {
311 if (!moreChunks) throw new RuntimeException(
312 Messages.getMessage("attach.DimeStreamError2"));
313 dataPadLength -= readPad(dataPadLength); //Just incase it was left over.
314 }
315
316 byte[] header = new byte[12];
317
318 if (header.length != readFromStream(header)) {
319 streamInError = new IOException(Messages.getMessage(
320 "attach.DimeStreamError3",
321 "" + header.length));
322 throw streamInError;
323 }
324
325 //VERSION
326 byte version = (byte) ((header[0] >>> 3) & 0x1f);
327
328 if (version > DimeMultiPart.CURRENT_VERSION) {
329 streamInError = new IOException(Messages.getMessage("attach.DimeStreamError4",
330 "" + version,
331 "" + DimeMultiPart.CURRENT_VERSION));
332 throw streamInError;
333 }
334
335 //B, E, C
336 MB = 0 != (0x4 & header[0]);
337 ME = 0 != (0x2 & header[0]);
338 moreChunks = 0 != (0x1 & header[0]);
339
340 //TYPE_T
341 if (!isChunk)
342 tnf = DimeTypeNameFormat.parseByte((byte) ((header[1] >>> 4) & (byte) 0xf));
343
344 //OPTIONS_LENGTH
345 int optionsLength =
346 ((((int) header[2]) << 8) & 0xff00) | ((int) header[3]);
347
348 //ID_LENGTH
349 int idLength =
350 ((((int) header[4]) << 8) & 0xff00) | ((int) header[5]);
351
352 //TYPE_LENGTH
353 int typeLength = ((((int) header[6]) << 8) & 0xff00)
354 | ((int) header[7]);
355
356 //DATA_LENGTH
357 recordLength = ((((long) header[8]) << 24) & 0xff000000L) |
358 ((((long) header[9]) << 16) & 0xff0000L) |
359 ((((long) header[10]) << 8) & 0xff00L) |
360 ((long) header[11] & 0xffL);
361
362 //OPTIONS + PADDING
363
364 if (0 != optionsLength) {
365 byte[] optBytes = new byte[optionsLength];
366
367 if (optionsLength != readFromStream(optBytes)) {
368 streamInError = new IOException(Messages.getMessage(
369 "attach.DimeStreamError5",
370 "" + optionsLength));
371 throw streamInError;
372 }
373 optBytes = null; //Yup throw it away, don't know anything about options.
374
375 int pad = DimeBodyPart.dimePadding(optionsLength);
376
377 if (pad != readFromStream(header, 0, pad)) {
378 streamInError = new IOException(
379 Messages.getMessage("attach.DimeStreamError7"));
380 throw streamInError;
381 }
382 }
383
384 // ID + PADDING
385 if (0 < idLength) {
386 byte[] idBytes = new byte[ idLength];
387
388 if (idLength != readFromStream(idBytes)) {
389 streamInError = new IOException(
390 Messages.getMessage("attach.DimeStreamError8"));
391 throw streamInError;
392 }
393 if (idLength != 0 && !isChunk) {
394 id = new String(idBytes);
395 }
396 int pad = DimeBodyPart.dimePadding(idLength);
397
398 if (pad != readFromStream(header, 0, pad)) {
399 streamInError = new IOException(Messages.getMessage(
400 "attach.DimeStreamError9"));
401 throw streamInError;
402 }
403 }
404
405 //TYPE + PADDING
406 if (0 < typeLength) {
407 byte[] typeBytes = new byte[typeLength];
408
409 if (typeLength != readFromStream(typeBytes)) {
410 streamInError = new IOException(Messages.getMessage(
411 "attach.DimeStreamError10"));
412 throw streamInError;
413 }
414 if (typeLength != 0 && !isChunk) {
415 type = new String(typeBytes);
416 }
417 int pad = DimeBodyPart.dimePadding(typeLength);
418
419 if (pad != readFromStream(header, 0, pad)) {
420 streamInError = new IOException(Messages.getMessage(
421 "attach.DimeStreamError11"));
422
423 throw streamInError;
424 }
425 }
426 log.debug("MB:" + MB + ", ME:" + ME + ", CF:" + moreChunks +
427 "Option length:" + optionsLength +
428 ", ID length:" + idLength +
429 ", typeLength:" + typeLength + ", TYPE_T:" + tnf);
430 log.debug("id:\"" + id + "\"");
431 log.debug("type:\"" + type + "\"");
432 log.debug("recordlength:\"" + recordLength + "\"");
433
434 dataPadLength = DimeBodyPart.dimePadding(recordLength);
435 }
436
437 /**
438 * Read from the delimited stream.
439 * @param b is the array to read into. Read as much as possible
440 * into the size of this array.
441 * @return the number of bytes read. -1 if endof stream
442 * @throws IOException if data could not be read from the stream
443 */
444 public int read(byte[] b) throws IOException {
445 return read(b, 0, b.length);
446 }
447
448 // fixme: this seems a bit inefficient
449 /**
450 * Read from the boundary delimited stream.
451 *
452 * @return the byte read, or -1 if endof stream
453 * @throws IOException if there was an error reading the data
454 */
455 public int read() throws IOException {
456 byte[] b = new byte[1];
457 int read = read(b, 0, 1);
458
459 if (read < 0)
460 return -1; // fixme: should we also check for read != 1?
461 return (b[0] & 0xff); // convert byte value to a positive int
462 }
463
464 /**
465 * Closes the stream.
466 * <p>
467 * This will take care of flushing any remaining data to the strea.
468 * <p>
469 * Multiple calls to this method will result in the stream being closed once
470 * and then all subsequent calls being ignored.
471 *
472 * @throws IOException if the stream could not be closed
473 */
474 public void close() throws IOException {
475 synchronized(this){
476 if (closed) return;
477 closed = true; //mark it closed.
478 }
479 log.debug(Messages.getMessage("bStreamClosed", "" + streamNo));
480 if (bytesRead < recordLength || moreChunks) {
481 //We need get this off the stream.
482 //Easy way to flush through the stream;
483 byte[] readrest = new byte[1024 * 16];
484 int bread = 0;
485
486 do {
487 bread = _read(readrest, 0, readrest.length);//should also close the orginal stream.
488 }
489 while (bread > -1);
490 }
491 dataPadLength -= readPad(dataPadLength);
492 }
493
494 // fixme: if mark is not supported, do we throw an exception here?
495 /**
496 * Mark the stream.
497 * This is not supported.
498 */
499 public void mark(int readlimit) {//do nothing
500 }
501
502 public void reset() throws IOException {
503 streamInError = new IOException(Messages.getMessage(
504 "attach.bounday.mns"));
505 throw streamInError;
506 }
507
508 public boolean markSupported() {
509 return false;
510 }
511
512 public synchronized int available() throws IOException {
513 if (null != streamInError) throw streamInError;
514 int chunkAvail = (int) Math.min((long)
515 Integer.MAX_VALUE, recordLength - bytesRead);
516
517 int streamAvail = 0;
518
519 try {
520 streamAvail = is.available();
521 } catch (IOException e) {
522 streamInError = e;
523 throw e;
524 }
525
526 if (chunkAvail == 0 && moreChunks && (12 + dataPadLength)
527 <= streamAvail) {
528 dataPadLength -= readPad(dataPadLength);
529 readHeader(true);
530 return available();
531 }
532 return Math.min(streamAvail, chunkAvail);
533 }
534
535 protected void finalClose() throws IOException {
536 try{
537 theEnd = true;
538 if(null != is) is.close();
539 }finally{
540 is= null;
541 }
542 }
543 }