Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » mapred » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one
    3    * or more contributor license agreements.  See the NOTICE file
    4    * distributed with this work for additional information
    5    * regarding copyright ownership.  The ASF licenses this file
    6    * to you under the Apache License, Version 2.0 (the
    7    * "License"); you may not use this file except in compliance
    8    * with the License.  You may obtain a copy of the License at
    9    *
   10    *     http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    * Unless required by applicable law or agreed to in writing, software
   13    * distributed under the License is distributed on an "AS IS" BASIS,
   14    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   15    * See the License for the specific language governing permissions and
   16    * limitations under the License.
   17    */
   18   
   19   package org.apache.hadoop.mapred;
   20   
   21   import java.io.IOException;
   22   
   23   import org.apache.hadoop.fs;
   24   import org.apache.hadoop.fs.LocalDirAllocator;
   25   import org.apache.hadoop.conf;
   26   
   27   /**
   28    * Manipulate the working area for the transient store for maps and reduces.
   29    */ 
   30   class MapOutputFile {
   31   
   32     private JobConf conf;
   33     private LocalDirAllocator lDirAlloc = 
   34                               new LocalDirAllocator("mapred.local.dir");
   35     
   36     /** Return the path to local map output file created earlier
   37      * @param mapTaskId a map task id
   38      */
   39     public Path getOutputFile(String mapTaskId)
   40       throws IOException {
   41       return lDirAlloc.getLocalPathToRead(mapTaskId+"/file.out", conf);
   42     }
   43   
   44     /** Create a local map output file name.
   45      * @param mapTaskId a map task id
   46      * @param size the size of the file
   47      */
   48     public Path getOutputFileForWrite(String mapTaskId, long size)
   49       throws IOException {
   50       return lDirAlloc.getLocalPathForWrite(mapTaskId+"/file.out", size, conf);
   51     }
   52   
   53     /** Return the path to a local map output index file created earlier
   54      * @param mapTaskId a map task id
   55      */
   56     public Path getOutputIndexFile(String mapTaskId)
   57       throws IOException {
   58       return lDirAlloc.getLocalPathToRead(mapTaskId + "/file.out.index", conf);
   59     }
   60   
   61     /** Create a local map output index file name.
   62      * @param mapTaskId a map task id
   63      * @param size the size of the file
   64      */
   65     public Path getOutputIndexFileForWrite(String mapTaskId, long size)
   66       throws IOException {
   67       return lDirAlloc.getLocalPathForWrite(mapTaskId + "/file.out.index", 
   68                                             size, conf);
   69     }
   70   
   71     /** Return a local map spill file created earlier.
   72      * @param mapTaskId a map task id
   73      * @param spillNumber the number
   74      */
   75     public Path getSpillFile(String mapTaskId, int spillNumber)
   76       throws IOException {
   77       return lDirAlloc.getLocalPathToRead(mapTaskId+"/spill" +spillNumber+".out",
   78                                           conf);
   79     }
   80   
   81     /** Create a local map spill file name.
   82      * @param mapTaskId a map task id
   83      * @param spillNumber the number
   84      * @param size the size of the file
   85      */
   86     public Path getSpillFileForWrite(String mapTaskId, int spillNumber, 
   87            long size) throws IOException {
   88       return lDirAlloc.getLocalPathForWrite(mapTaskId+
   89                                                     "/spill" +spillNumber+".out",
   90                                                     size, conf);
   91     }
   92   
   93     /** Return a local map spill index file created earlier
   94      * @param mapTaskId a map task id
   95      * @param spillNumber the number
   96      */
   97     public Path getSpillIndexFile(String mapTaskId, int spillNumber)
   98       throws IOException {
   99       return lDirAlloc.getLocalPathToRead(
  100           mapTaskId+"/spill" +spillNumber+".out.index", conf);
  101     }
  102   
  103     /** Create a local map spill index file name.
  104      * @param mapTaskId a map task id
  105      * @param spillNumber the number
  106      * @param size the size of the file
  107      */
  108     public Path getSpillIndexFileForWrite(String mapTaskId, int spillNumber,
  109            long size) throws IOException {
  110       return lDirAlloc.getLocalPathForWrite(
  111           mapTaskId+"/spill" +spillNumber+".out.index", size, conf);
  112     }
  113   
  114     /** Return a local reduce input file created earlier
  115      * @param mapTaskId a map task id
  116      * @param reduceTaskId a reduce task id
  117      */
  118     public Path getInputFile(int mapId, String reduceTaskId)
  119       throws IOException {
  120       // TODO *oom* should use a format here
  121       return lDirAlloc.getLocalPathToRead(reduceTaskId + "/map_"+mapId+".out",
  122                                           conf);
  123     }
  124   
  125     /** Create a local reduce input file name.
  126      * @param mapTaskId a map task id
  127      * @param reduceTaskId a reduce task id
  128      * @param size the size of the file
  129      */
  130     public Path getInputFileForWrite(int mapId, String reduceTaskId, long size)
  131       throws IOException {
  132       // TODO *oom* should use a format here
  133       return lDirAlloc.getLocalPathForWrite(reduceTaskId + "/map_"+mapId+".out",
  134                                             size, conf);
  135     }
  136   
  137     /** Removes all of the files related to a task. */
  138     public void removeAll(String taskId) throws IOException {
  139       conf.deleteLocalFiles(taskId);
  140     }
  141   
  142     /** 
  143      * Removes all contents of temporary storage.  Called upon 
  144      * startup, to remove any leftovers from previous run.
  145      */
  146     public void cleanupStorage() throws IOException {
  147       conf.deleteLocalFiles();
  148     }
  149   
  150     public void setConf(Configuration conf) {
  151       if (conf instanceof JobConf) {
  152         this.conf = (JobConf) conf;
  153       } else {
  154         this.conf = new JobConf(conf);
  155       }
  156     }
  157   }

Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » mapred » [javadoc | source]