1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.fs;
20
import java.io.*;
import java.util.Arrays;
import java.util.zip.CRC32;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
30
/****************************************************************
 * Abstract checksummed FileSystem.
 * It provides a basic implementation of a checksummed FileSystem,
 * which creates a checksum file for each raw file.
 * It generates and verifies checksums on the client side.
 *
 *****************************************************************/
38 public abstract class ChecksumFileSystem extends FilterFileSystem {
39 private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
40
41 public static double getApproxChkSumLength(long size) {
42 return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
43 }
44
/**
 * Creates a checksummed file system on top of the given raw file system.
 *
 * @param fs the raw file system that stores both data and checksum files
 */
public ChecksumFileSystem(final FileSystem fs) {
  super(fs);
}
48
49 /** get the raw file system */
50 public FileSystem getRawFileSystem() {
51 return fs;
52 }
53
54 /** Return the name of the checksum file associated with a file.*/
55 public Path getChecksumFile(Path file) {
56 return new Path(file.getParent(), "." + file.getName() + ".crc");
57 }
58
59 /** Return true iff file is a checksum file name.*/
60 public static boolean isChecksumFile(Path file) {
61 String name = file.getName();
62 return name.startsWith(".") && name.endsWith(".crc");
63 }
64
65 /** Return the length of the checksum file given the size of the
66 * actual file.
67 **/
68 public long getChecksumFileLength(Path file, long fileSize) {
69 return ChecksumFSOutputSummer.getChecksumLength(fileSize, getBytesPerSum());
70 }
71
72 /** Return the bytes Per Checksum */
73 public int getBytesPerSum() {
74 return getConf().getInt("io.bytes.per.checksum", 512);
75 }
76
77 private int getSumBufferSize(int bytesPerSum, int bufferSize) {
78 int defaultBufferSize = getConf().getInt("io.file.buffer.size", 4096);
79 int proportionalBufferSize = bufferSize / bytesPerSum;
80 return Math.max(bytesPerSum,
81 Math.max(proportionalBufferSize, defaultBufferSize));
82 }
83
84 /*******************************************************
85 * For open()'s FSInputStream
86 * It verifies that data matches checksums.
87 *******************************************************/
88 private static class ChecksumFSInputChecker extends FSInputChecker {
89 public static final Log LOG
90 = LogFactory.getLog("org.apache.hadoop.fs.FSInputChecker");
91
92 private ChecksumFileSystem fs;
93 private FSDataInputStream datas;
94 private FSDataInputStream sums;
95
96 private static final int HEADER_LENGTH = 8;
97
98 private int bytesPerSum = 1;
99 private long fileLen = -1L;
100
101 public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
102 throws IOException {
103 this(fs, file, fs.getConf().getInt("io.file.buffer.size", 4096));
104 }
105
106 public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
107 throws IOException {
108 super( file, fs.getFileStatus(file).getReplication() );
109 this.datas = fs.getRawFileSystem().open(file, bufferSize);
110 this.fs = fs;
111 Path sumFile = fs.getChecksumFile(file);
112 try {
113 int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
114 sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
115
116 byte[] version = new byte[CHECKSUM_VERSION.length];
117 sums.readFully(version);
118 if (!Arrays.equals(version, CHECKSUM_VERSION))
119 throw new IOException("Not a checksum file: "+sumFile);
120 this.bytesPerSum = sums.readInt();
121 set(new CRC32(), bytesPerSum, 4);
122 } catch (FileNotFoundException e) { // quietly ignore
123 set(null, 1, 0);
124 } catch (IOException e) { // loudly ignore
125 LOG.warn("Problem opening checksum file: "+ file +
126 ". Ignoring exception: " +
127 StringUtils.stringifyException(e));
128 set(null, 1, 0);
129 }
130 }
131
132 private long getChecksumFilePos( long dataPos ) {
133 return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
134 }
135
136 protected long getChunkPosition( long dataPos ) {
137 return dataPos/bytesPerSum*bytesPerSum;
138 }
139
140 public int available() throws IOException {
141 return datas.available() + super.available();
142 }
143
144 public int read(long position, byte[] b, int off, int len)
145 throws IOException {
146 // parameter check
147 if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
148 throw new IndexOutOfBoundsException();
149 } else if (len == 0) {
150 return 0;
151 }
152 if( position<0 ) {
153 throw new IllegalArgumentException(
154 "Parameter position can not to be negative");
155 }
156
157 ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
158 checker.seek(position);
159 int nread = checker.read(b, off, len);
160 checker.close();
161 return nread;
162 }
163
164 public void close() throws IOException {
165 datas.close();
166 if( sums != null ) {
167 sums.close();
168 }
169 set(null, 1, 0);
170 }
171
172
173 @Override
174 public boolean seekToNewSource(long targetPos) throws IOException {
175 long sumsPos = getChecksumFilePos(targetPos);
176 fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
177 boolean newDataSource = datas.seekToNewSource(targetPos);
178 return sums.seekToNewSource(sumsPos) || newDataSource;
179 }
180
181 @Override
182 protected int readChunk(long pos, byte[] buf, int offset, int len,
183 byte[] checksum) throws IOException {
184 boolean eof = false;
185 if(needChecksum()) {
186 try {
187 long checksumPos = getChecksumFilePos(pos);
188 if(checksumPos != sums.getPos()) {
189 sums.seek(checksumPos);
190 }
191 sums.readFully(checksum);
192 } catch (EOFException e) {
193 eof = true;
194 }
195 len = bytesPerSum;
196 }
197 if(pos != datas.getPos()) {
198 datas.seek(pos);
199 }
200 int nread = readFully(datas, buf, offset, len);
201 if( eof && nread > 0) {
202 throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
203 }
204 return nread;
205 }
206
207 /* Return the file length */
208 private long getFileLength() throws IOException {
209 if( fileLen==-1L ) {
210 fileLen = fs.getContentLength(file);
211 }
212 return fileLen;
213 }
214
215 /**
216 * Skips over and discards <code>n</code> bytes of data from the
217 * input stream.
218 *
219 *The <code>skip</code> method skips over some smaller number of bytes
220 * when reaching end of file before <code>n</code> bytes have been skipped.
221 * The actual number of bytes skipped is returned. If <code>n</code> is
222 * negative, no bytes are skipped.
223 *
224 * @param n the number of bytes to be skipped.
225 * @return the actual number of bytes skipped.
226 * @exception IOException if an I/O error occurs.
227 * ChecksumException if the chunk to skip to is corrupted
228 */
229 public synchronized long skip(long n) throws IOException {
230 long curPos = getPos();
231 long fileLength = getFileLength();
232 if( n+curPos > fileLength ) {
233 n = fileLength - curPos;
234 }
235 return super.skip(n);
236 }
237
238 /**
239 * Seek to the given position in the stream.
240 * The next read() will be from that position.
241 *
242 * <p>This method does not allow seek past the end of the file.
243 * This produces IOException.
244 *
245 * @param pos the postion to seek to.
246 * @exception IOException if an I/O error occurs or seeks after EOF
247 * ChecksumException if the chunk to seek to is corrupted
248 */
249
250 public synchronized void seek(long pos) throws IOException {
251 if(pos>getFileLength()) {
252 throw new IOException("Cannot seek after EOF");
253 }
254 super.seek(pos);
255 }
256
257 }
258
259 /**
260 * Opens an FSDataInputStream at the indicated Path.
261 * @param f the file name to open
262 * @param bufferSize the size of the buffer to be used.
263 */
264 @Override
265 public FSDataInputStream open(Path f, int bufferSize) throws IOException {
266 return new FSDataInputStream(
267 new ChecksumFSInputChecker(this, f, bufferSize) );
268 }
269
270 /** This class provides an output stream for a checksummed file.
271 * It generates checksums for data. */
272 private static class ChecksumFSOutputSummer extends FSOutputSummer {
273 private FSDataOutputStream datas;
274 private FSDataOutputStream sums;
275 private static final float CHKSUM_AS_FRACTION = 0.01f;
276
277 public ChecksumFSOutputSummer(ChecksumFileSystem fs,
278 Path file,
279 boolean overwrite,
280 short replication,
281 long blockSize,
282 Configuration conf)
283 throws IOException {
284 this(fs, file, overwrite,
285 conf.getInt("io.file.buffer.size", 4096),
286 replication, blockSize, null);
287 }
288
289 public ChecksumFSOutputSummer(ChecksumFileSystem fs,
290 Path file,
291 boolean overwrite,
292 int bufferSize,
293 short replication,
294 long blockSize,
295 Progressable progress)
296 throws IOException {
297 super(new CRC32(), fs.getBytesPerSum(), 4);
298 int bytesPerSum = fs.getBytesPerSum();
299 this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize,
300 replication, blockSize, progress);
301 int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
302 this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), true,
303 sumBufferSize, replication,
304 blockSize);
305 sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
306 sums.writeInt(bytesPerSum);
307 }
308
309 public void close() throws IOException {
310 flushBuffer();
311 sums.close();
312 datas.close();
313 }
314
315 public static long getChecksumLength(long size, int bytesPerSum) {
316 //the checksum length is equal to size passed divided by bytesPerSum +
317 //bytes written in the beginning of the checksum file.
318 return ((long)(Math.ceil((float)size/bytesPerSum)) + 1) * 4 +
319 CHECKSUM_VERSION.length;
320 }
321
322 @Override
323 protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
324 throws IOException {
325 datas.write(b, offset, len);
326 sums.write(checksum);
327 }
328 }
329
330 /**
331 * Opens an FSDataOutputStream at the indicated Path with write-progress
332 * reporting.
333 * @param f the file name to open
334 * @param overwrite if a file with this name already exists, then if true,
335 * the file will be overwritten, and if false an error will be thrown.
336 * @param bufferSize the size of the buffer to be used.
337 * @param replication required block replication for the file.
338 */
339 @Override
340 public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
341 short replication, long blockSize, Progressable progress)
342 throws IOException {
343 Path parent = f.getParent();
344 if (parent != null && !mkdirs(parent)) {
345 throw new IOException("Mkdirs failed to create " + parent);
346 }
347 return new FSDataOutputStream(new ChecksumFSOutputSummer(
348 this, f, overwrite, bufferSize, replication, blockSize, progress));
349 }
350
351 /**
352 * Set replication for an existing file.
353 * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
354 * @param src file name
355 * @param replication new replication
356 * @throws IOException
357 * @return true if successful;
358 * false if file does not exist or is a directory
359 */
360 public boolean setReplication(Path src, short replication) throws IOException {
361 boolean value = fs.setReplication(src, replication);
362 if (!value)
363 return false;
364
365 Path checkFile = getChecksumFile(src);
366 if (exists(checkFile))
367 fs.setReplication(checkFile, replication);
368
369 return true;
370 }
371
372 /**
373 * Rename files/dirs
374 */
375 public boolean rename(Path src, Path dst) throws IOException {
376 if (fs.isDirectory(src)) {
377 return fs.rename(src, dst);
378 } else {
379
380 boolean value = fs.rename(src, dst);
381 if (!value)
382 return false;
383
384 Path checkFile = getChecksumFile(src);
385 if (fs.exists(checkFile)) { //try to rename checksum
386 if (fs.isDirectory(dst)) {
387 value = fs.rename(checkFile, dst);
388 } else {
389 value = fs.rename(checkFile, getChecksumFile(dst));
390 }
391 }
392
393 return value;
394 }
395 }
396
397 /**
398 * Get rid of Path f, whether a true file or dir.
399 */
400 public boolean delete(Path f) throws IOException {
401 if (fs.isDirectory(f)) {
402 return fs.delete(f);
403 } else {
404 Path checkFile = getChecksumFile(f);
405 if (fs.exists(checkFile)) {
406 fs.delete(checkFile);
407 }
408
409 return fs.delete(f);
410 }
411 }
412
413 final private static PathFilter DEFAULT_FILTER = new PathFilter() {
414 public boolean accept(Path file) {
415 return !isChecksumFile(file);
416 }
417 };
418
419 /**
420 * Filter raw files in the given pathes using the default checksum filter.
421 * @param files a list of paths
422 * @return a list of files under the source paths
423 * @exception IOException
424 */
425 @Override
426 public Path[] listPaths(Path[] files) throws IOException {
427 return fs.listPaths(files, DEFAULT_FILTER);
428 }
429
430 /**
431 * Filter raw files in the given path using the default checksum filter.
432 * @param f source path
433 * @return a list of files under the source path
434 * @exception IOException
435 */
436 public Path[] listPaths(Path f) throws IOException {
437 return fs.listPaths(f, DEFAULT_FILTER);
438 }
439
440 @Override
441 public boolean mkdirs(Path f) throws IOException {
442 return fs.mkdirs(f);
443 }
444
445 @Override
446 public void lock(Path f, boolean shared) throws IOException {
447 if (fs.isDirectory(f)) {
448 fs.lock(f, shared);
449 } else {
450 Path checkFile = getChecksumFile(f);
451 if (fs.exists(checkFile)) {
452 fs.lock(checkFile, shared);
453 }
454 fs.lock(f, shared);
455 }
456 }
457
458 @Override
459 public void release(Path f) throws IOException {
460 if (fs.isDirectory(f)) {
461 fs.release(f);
462 } else {
463 Path checkFile = getChecksumFile(f);
464 if (fs.exists(checkFile)) {
465 fs.release(getChecksumFile(f));
466 }
467 fs.release(f);
468 }
469 }
470
471 @Override
472 public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
473 throws IOException {
474 Configuration conf = getConf();
475 FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
476 }
477
478 /**
479 * The src file is under FS, and the dst is on the local disk.
480 * Copy it from FS control to the local dst name.
481 */
482 @Override
483 public void copyToLocalFile(boolean delSrc, Path src, Path dst)
484 throws IOException {
485 Configuration conf = getConf();
486 FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
487 }
488
489 /**
490 * The src file is under FS, and the dst is on the local disk.
491 * Copy it from FS control to the local dst name.
492 * If src and dst are directories, the copyCrc parameter
493 * determines whether to copy CRC files.
494 */
495 public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
496 throws IOException {
497 if (!fs.isDirectory(src)) { // source is a file
498 fs.copyToLocalFile(src, dst);
499 FileSystem localFs = getLocal(getConf());
500 if (localFs instanceof ChecksumFileSystem) {
501 localFs = ((ChecksumFileSystem) localFs).getRawFileSystem();
502 }
503 if (localFs.isDirectory(dst)) {
504 dst = new Path(dst, src.getName());
505 }
506 dst = getChecksumFile(dst);
507 if (localFs.exists(dst)) { //remove old local checksum file
508 localFs.delete(dst);
509 }
510 Path checksumFile = getChecksumFile(src);
511 if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
512 fs.copyToLocalFile(checksumFile, dst);
513 }
514 } else {
515 Path[] srcs = listPaths(src);
516 for (Path srcFile : srcs) {
517 copyToLocalFile(srcFile, new Path(dst, srcFile.getName()), copyCrc);
518 }
519 }
520 }
521
522 @Override
523 public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
524 throws IOException {
525 return tmpLocalFile;
526 }
527
528 @Override
529 public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
530 throws IOException {
531 moveFromLocalFile(tmpLocalFile, fsOutputFile);
532 }
533
534 /**
535 * Report a checksum error to the file system.
536 * @param f the file name containing the error
537 * @param in the stream open on the file
538 * @param inPos the position of the beginning of the bad data in the file
539 * @param sums the stream open on the checksum file
540 * @param sumsPos the position of the beginning of the bad data in the checksum file
541 * @return if retry is neccessary
542 */
543 public boolean reportChecksumFailure(Path f, FSDataInputStream in,
544 long inPos, FSDataInputStream sums, long sumsPos) {
545 return false;
546 }
547 }