Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » fs » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one
    3    * or more contributor license agreements.  See the NOTICE file
    4    * distributed with this work for additional information
    5    * regarding copyright ownership.  The ASF licenses this file
    6    * to you under the Apache License, Version 2.0 (the
    7    * "License"); you may not use this file except in compliance
    8    * with the License.  You may obtain a copy of the License at
    9    *
   10    *     http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    * Unless required by applicable law or agreed to in writing, software
   13    * distributed under the License is distributed on an "AS IS" BASIS,
   14    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   15    * See the License for the specific language governing permissions and
   16    * limitations under the License.
   17    */
   18   
   19   package org.apache.hadoop.fs;
   20   
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.zip.CRC32;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
   30   
/****************************************************************
 * Abstract checksummed FileSystem.
 * It provides a basic implementation of a checksummed FileSystem,
 * which creates a checksum file for each raw file.
 * It generates and verifies checksums at the client side.
 *
 *****************************************************************/
   38   public abstract class ChecksumFileSystem extends FilterFileSystem {
   39     private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
   40   
   41     public static double getApproxChkSumLength(long size) {
   42       return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
   43     }
   44     
   45     public ChecksumFileSystem(FileSystem fs) {
   46       super(fs);
   47     }
   48   
   49     /** get the raw file system */
   50     public FileSystem getRawFileSystem() {
   51       return fs;
   52     }
   53   
   54     /** Return the name of the checksum file associated with a file.*/
   55     public Path getChecksumFile(Path file) {
   56       return new Path(file.getParent(), "." + file.getName() + ".crc");
   57     }
   58   
   59     /** Return true iff file is a checksum file name.*/
   60     public static boolean isChecksumFile(Path file) {
   61       String name = file.getName();
   62       return name.startsWith(".") && name.endsWith(".crc");
   63     }
   64   
   65     /** Return the length of the checksum file given the size of the 
   66      * actual file.
   67      **/
   68     public long getChecksumFileLength(Path file, long fileSize) {
   69       return ChecksumFSOutputSummer.getChecksumLength(fileSize, getBytesPerSum());
   70     }
   71   
   72     /** Return the bytes Per Checksum */
   73     public int getBytesPerSum() {
   74       return getConf().getInt("io.bytes.per.checksum", 512);
   75     }
   76   
   77     private int getSumBufferSize(int bytesPerSum, int bufferSize) {
   78       int defaultBufferSize = getConf().getInt("io.file.buffer.size", 4096);
   79       int proportionalBufferSize = bufferSize / bytesPerSum;
   80       return Math.max(bytesPerSum,
   81                       Math.max(proportionalBufferSize, defaultBufferSize));
   82     }
   83   
   84     /*******************************************************
   85      * For open()'s FSInputStream
   86      * It verifies that data matches checksums.
   87      *******************************************************/
   88     private static class ChecksumFSInputChecker extends FSInputChecker {
   89       public static final Log LOG 
   90         = LogFactory.getLog("org.apache.hadoop.fs.FSInputChecker");
   91       
   92       private ChecksumFileSystem fs;
   93       private FSDataInputStream datas;
   94       private FSDataInputStream sums;
   95       
   96       private static final int HEADER_LENGTH = 8;
   97       
   98       private int bytesPerSum = 1;
   99       private long fileLen = -1L;
  100       
  101       public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
  102         throws IOException {
  103         this(fs, file, fs.getConf().getInt("io.file.buffer.size", 4096));
  104       }
  105       
  106       public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
  107         throws IOException {
  108         super( file, fs.getFileStatus(file).getReplication() );
  109         this.datas = fs.getRawFileSystem().open(file, bufferSize);
  110         this.fs = fs;
  111         Path sumFile = fs.getChecksumFile(file);
  112         try {
  113           int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
  114           sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
  115   
  116           byte[] version = new byte[CHECKSUM_VERSION.length];
  117           sums.readFully(version);
  118           if (!Arrays.equals(version, CHECKSUM_VERSION))
  119             throw new IOException("Not a checksum file: "+sumFile);
  120           this.bytesPerSum = sums.readInt();
  121           set(new CRC32(), bytesPerSum, 4);
  122         } catch (FileNotFoundException e) {         // quietly ignore
  123           set(null, 1, 0);
  124         } catch (IOException e) {                   // loudly ignore
  125           LOG.warn("Problem opening checksum file: "+ file + 
  126                    ".  Ignoring exception: " + 
  127                    StringUtils.stringifyException(e));
  128           set(null, 1, 0);
  129         }
  130       }
  131       
  132       private long getChecksumFilePos( long dataPos ) {
  133         return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
  134       }
  135       
  136       protected long getChunkPosition( long dataPos ) {
  137         return dataPos/bytesPerSum*bytesPerSum;
  138       }
  139       
  140       public int available() throws IOException {
  141         return datas.available() + super.available();
  142       }
  143       
  144       public int read(long position, byte[] b, int off, int len)
  145         throws IOException {
  146         // parameter check
  147         if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
  148           throw new IndexOutOfBoundsException();
  149         } else if (len == 0) {
  150           return 0;
  151         }
  152         if( position<0 ) {
  153           throw new IllegalArgumentException(
  154               "Parameter position can not to be negative");
  155         }
  156   
  157         ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
  158         checker.seek(position);
  159         int nread = checker.read(b, off, len);
  160         checker.close();
  161         return nread;
  162       }
  163       
  164       public void close() throws IOException {
  165         datas.close();
  166         if( sums != null ) {
  167           sums.close();
  168         }
  169         set(null, 1, 0);
  170       }
  171       
  172   
  173       @Override
  174       public boolean seekToNewSource(long targetPos) throws IOException {
  175         long sumsPos = getChecksumFilePos(targetPos);
  176         fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
  177         boolean newDataSource = datas.seekToNewSource(targetPos);
  178         return sums.seekToNewSource(sumsPos) || newDataSource;
  179       }
  180   
  181       @Override
  182       protected int readChunk(long pos, byte[] buf, int offset, int len,
  183           byte[] checksum) throws IOException {
  184         boolean eof = false;
  185         if(needChecksum()) {
  186           try {
  187             long checksumPos = getChecksumFilePos(pos); 
  188             if(checksumPos != sums.getPos()) {
  189               sums.seek(checksumPos);
  190             }
  191             sums.readFully(checksum);
  192           } catch (EOFException e) {
  193             eof = true;
  194           }
  195           len = bytesPerSum;
  196         }
  197         if(pos != datas.getPos()) {
  198           datas.seek(pos);
  199         }
  200         int nread = readFully(datas, buf, offset, len);
  201         if( eof && nread > 0) {
  202           throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
  203         }
  204         return nread;
  205       }
  206       
  207       /* Return the file length */
  208       private long getFileLength() throws IOException {
  209         if( fileLen==-1L ) {
  210           fileLen = fs.getContentLength(file);
  211         }
  212         return fileLen;
  213       }
  214       
  215       /**
  216        * Skips over and discards <code>n</code> bytes of data from the
  217        * input stream.
  218        *
  219        *The <code>skip</code> method skips over some smaller number of bytes
  220        * when reaching end of file before <code>n</code> bytes have been skipped.
  221        * The actual number of bytes skipped is returned.  If <code>n</code> is
  222        * negative, no bytes are skipped.
  223        *
  224        * @param      n   the number of bytes to be skipped.
  225        * @return     the actual number of bytes skipped.
  226        * @exception  IOException  if an I/O error occurs.
  227        *             ChecksumException if the chunk to skip to is corrupted
  228        */
  229       public synchronized long skip(long n) throws IOException {
  230         long curPos = getPos();
  231         long fileLength = getFileLength();
  232         if( n+curPos > fileLength ) {
  233           n = fileLength - curPos;
  234         }
  235         return super.skip(n);
  236       }
  237       
  238       /**
  239        * Seek to the given position in the stream.
  240        * The next read() will be from that position.
  241        * 
  242        * <p>This method does not allow seek past the end of the file.
  243        * This produces IOException.
  244        *
  245        * @param      pos   the postion to seek to.
  246        * @exception  IOException  if an I/O error occurs or seeks after EOF
  247        *             ChecksumException if the chunk to seek to is corrupted
  248        */
  249   
  250       public synchronized void seek(long pos) throws IOException {
  251         if(pos>getFileLength()) {
  252           throw new IOException("Cannot seek after EOF");
  253         }
  254         super.seek(pos);
  255       }
  256   
  257     }
  258   
  259     /**
  260      * Opens an FSDataInputStream at the indicated Path.
  261      * @param f the file name to open
  262      * @param bufferSize the size of the buffer to be used.
  263      */
  264     @Override
  265     public FSDataInputStream open(Path f, int bufferSize) throws IOException {
  266       return new FSDataInputStream(
  267           new ChecksumFSInputChecker(this, f, bufferSize) );
  268     }
  269   
  270     /** This class provides an output stream for a checksummed file.
  271      * It generates checksums for data. */
  272     private static class ChecksumFSOutputSummer extends FSOutputSummer {
  273       private FSDataOutputStream datas;    
  274       private FSDataOutputStream sums;
  275       private static final float CHKSUM_AS_FRACTION = 0.01f;
  276       
  277       public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
  278                             Path file, 
  279                             boolean overwrite, 
  280                             short replication,
  281                             long blockSize,
  282                             Configuration conf)
  283         throws IOException {
  284         this(fs, file, overwrite, 
  285              conf.getInt("io.file.buffer.size", 4096),
  286              replication, blockSize, null);
  287       }
  288       
  289       public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
  290                             Path file, 
  291                             boolean overwrite,
  292                             int bufferSize,
  293                             short replication,
  294                             long blockSize,
  295                             Progressable progress)
  296         throws IOException {
  297         super(new CRC32(), fs.getBytesPerSum(), 4);
  298         int bytesPerSum = fs.getBytesPerSum();
  299         this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize, 
  300                                            replication, blockSize, progress);
  301         int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
  302         this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), true, 
  303                                                  sumBufferSize, replication,
  304                                                  blockSize);
  305         sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
  306         sums.writeInt(bytesPerSum);
  307       }
  308       
  309       public void close() throws IOException {
  310         flushBuffer();
  311         sums.close();
  312         datas.close();
  313       }
  314       
  315       public static long getChecksumLength(long size, int bytesPerSum) {
  316         //the checksum length is equal to size passed divided by bytesPerSum +
  317         //bytes written in the beginning of the checksum file.  
  318         return ((long)(Math.ceil((float)size/bytesPerSum)) + 1) * 4 + 
  319           CHECKSUM_VERSION.length;  
  320       }
  321   
  322       @Override
  323       protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
  324       throws IOException {
  325         datas.write(b, offset, len);
  326         sums.write(checksum);
  327       }
  328     }
  329   
  330     /**
  331      * Opens an FSDataOutputStream at the indicated Path with write-progress
  332      * reporting.
  333      * @param f the file name to open
  334      * @param overwrite if a file with this name already exists, then if true,
  335      *   the file will be overwritten, and if false an error will be thrown.
  336      * @param bufferSize the size of the buffer to be used.
  337      * @param replication required block replication for the file. 
  338      */
  339     @Override
  340     public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
  341                                      short replication, long blockSize, Progressable progress)
  342       throws IOException {
  343       Path parent = f.getParent();
  344       if (parent != null && !mkdirs(parent)) {
  345         throw new IOException("Mkdirs failed to create " + parent);
  346       }
  347       return new FSDataOutputStream(new ChecksumFSOutputSummer(
  348           this, f, overwrite, bufferSize, replication, blockSize, progress));
  349     }
  350   
  351     /**
  352      * Set replication for an existing file.
  353      * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
  354      * @param src file name
  355      * @param replication new replication
  356      * @throws IOException
  357      * @return true if successful;
  358      *         false if file does not exist or is a directory
  359      */
  360     public boolean setReplication(Path src, short replication) throws IOException {
  361       boolean value = fs.setReplication(src, replication);
  362       if (!value)
  363         return false;
  364   
  365       Path checkFile = getChecksumFile(src);
  366       if (exists(checkFile))
  367         fs.setReplication(checkFile, replication);
  368   
  369       return true;
  370     }
  371   
  372     /**
  373      * Rename files/dirs
  374      */
  375     public boolean rename(Path src, Path dst) throws IOException {
  376       if (fs.isDirectory(src)) {
  377         return fs.rename(src, dst);
  378       } else {
  379   
  380         boolean value = fs.rename(src, dst);
  381         if (!value)
  382           return false;
  383   
  384         Path checkFile = getChecksumFile(src);
  385         if (fs.exists(checkFile)) { //try to rename checksum
  386           if (fs.isDirectory(dst)) {
  387             value = fs.rename(checkFile, dst);
  388           } else {
  389             value = fs.rename(checkFile, getChecksumFile(dst));
  390           }
  391         }
  392   
  393         return value;
  394       }
  395     }
  396   
  397     /**
  398      * Get rid of Path f, whether a true file or dir.
  399      */
  400     public boolean delete(Path f) throws IOException {
  401       if (fs.isDirectory(f)) {
  402         return fs.delete(f);
  403       } else {
  404         Path checkFile = getChecksumFile(f);
  405         if (fs.exists(checkFile)) {
  406           fs.delete(checkFile);
  407         }
  408   
  409         return fs.delete(f);
  410       }
  411     }
  412   
  413     final private static PathFilter DEFAULT_FILTER = new PathFilter() {
  414         public boolean accept(Path file) {
  415           return !isChecksumFile(file);
  416         }
  417       };
  418   
  419     /** 
  420      * Filter raw files in the given pathes using the default checksum filter. 
  421      * @param files a list of paths
  422      * @return a list of files under the source paths
  423      * @exception IOException
  424      */
  425     @Override
  426     public Path[] listPaths(Path[] files) throws IOException {
  427       return fs.listPaths(files, DEFAULT_FILTER);
  428     }
  429   
  430     /** 
  431      * Filter raw files in the given path using the default checksum filter. 
  432      * @param f source path
  433      * @return a list of files under the source path
  434      * @exception IOException
  435      */
  436     public Path[] listPaths(Path f) throws IOException {
  437       return fs.listPaths(f, DEFAULT_FILTER);
  438     }
  439   
  440     @Override
  441     public boolean mkdirs(Path f) throws IOException {
  442       return fs.mkdirs(f);
  443     }
  444   
  445     @Override
  446     public void lock(Path f, boolean shared) throws IOException {
  447       if (fs.isDirectory(f)) {
  448         fs.lock(f, shared);
  449       } else {
  450         Path checkFile = getChecksumFile(f);
  451         if (fs.exists(checkFile)) {
  452           fs.lock(checkFile, shared);
  453         }
  454         fs.lock(f, shared);
  455       }
  456     }
  457   
  458     @Override
  459     public void release(Path f) throws IOException {
  460       if (fs.isDirectory(f)) {
  461         fs.release(f);
  462       } else {
  463         Path checkFile = getChecksumFile(f);
  464         if (fs.exists(checkFile)) {
  465           fs.release(getChecksumFile(f));
  466         }
  467         fs.release(f);
  468       }
  469     }
  470   
  471     @Override
  472     public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
  473       throws IOException {
  474       Configuration conf = getConf();
  475       FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
  476     }
  477   
  478     /**
  479      * The src file is under FS, and the dst is on the local disk.
  480      * Copy it from FS control to the local dst name.
  481      */
  482     @Override
  483     public void copyToLocalFile(boolean delSrc, Path src, Path dst)
  484       throws IOException {
  485       Configuration conf = getConf();
  486       FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
  487     }
  488   
  489     /**
  490      * The src file is under FS, and the dst is on the local disk.
  491      * Copy it from FS control to the local dst name.
  492      * If src and dst are directories, the copyCrc parameter
  493      * determines whether to copy CRC files.
  494      */
  495     public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
  496       throws IOException {
  497       if (!fs.isDirectory(src)) { // source is a file
  498         fs.copyToLocalFile(src, dst);
  499         FileSystem localFs = getLocal(getConf());
  500         if (localFs instanceof ChecksumFileSystem) {
  501           localFs = ((ChecksumFileSystem) localFs).getRawFileSystem();
  502         }
  503         if (localFs.isDirectory(dst)) {
  504           dst = new Path(dst, src.getName());
  505         }
  506         dst = getChecksumFile(dst);
  507         if (localFs.exists(dst)) { //remove old local checksum file
  508           localFs.delete(dst);
  509         }
  510         Path checksumFile = getChecksumFile(src);
  511         if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
  512           fs.copyToLocalFile(checksumFile, dst);
  513         }
  514       } else {
  515         Path[] srcs = listPaths(src);
  516         for (Path srcFile : srcs) {
  517           copyToLocalFile(srcFile, new Path(dst, srcFile.getName()), copyCrc);
  518         }
  519       }
  520     }
  521   
  522     @Override
  523     public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
  524       throws IOException {
  525       return tmpLocalFile;
  526     }
  527   
  528     @Override
  529     public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
  530       throws IOException {
  531       moveFromLocalFile(tmpLocalFile, fsOutputFile);
  532     }
  533   
  534     /**
  535      * Report a checksum error to the file system.
  536      * @param f the file name containing the error
  537      * @param in the stream open on the file
  538      * @param inPos the position of the beginning of the bad data in the file
  539      * @param sums the stream open on the checksum file
  540      * @param sumsPos the position of the beginning of the bad data in the checksum file
  541      * @return if retry is neccessary
  542      */
  543     public boolean reportChecksumFailure(Path f, FSDataInputStream in,
  544                                          long inPos, FSDataInputStream sums, long sumsPos) {
  545       return false;
  546     }
  547   }

Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » fs » [javadoc | source]