Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » fs » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one
    3    * or more contributor license agreements.  See the NOTICE file
    4    * distributed with this work for additional information
    5    * regarding copyright ownership.  The ASF licenses this file
    6    * to you under the Apache License, Version 2.0 (the
    7    * "License"); you may not use this file except in compliance
    8    * with the License.  You may obtain a copy of the License at
    9    *
   10    *     http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    * Unless required by applicable law or agreed to in writing, software
   13    * distributed under the License is distributed on an "AS IS" BASIS,
   14    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   15    * See the License for the specific language governing permissions and
   16    * limitations under the License.
   17    */
   18   
   19   package org.apache.hadoop.fs;
   20   
   21   import java.io;
   22   import java.util;
   23   
   24   import org.apache.commons.logging;
   25   
   26   import org.apache.hadoop.util;
   27   import org.apache.hadoop.fs.FileSystem;
   28   import org.apache.hadoop.fs.Path;
   29   import org.apache.hadoop.util.DiskChecker.DiskErrorException;
   30   import org.apache.hadoop.conf.Configuration; 
   31   
   32   /** An implementation of a round-robin scheme for disk allocation for creating
   33    * files. The way it works is that it is kept track what disk was last
   34    * allocated for a file write. For the current request, the next disk from
   35    * the set of disks would be allocated if the free space on the disk is 
   36    * sufficient enough to accomodate the file that is being considered for
   37    * creation. If the space requirements cannot be met, the next disk in order
   38    * would be tried and so on till a disk is found with sufficient capacity.
   39    * Once a disk with sufficient space is identified, a check is done to make
   40    * sure that the disk is writable. Also, there is an API provided that doesn't
   41    * take the space requirements into consideration but just checks whether the
   42    * disk under consideration is writable (this should be used for cases where
   43    * the file size is not known apriori). An API is provided to read a path that
   44    * was created earlier. That API works by doing a scan of all the disks for the
   45    * input pathname.
   46    * This implementation also provides the functionality of having multiple 
   47    * allocators per JVM (one for each unique functionality or context, like 
   48    * mapred, dfs-client, etc.). It ensures that there is only one instance of
   49    * an allocator per context per JVM.
   50    * Note:
   51    * 1. The contexts referred above are actually the configuration items defined
   52    * in the Configuration class like "mapred.local.dir" (for which we want to 
   53    * control the dir allocations). The context-strings are exactly those 
   54    * configuration items.
   55    * 2. This implementation does not take into consideration cases where
   56    * a disk becomes read-only or goes out of space while a file is being written
   57    * to (disks are shared between multiple processes, and so the latter situation
   58    * is probable).
   59    * 3. In the class implementation, "Disk" is referred to as "Dir", which
   60    * actually points to the configured directory on the Disk which will be the
   61    * parent for all file write/read allocations.
   62    */
   63   public class LocalDirAllocator {
   64     
   65     //A Map from the config item names like "mapred.local.dir", 
   66     //"dfs.client.buffer.dir" to the instance of the AllocatorPerContext. This
   67     //is a static object to make sure there exists exactly one instance per JVM
   68     private static Map <String, AllocatorPerContext> contexts = 
   69                    new TreeMap<String, AllocatorPerContext>();
   70     private String contextCfgItemName;
   71   
   72     /**Create an allocator object
   73      * @param contextCfgItemName
   74      */
   75     public LocalDirAllocator(String contextCfgItemName) {
   76       this.contextCfgItemName = contextCfgItemName;
   77     }
   78     
   79     /** This method must be used to obtain the dir allocation context for a 
   80      * particular value of the context name. The context name must be an item
   81      * defined in the Configuration object for which we want to control the 
   82      * dir allocations (e.g., <code>mapred.local.dir</code>). The method will
   83      * create a context for that name if it doesn't already exist.
   84      */
   85     private AllocatorPerContext obtainContext(String contextCfgItemName) {
   86       synchronized (contexts) {
   87         AllocatorPerContext l = contexts.get(contextCfgItemName);
   88         if (l == null) {
   89           contexts.put(contextCfgItemName, 
   90                       (l = new AllocatorPerContext(contextCfgItemName)));
   91         }
   92         return l;
   93       }
   94     }
   95     
   96     /** Get a path from the local FS. This method should be used if the size of 
   97      *  the file is not known apriori. We go round-robin over the set of disks
   98      *  (via the configured dirs) and return the first complete path where
   99      *  we could create the parent directory of the passed path. 
  100      *  @param pathStr the requested path (this will be created on the first 
  101      *  available disk)
  102      *  @param conf the Configuration object
  103      *  @return the complete path to the file on a local disk
  104      *  @throws IOException
  105      */
  106     public Path getLocalPathForWrite(String pathStr, 
  107         Configuration conf) throws IOException {
  108       return getLocalPathForWrite(pathStr, -1, conf);
  109     }
  110     
  111     /** Get a path from the local FS. Pass size as -1 if not known apriori. We
  112      *  round-robin over the set of disks (via the configured dirs) and return
  113      *  the first complete path which has enough space 
  114      *  @param pathStr the requested path (this will be created on the first 
  115      *  available disk)
  116      *  @param size the size of the file that is going to be written
  117      *  @param conf the Configuration object
  118      *  @return the complete path to the file on a local disk
  119      *  @throws IOException
  120      */
  121     public Path getLocalPathForWrite(String pathStr, long size, 
  122         Configuration conf) throws IOException {
  123       AllocatorPerContext context = obtainContext(contextCfgItemName);
  124       return context.getLocalPathForWrite(pathStr, size, conf);
  125     }
  126     
  127     /** Get a path from the local FS for reading. We search through all the
  128      *  configured dirs for the file's existence and return the complete
  129      *  path to the file when we find one 
  130      *  @param pathStr the requested file (this will be searched)
  131      *  @param conf the Configuration object
  132      *  @return the complete path to the file on a local disk
  133      *  @throws IOException
  134      */
  135     public Path getLocalPathToRead(String pathStr, 
  136         Configuration conf) throws IOException {
  137       AllocatorPerContext context = obtainContext(contextCfgItemName);
  138       return context.getLocalPathToRead(pathStr, conf);
  139     }
  140   
  141     /** Creates a temporary file in the local FS. Pass size as -1 if not known 
  142      *  apriori. We round-robin over the set of disks (via the configured dirs) 
  143      *  and select the first complete path which has enough space. A file is
  144      *  created on this directory. The file is guaranteed to go away when the
  145      *  JVM exits.
  146      *  @param pathStr prefix for the temporary file
  147      *  @param size the size of the file that is going to be written
  148      *  @param conf the Configuration object
  149      *  @return a unique temporary file
  150      *  @throws IOException
  151      */
  152     public File createTmpFileForWrite(String pathStr, long size, 
  153         Configuration conf) throws IOException {
  154       AllocatorPerContext context = obtainContext(contextCfgItemName);
  155       return context.createTmpFileForWrite(pathStr, size, conf);
  156     }
  157     
  158     /** Method to check whether a context is valid
  159      * @param contextCfgItemName
  160      * @return true/false
  161      */
  162     public static boolean isContextValid(String contextCfgItemName) {
  163       synchronized (contexts) {
  164         return contexts.containsKey(contextCfgItemName);
  165       }
  166     }
  167       
  168     private static class AllocatorPerContext {
  169   
  170       private final Log LOG =
  171         LogFactory.getLog("org.apache.hadoop.fs.AllocatorPerContext");
  172   
  173       private int dirNumLastAccessed;
  174       private FileSystem localFS;
  175       private DF[] dirDF;
  176       private String contextCfgItemName;
  177       private String[] localDirs;
  178       private String savedLocalDirs = "";
  179   
  180       public AllocatorPerContext(String contextCfgItemName) {
  181         this.contextCfgItemName = contextCfgItemName;
  182       }
  183   
  184       /** This method gets called everytime before any read/write to make sure
  185        * that any change to localDirs is reflected immediately.
  186        */
  187       private void confChanged(Configuration conf) throws IOException {
  188         String newLocalDirs = conf.get(contextCfgItemName);
  189         if (!newLocalDirs.equals(savedLocalDirs)) {
  190           localDirs = conf.getStrings(contextCfgItemName);
  191           localFS = FileSystem.getLocal(conf);
  192           int numDirs = localDirs.length;
  193           dirDF = new DF[numDirs];
  194           for (int i = 0; i < numDirs; i++) {
  195             try {
  196               localFS.mkdirs(new Path(localDirs[i]));
  197             } catch (IOException ie) { } //ignore
  198             dirDF[i] = new DF(new File(localDirs[i]), 30000);
  199           }
  200           dirNumLastAccessed = 0;
  201           savedLocalDirs = newLocalDirs;
  202         }
  203       }
  204   
  205       private Path createPath(String path) throws IOException {
  206         Path file = new Path(new Path(localDirs[dirNumLastAccessed]),
  207                                       path);
  208         //check whether we are able to create a directory here. If the disk
  209         //happens to be RDONLY we will fail
  210         try {
  211           DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
  212           return file;
  213         } catch (DiskErrorException d) {
  214           LOG.warn(StringUtils.stringifyException(d));
  215           return null;
  216         }
  217       }
  218   
  219       /** Get a path from the local FS. This method should be used if the size of 
  220        *  the file is not known apriori. We go round-robin over the set of disks
  221        *  (via the configured dirs) and return the first complete path where
  222        *  we could create the parent directory of the passed path. 
  223        */
  224       public synchronized Path getLocalPathForWrite(String path, 
  225           Configuration conf) throws IOException {
  226         return getLocalPathForWrite(path, -1, conf);
  227       }
  228   
  229       /** Get a path from the local FS. Pass size as -1 if not known apriori. We
  230        *  round-robin over the set of disks (via the configured dirs) and return
  231        *  the first complete path which has enough space 
  232        */
  233       public synchronized Path getLocalPathForWrite(String pathStr, long size, 
  234           Configuration conf) throws IOException {
  235         confChanged(conf);
  236         int numDirs = localDirs.length;
  237         int numDirsSearched = 0;
  238         //remove the leading slash from the path (to make sure that the uri
  239         //resolution results in a valid path on the dir being checked)
  240         if (pathStr.startsWith("/")) {
  241           pathStr = pathStr.substring(1);
  242         }
  243         Path returnPath = null;
  244         while (numDirsSearched < numDirs && returnPath == null) {
  245           if (size >= 0) {
  246             long capacity = dirDF[dirNumLastAccessed].getAvailable();
  247             if (capacity > size) {
  248               returnPath = createPath(pathStr);
  249             }
  250           } else {
  251             returnPath = createPath(pathStr);
  252           }
  253           dirNumLastAccessed++;
  254           dirNumLastAccessed = dirNumLastAccessed % numDirs; 
  255           numDirsSearched++;
  256         } 
  257   
  258         if (returnPath != null) {
  259           return returnPath;
  260         }
  261         
  262         //no path found
  263         throw new DiskErrorException("Could not find any valid local " +
  264             "directory for " + pathStr);
  265       }
  266   
  267       /** Creates a file on the local FS. Pass size as -1 if not known apriori. We
  268        *  round-robin over the set of disks (via the configured dirs) and return
  269        *  a file on the first path which has enough space. The file is guaranteed
  270        *  to go away when the JVM exits.
  271        */
  272       public File createTmpFileForWrite(String pathStr, long size, 
  273           Configuration conf) throws IOException {
  274   
  275         // find an appropriate directory
  276         Path path = getLocalPathForWrite(pathStr, size, conf);
  277         File dir = new File(path.getParent().toUri().getPath());
  278         String prefix = path.getName();
  279   
  280         // create a temp file on this directory
  281         File result = File.createTempFile(prefix, null, dir);
  282         result.deleteOnExit();
  283         return result;
  284       }
  285   
  286       /** Get a path from the local FS for reading. We search through all the
  287        *  configured dirs for the file's existence and return the complete
  288        *  path to the file when we find one 
  289        */
  290       public synchronized Path getLocalPathToRead(String pathStr, 
  291           Configuration conf) throws IOException {
  292         confChanged(conf);
  293         int numDirs = localDirs.length;
  294         int numDirsSearched = 0;
  295         //remove the leading slash from the path (to make sure that the uri
  296         //resolution results in a valid path on the dir being checked)
  297         if (pathStr.startsWith("/")) {
  298           pathStr = pathStr.substring(1);
  299         }
  300         while (numDirsSearched < numDirs) {
  301           Path file = new Path(localDirs[numDirsSearched], pathStr);
  302           if (localFS.exists(file)) {
  303             return file;
  304           }
  305           numDirsSearched++;
  306         }
  307   
  308         //no path found
  309         throw new DiskErrorException ("Could not find " + pathStr +" in any of" +
  310         " the configured local directories");
  311       }
  312     }
  313   }

Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » fs » [javadoc | source]