Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » mapred » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one
    3    * or more contributor license agreements.  See the NOTICE file
    4    * distributed with this work for additional information
    5    * regarding copyright ownership.  The ASF licenses this file
    6    * to you under the Apache License, Version 2.0 (the
    7    * "License"); you may not use this file except in compliance
    8    * with the License.  You may obtain a copy of the License at
    9    *
   10    *     http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    * Unless required by applicable law or agreed to in writing, software
   13    * distributed under the License is distributed on an "AS IS" BASIS,
   14    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   15    * See the License for the specific language governing permissions and
   16    * limitations under the License.
   17    */
   18   
   19   package org.apache.hadoop.mapred;
   20   
   21   import java.io.DataInput;
   22   import java.io.DataOutput;
   23   import java.io.File;
   24   import java.io.IOException;
   25   import java.net.URI;
   26   import java.net.URL;
   27   import java.net.URLClassLoader;
   28   import java.text.DecimalFormat;
   29   import java.util.ArrayList;
   30   import java.util.Collections;
   31   import java.util.HashSet;
   32   import java.util.Hashtable;
   33   import java.util.Iterator;
   34   import java.util.List;
   35   import java.util.Map;
   36   import java.util.Random;
   37   import java.util.Set;
   38   import java.util.TreeSet;
   39   import java.util.concurrent.atomic.AtomicBoolean;
   40   
   41   import org.apache.commons.logging.Log;
   42   import org.apache.commons.logging.LogFactory;
   43   import org.apache.hadoop.conf.Configuration;
   44   import org.apache.hadoop.fs.FileSystem;
   45   import org.apache.hadoop.fs.InMemoryFileSystem;
   46   import org.apache.hadoop.fs.LocalFileSystem;
   47   import org.apache.hadoop.fs.Path;
   48   import org.apache.hadoop.fs.PathFilter;
   49   import org.apache.hadoop.io.DataInputBuffer;
   50   import org.apache.hadoop.io.DataOutputBuffer;
   51   import org.apache.hadoop.io.IntWritable;
   52   import org.apache.hadoop.io.SequenceFile;
   53   import org.apache.hadoop.io.Writable;
   54   import org.apache.hadoop.io.WritableComparable;
   55   import org.apache.hadoop.io.WritableComparator;
   56   import org.apache.hadoop.io.WritableFactories;
   57   import org.apache.hadoop.io.WritableFactory;
   58   import org.apache.hadoop.metrics.MetricsContext;
   59   import org.apache.hadoop.metrics.MetricsRecord;
   60   import org.apache.hadoop.metrics.MetricsUtil;
   61   import org.apache.hadoop.metrics.Updater;
   62   import org.apache.hadoop.util.Progress;
   63   import org.apache.hadoop.util.ReflectionUtils;
   64   import org.apache.hadoop.util.StringUtils;
   65   import org.apache.hadoop.util.DiskChecker.DiskErrorException;
   66   
   67   import static org.apache.hadoop.mapred.Task.Counter.*;
   68   
   69   /** A Reduce task. */
   70   class ReduceTask extends Task {
   71   
   72     static {                                        // register a ctor
   73       WritableFactories.setFactory
   74         (ReduceTask.class,
   75          new WritableFactory() {
   76            public Writable newInstance() { return new ReduceTask(); }
   77          });
   78     }
   79     
   80     private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
   81     private int numMaps;                  // set by the 6-arg ctor or by readFields()
   82     private ReduceCopier reduceCopier;    // created in run() unless mapred.job.tracker is "local"
   83     // Instance initializer: sets the initial status text and phase on construction.
   84     { 
   85       getProgress().setStatus("reduce"); 
   86       setPhase(TaskStatus.Phase.SHUFFLE);        // phase to start with 
   87     }
   88     // Sub-phases of the overall progress, registered as copy, sort, then reduce.
   89     private Progress copyPhase = getProgress().addPhase("copy");
   90     private Progress sortPhase  = getProgress().addPhase("sort");
   91     private Progress reducePhase = getProgress().addPhase("reduce");
   92   
   93     public ReduceTask() {}              // no-arg form, used by the factory in the static block
   94   
   95     public ReduceTask(String jobId, String jobFile, String tipId, String taskId,
   96                       int partition, int numMaps) {
   97       super(jobId, jobFile, tipId, taskId, partition);  // everything but numMaps goes to Task
   98       this.numMaps = numMaps;                           // kept here; see write()/readFields()
   99     }
  100   
  101     public TaskRunner createRunner(TaskTracker tracker) throws IOException {
  102       return new ReduceTaskRunner(this, tracker, this.conf);  // a new runner on every call
  103     }
  104   
  105     public boolean isMapTask() {
  106       return false;                     // constant: a ReduceTask never reports itself as a map
  107     }
  108   
  109     public int getNumMaps() { return numMaps; }   // value from the 6-arg ctor or readFields()
  110     
  111     /**
  112      * Localizes conf for this task: superclass localization, then setNumMapTasks(numMaps).
  113      */
  114     public void localizeConfiguration(JobConf conf) throws IOException {
  115       super.localizeConfiguration(conf);
  116       conf.setNumMapTasks(numMaps);
  117     }
  118   
  119     public void write(DataOutput out) throws IOException {
  120       super.write(out);
  121       // numMaps follows the superclass fields; readFields() reads it back in this order
  122       out.writeInt(numMaps);                        // write the number of maps
  123     }
  124   
  125     public void readFields(DataInput in) throws IOException {
  126       super.readFields(in);
  127       // mirror of write(): the map count comes right after the superclass fields
  128       numMaps = in.readInt();
  129     }
  130   
  131     /** Iterates values while keys match in sorted input. */
  132     static class ValuesIterator implements Iterator {
  133       private SequenceFile.Sorter.RawKeyValueIterator in; //input iterator
  134       private WritableComparable key;               // current key
  135       private Writable value;                       // current value
  136       private boolean hasNext;                      // more w/ this key
  137       private boolean more;                         // more in file
  138       private WritableComparator comparator;
  139       private Class keyClass;
  140       private Class valClass;
  141       private Configuration conf;
  142       private DataOutputBuffer valOut = new DataOutputBuffer();
  143       private DataInputBuffer valIn = new DataInputBuffer();
  144       private DataInputBuffer keyIn = new DataInputBuffer();
  145       protected Reporter reporter;
  146   
  147       public ValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in, 
  148                              WritableComparator comparator, Class keyClass,
  149                              Class valClass, Configuration conf, 
  150                              Reporter reporter)
  151         throws IOException {
  152         this.in = in;
  153         this.conf = conf;
  154         this.comparator = comparator;
  155         this.keyClass = keyClass;
  156         this.valClass = valClass;
  157         this.reporter = reporter;
  158         getNext();
  159       }
  160   
  161       /// Iterator methods
  162   
  163       public boolean hasNext() { return hasNext; }
  164   
  165       public Object next() {
  166         Object result = value;                      // save value
  167         try {
  168           getNext();                                  // move to next
  169         } catch (IOException e) {
  170           throw new RuntimeException(e);
  171         }
  172         reporter.progress();
  173         return result;                              // return saved value
  174       }
  175   
  176       public void remove() { throw new RuntimeException("not implemented"); }
  177   
  178       /// Auxiliary methods
  179   
  180       /** Start processing next unique key. */
  181       public void nextKey() {
  182         while (hasNext) { next(); }                 // skip any unread
  183         hasNext = more;
  184       }
  185   
  186       /** True iff more keys remain. */
  187       public boolean more() { return more; }
  188   
  189       /** The current key. */
  190       public WritableComparable getKey() { return key; }
  191   
  192       private void getNext() throws IOException {
  193         Writable lastKey = key;                     // save previous key
  194         try {
  195           key = (WritableComparable)ReflectionUtils.newInstance(keyClass, this.conf);
  196           value = (Writable)ReflectionUtils.newInstance(valClass, this.conf);
  197         } catch (Exception e) {
  198           throw new RuntimeException(e);
  199         }
  200         more = in.next();
  201         if (more) {
  202           //de-serialize the raw key/value
  203           keyIn.reset(in.getKey().getData(), in.getKey().getLength());
  204           key.readFields(keyIn);
  205           valOut.reset();
  206           (in.getValue()).writeUncompressedBytes(valOut);
  207           valIn.reset(valOut.getData(), valOut.getLength());
  208           value.readFields(valIn);
  209   
  210           if (lastKey == null) {
  211             hasNext = true;
  212           } else {
  213             hasNext = (comparator.compare(key, lastKey) == 0);
  214           }
  215         } else {
  216           hasNext = false;
  217         }
  218       }
  219     }
  220     private class ReduceValuesIterator extends ValuesIterator {
  221       public ReduceValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
  222                                    WritableComparator comparator, Class keyClass,
  223                                    Class valClass,
  224                                    Configuration conf, Reporter reporter)
  225         throws IOException {
  226         super(in, comparator, keyClass, valClass, conf, reporter);
  227       }
  228       public void informReduceProgress() {
  229         reducePhase.set(super.in.getProgress().get()); // update progress
  230         reporter.progress();
  231       }
  232       public Object next() {
  233         reporter.incrCounter(REDUCE_INPUT_RECORDS, 1);
  234         return super.next();
  235       }
  236     }
  237   
  238     public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
  239       throws IOException {
  240       Reducer reducer = (Reducer)ReflectionUtils.newInstance(
  241                                                              job.getReducerClass(), job);
  242   
  243       // start thread that will handle communication with parent
  244       startCommunicationThread(umbilical);
  245   
  246       FileSystem lfs = FileSystem.getLocal(job);
  247       if (!job.get("mapred.job.tracker", "local").equals("local")) {
  248         reduceCopier = new ReduceCopier(umbilical, job);
  249         if (!reduceCopier.fetchOutputs()) {
  250           throw new IOException(getTaskId() + "The reduce copier failed");
  251         }
  252       }
  253       copyPhase.complete();                         // copy is already complete
  254       
  255   
  256       // open a file to collect map output
  257       // since we don't know how many map outputs got merged in memory, we have
  258       // to check whether a given map output exists, and if it does, add it in
  259       // the list of files to merge, otherwise not.
  260       List<Path> mapFilesList = new ArrayList<Path>();
  261       for(int i=0; i < numMaps; i++) {
  262         Path f;
  263         try {
  264           //catch and ignore DiskErrorException, since some map outputs will
  265           //really be absent (inmem merge).
  266           f = mapOutputFile.getInputFile(i, getTaskId());
  267         } catch (DiskErrorException d) { 
  268           continue;
  269         }
  270         if (lfs.exists(f))
  271           mapFilesList.add(f);
  272       }
  273       Path[] mapFiles = new Path[mapFilesList.size()];
  274       mapFiles = mapFilesList.toArray(mapFiles);
  275       
  276       Path tempDir = new Path(getTaskId()); 
  277   
  278       SequenceFile.Sorter.RawKeyValueIterator rIter;
  279    
  280       setPhase(TaskStatus.Phase.SORT); 
  281   
  282       final Reporter reporter = getReporter(umbilical);
  283       
  284       // sort the input file
  285       SequenceFile.Sorter sorter = new SequenceFile.Sorter(lfs, 
  286           job.getOutputKeyComparator(), job.getMapOutputValueClass(), job);
  287       sorter.setProgressable(reporter);
  288       rIter = sorter.merge(mapFiles, tempDir, 
  289           !conf.getKeepFailedTaskFiles()); // sort
  290   
  291       sortPhase.complete();                         // sort is complete
  292       setPhase(TaskStatus.Phase.REDUCE); 
  293   
  294       // make output collector
  295       String finalName = getOutputName(getPartition());
  296       FileSystem fs = FileSystem.get(job);
  297   
  298       final RecordWriter out = 
  299         job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);  
  300       
  301       OutputCollector collector = new OutputCollector() {
  302           public void collect(WritableComparable key, Writable value)
  303             throws IOException {
  304             out.write(key, value);
  305             reporter.incrCounter(REDUCE_OUTPUT_RECORDS, 1);
  306             // indicate that progress update needs to be sent
  307             reporter.progress();
  308           }
  309         };
  310       
  311       // apply reduce function
  312       try {
  313         Class keyClass = job.getMapOutputKeyClass();
  314         Class valClass = job.getMapOutputValueClass();
  315         
  316         ReduceValuesIterator values = new ReduceValuesIterator(rIter, 
  317             job.getOutputValueGroupingComparator(), keyClass, valClass, 
  318             job, reporter);
  319         values.informReduceProgress();
  320         while (values.more()) {
  321           reporter.incrCounter(REDUCE_INPUT_GROUPS, 1);
  322           reducer.reduce(values.getKey(), values, collector, reporter);
  323           values.nextKey();
  324           values.informReduceProgress();
  325         }
  326   
  327         //Clean up: repeated in catch block below
  328         reducer.close();
  329         out.close(reporter);
  330         //End of clean up.
  331       } catch (IOException ioe) {
  332         try {
  333           reducer.close();
  334         } catch (IOException ignored) {}
  335           
  336         try {
  337           out.close(reporter);
  338         } catch (IOException ignored) {}
  339         
  340         throw ioe;
  341       }
  342       done(umbilical);
  343     }
  344   
  345     class ReduceCopier implements MRConstants {
  346   
  347       /** Reference to the umbilical object */
  348       private TaskUmbilicalProtocol umbilical;
  349       
  350       // NOTE: the enclosing task is kept in reduceTask, declared further down
  351       
  352       /** Number of ms before timing out a copy */
  353       private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
  354       
  355       /**
  356        * our reduce task instance (the enclosing ReduceTask; set in the constructor)
  357        */
  358       private ReduceTask reduceTask;
  359       
  360       /**
  361        * the map outputs scheduled for copying; copier threads wait on this list
  362        */
  363       private List<MapOutputLocation> scheduledCopies;
  364       
  365       /**
  366        * the results of dispatched copy attempts; copiers add and notify under its lock
  367        */
  368       private List<CopyResult> copyResults;
  369       
  370       /**
  371        *  the number of outputs to copy in parallel
  372        */
  373       private int numCopiers;
  374       
  375       /**
  376        * the maximum amount of time (less 1 minute) to wait to contact a host
  377        * after a copy from it fails. We wait for (1 min + Random.nextInt(maxBackoff))
  378        * seconds. NOTE(review): minutes and seconds are mixed here - confirm the unit.
  379        */
  380       private int maxBackoff;
  381       
  382       /**
  383        * busy hosts from which copies are being backed off
  384        * Map of host -> next contact time
  385        */
  386       private Map<String, Long> penaltyBox;
  387       
  388       /**
  389        * the set of unique hosts from which we are copying
  390        */
  391       private Set<String> uniqueHosts;
  392       
  393       /**
  394        * the last time we polled the job tracker
  395        */
  396       private long lastPollTime;
  397       
  398       /**
  399        * A reference to the in memory file system for writing the map outputs to.
  400        */
  401       private InMemoryFileSystem inMemFileSys;
  402       
  403       /**
  404        * A reference to the local file system for writing the map outputs to.
  405        */
  406       private FileSystem localFileSys;
  407       
  408       /**
  409        * An instance of the sorter used for doing merge
  410        */
  411       private SequenceFile.Sorter sorter;
  412       
  413       /**
  414        * A reference to the throwable object (if merge throws an exception)
  415        */
  416       private volatile Throwable mergeThrowable;
  417       
  418       /** 
  419        * A flag to indicate that merge is in progress (set before the merge thread starts)
  420        */
  421       private volatile boolean mergeInProgress = false;
  422       
  423       /**
  424        * When we accumulate mergeThreshold number of files in ram, we merge/spill
  425        */
  426       private int mergeThreshold = 500;
  427       
  428       /**
  429        * The threads for fetching the files.
  430        */
  431       private MapOutputCopier[] copiers = null;
  432       
  433       /**
  434        * The object for metrics reporting.
  435        */
  436       private ShuffleClientMetrics shuffleClientMetrics = null;
  437       
  438       /**
  439        * the minimum interval between tasktracker polls
  440        */
  441       private static final long MIN_POLL_INTERVAL = 1000;
  442       
  443       /**
  444        * the number of map output locations to poll for at one time
  445        */  
  446       private int probe_sample_size = 100;
  447       
  448       /**
  449        * a list of map output locations whose fetch is to be retried
  450        */
  451       private List<MapOutputLocation> retryFetches =
  452         new ArrayList<MapOutputLocation>();
  453       
  454       /** 
  455        * The set of required map outputs (map ids are removed once copied)
  456        */
  457       private Set <Integer> neededOutputs = 
  458         Collections.synchronizedSet(new TreeSet<Integer>());
  459       
  460       /** 
  461        * The set of obsolete map taskids.
  462        */
  463       private Set <String> obsoleteMapIds = 
  464         Collections.synchronizedSet(new TreeSet<String>());
  465   
  466       private Random random = null;   // presumably the back-off jitter source - TODO confirm
  467       
  468       /**
  469        * the max size of the merge output from ramfs
  470        */
  471       private long ramfsMergeOutputSize;
  472       
  473       /**
  474        * This class contains the methods that should be used for metrics-reporting
  475        * the specific metrics for shuffle. This class actually reports the
  476        * metrics for the shuffle client (the ReduceTask), and hence the name
  477        * ShuffleClientMetrics.
  478        */
  479       class ShuffleClientMetrics implements Updater {
  480         private MetricsRecord shuffleMetrics = null;
  481         private int numFailedFetches = 0;
  482         private int numSuccessFetches = 0;
  483         private long numBytes = 0;
  484         private int numThreadsBusy = 0;
  485         ShuffleClientMetrics(JobConf conf) {
  486           MetricsContext metricsContext = MetricsUtil.getContext("mapred");
  487           this.shuffleMetrics = 
  488             MetricsUtil.createRecord(metricsContext, "shuffleInput");
  489           this.shuffleMetrics.setTag("user", conf.getUser());
  490           this.shuffleMetrics.setTag("jobName", conf.getJobName());
  491           this.shuffleMetrics.setTag("jobId", ReduceTask.this.getJobId());
  492           this.shuffleMetrics.setTag("taskId", getTaskId());
  493           this.shuffleMetrics.setTag("sessionId", conf.getSessionId());
  494           metricsContext.registerUpdater(this);
  495         }
  496         public synchronized void inputBytes(long numBytes) {
  497           this.numBytes += numBytes;
  498         }
  499         public synchronized void failedFetch() {
  500           ++numFailedFetches;
  501         }
  502         public synchronized void successFetch() {
  503           ++numSuccessFetches;
  504         }
  505         public synchronized void threadBusy() {
  506           ++numThreadsBusy;
  507         }
  508         public synchronized void threadFree() {
  509           --numThreadsBusy;
  510         }
  511         public void doUpdates(MetricsContext unused) {
  512           synchronized (this) {
  513             shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
  514             shuffleMetrics.incrMetric("shuffle_failed_fetches", 
  515                                       numFailedFetches);
  516             shuffleMetrics.incrMetric("shuffle_success_fetches", 
  517                                       numSuccessFetches);
  518             if (numCopiers != 0) {
  519               shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
  520                   100*((float)numThreadsBusy/numCopiers));
  521             } else {
  522               shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
  523             }
  524             numBytes = 0;
  525             numSuccessFetches = 0;
  526             numFailedFetches = 0;
  527           }
  528           shuffleMetrics.update();
  529         }
  530       }
  531   
  532       /** Represents the result of an attempt to copy a map output */
  533       private class CopyResult {
  534         
  535         // the map output location against which a copy attempt was made
  536         private final MapOutputLocation loc;
  537         
  538         // the size of the file copied, -1 if the transfer failed
  539         private final long size;
  540         
  541         //a flag signifying whether a copy result is obsolete
  542         private static final int OBSOLETE = -2;
  543         
  544         CopyResult(MapOutputLocation loc, long size) {
  545           this.loc = loc;
  546           this.size = size;
  547         }
  548         
  549         public int getMapId() { return loc.getMapId(); }
  550         public boolean getSuccess() { return size >= 0; }
  551         public boolean isObsolete() { 
  552           return size == OBSOLETE;
  553         }
  554         public long getSize() { return size; }
  555         public String getHost() { return loc.getHost(); }
  556         public MapOutputLocation getLocation() { return loc; }
  557       }
  558       
  559       private int extractMapIdFromPathName(Path pathname) {
  560         //all paths end with map_<id>.out
  561         String firstPathName = pathname.getName();
  562         int beginIndex = firstPathName.lastIndexOf("map_");
  563         int endIndex = firstPathName.lastIndexOf(".out");
  564         return Integer.parseInt(firstPathName.substring(beginIndex +
  565                                 "map_".length(), endIndex));
  566       }
  567       
  568       private int nextMapOutputCopierId = 0;   // each new MapOutputCopier takes this value as its id
  569       
  570       /** Copies map outputs as they become available */
  571       private class MapOutputCopier extends Thread {
  572         
  573         private MapOutputLocation currentLocation = null;  // set by start(), cleared by finish()
  574         private int id = nextMapOutputCopierId++;          // also part of the thread name
  575         private Reporter reporter;
  576         
  577         public MapOutputCopier(Reporter reporter) {
  578           setName("MapOutputCopier " + reduceTask.getTaskId() + "." + id);
  579           LOG.debug(getName() + " created");
  580           this.reporter = reporter;
  581         }
  582         
  583         /**
  584          * Fail the current file that we are fetching (publishes a result of size -1)
  585          * @return were we currently fetching?
  586          */
  587         public synchronized boolean fail() {
  588           if (currentLocation != null) {
  589             finish(-1);
  590             return true;
  591           } else {
  592             return false;
  593           }
  594         }
  595         
  596         /**
  597          * Get the current map output location.
  598          */
  599         public synchronized MapOutputLocation getLocation() {
  600           return currentLocation;
  601         }
  602         /** Records loc as the location being copied; an overload, not Thread.start(). */
  603         private synchronized void start(MapOutputLocation loc) {
  604           currentLocation = loc;
  605         }
  606         /** Publishes a CopyResult for the current location, if any, and clears it. */
  607         private synchronized void finish(long size) {
  608           if (currentLocation != null) {
  609             LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
  610             synchronized (copyResults) {
  611               copyResults.add(new CopyResult(currentLocation, size));
  612               copyResults.notify();
  613             }
  614             currentLocation = null;
  615           }
  616         }
  617         
  618         /** Loop forever and fetch map outputs as they become available.
  619          * The thread exits when it is interrupted by {@link ReduceTaskRunner}
  620          */
  621         public void run() {
  622           while (true) {        
  623             try {
  624               MapOutputLocation loc = null;
  625               long size = -1;
  626               // block until a location is scheduled, then take the first one
  627               synchronized (scheduledCopies) {
  628                 while (scheduledCopies.isEmpty()) {
  629                   scheduledCopies.wait();
  630                 }
  631                 loc = scheduledCopies.remove(0);
  632               }
  633               
  634               try {
  635                 shuffleClientMetrics.threadBusy();
  636                 start(loc);
  637                 size = copyOutput(loc);
  638                 shuffleClientMetrics.successFetch(); // NOTE(review): also reached for OBSOLETE - confirm
  639               } catch (IOException e) {
  640                 LOG.warn(reduceTask.getTaskId() + " copy failed: " +
  641                          loc.getMapTaskId() + " from " + loc.getHost());
  642                 LOG.warn(StringUtils.stringifyException(e));
  643                 shuffleClientMetrics.failedFetch();
  644   
  645                 // Reset
  646                 size = -1;
  647               } finally {
  648                 shuffleClientMetrics.threadFree();
  649                 finish(size);
  650               }
  651             } catch (InterruptedException e) { 
  652               return; // ALL DONE
  653             } catch (Throwable th) {
  654               LOG.error("Map output copy failure: " + 
  655                         StringUtils.stringifyException(th));
  656             }
  657           }
  658         }
  659         
  660         /** Copies a map output from a remote host, via HTTP. 
  661          * @param loc the map output location to be copied
  662          * @return the length of the copied file, or CopyResult.OBSOLETE if not needed
  663          * @throws IOException if there is an error copying the file
  664          * @throws InterruptedException if the copier should give up
  665          */
  666         private long copyOutput(MapOutputLocation loc
  667                                 ) throws IOException, InterruptedException {
  668           // check if we still need to copy the output from this location
  669           if (!neededOutputs.contains(loc.getMapId()) || 
  670               obsoleteMapIds.contains(loc.getMapTaskId())) {
  671             return CopyResult.OBSOLETE;
  672           }
  673           
  674           String reduceId = reduceTask.getTaskId();
  675           LOG.info(reduceId + " Copying " + loc.getMapTaskId() +
  676                    " output from " + loc.getHost() + ".");
  677           // a temp filename. If this file gets created in ramfs, we're fine,
  678           // else, we will check the localFS to find a suitable final location
  679           // for this path
  680           Path filename = new Path("/" + reduceId + "/map_" +
  681                                    loc.getMapId() + ".out");
  682           // a working filename that will be unique to this attempt
  683           Path tmpFilename = new Path(filename + "-" + id);
  684           // this copies the map output file
  685           tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleClientMetrics,
  686                                     tmpFilename, lDirAlloc, 
  687                                     conf, reduceTask.getPartition(), 
  688                                     STALLED_COPY_TIMEOUT, reporter);
  689           if (!neededOutputs.contains(loc.getMapId())) {
  690             if (tmpFilename != null) {
  691               FileSystem fs = tmpFilename.getFileSystem(conf);
  692               fs.delete(tmpFilename);
  693             }
  694             return CopyResult.OBSOLETE;
  695           }
  696           if (tmpFilename == null)
  697             throw new IOException("File " + filename + "-" + id + 
  698                                   " not created");
  699           long bytes = -1;
  700           // lock the ReduceTask while we do the rename
  701           synchronized (ReduceTask.this) {
  702             // This file could have been created in the inmemory
  703             // fs or the localfs. So need to get the filesystem owning the path. 
  704             FileSystem fs = tmpFilename.getFileSystem(conf);
  705             if (!neededOutputs.contains(loc.getMapId())) {
  706               fs.delete(tmpFilename);
  707               return CopyResult.OBSOLETE;
  708             }
  709             
  710             bytes = fs.getLength(tmpFilename);
  711             //resolve the final filename against the directory where the tmpFile
  712             //got created
  713             filename = new Path(tmpFilename.getParent(), filename.getName());
  714             // if we can't rename the file, something is broken (and IOException
  715             // will be thrown). 
  716             if (!fs.rename(tmpFilename, filename)) {
  717               fs.delete(tmpFilename);
  718               bytes = -1;                  // has no effect: the throw below leaves the method
  719               throw new IOException("failure to rename map output " + 
  720                                     tmpFilename);
  721             }
  722             
  723             LOG.info(reduceId + " done copying " + loc.getMapTaskId() +
  724                      " output from " + loc.getHost() + ".");
  725             //Create a thread to do merges. Synchronize access/update to 
  726             //mergeInProgress
  727             if (!mergeInProgress && 
  728                 (inMemFileSys.getPercentUsed() >= MAX_INMEM_FILESYS_USE || 
  729                  (mergeThreshold > 0 && 
  730                   inMemFileSys.getNumFiles(MAP_OUTPUT_FILTER) >= 
  731                   mergeThreshold))&&
  732                 mergeThrowable == null) {
  733               LOG.info(reduceId + " InMemoryFileSystem " + 
  734                        inMemFileSys.getUri().toString() +
  735                        " is " + inMemFileSys.getPercentUsed() + 
  736                        " full. Triggering merge");
  737               InMemFSMergeThread m = new InMemFSMergeThread(inMemFileSys,
  738                                                             (LocalFileSystem)localFileSys, sorter);
  739               m.setName("Thread for merging in memory files");
  740               m.setDaemon(true);
  741               mergeInProgress = true;
  742               m.start();
  743             }
  744             neededOutputs.remove(loc.getMapId());
  745           }
  746           return bytes;
  747         }
  748         
  749       }
  750       
  751       /**
  752        * Gives conf a class loader that also sees the job's local files: the
  753        * job file's directory and, if the job has a jar, the sibling "work"
  754        * directory with its lib entries and classes directory. Uses
  755        * File.toURI().toURL() because File.toURL() leaves URL-illegal chars unescaped.
  756        */
  757       private void configureClasspath(JobConf conf) throws IOException {
  758         // the current classloader becomes the parent of the new one
  759         ClassLoader parent = conf.getClassLoader();
  760   
  761         // the work directory holds the elements we are dynamically adding
  762         File workDir = new File(ReduceTask.this.getJobFile()).getParentFile();
  763         File jobCacheDir = new File(workDir.getParent(), "work");
  764         List<URL> urllist = new ArrayList<URL>();
  765   
  766         // add the jars and directories to the classpath
  767         if (conf.getJar() != null) {
  768           File[] libs = new File(jobCacheDir, "lib").listFiles();
  769           if (libs != null) {                      // null when lib cannot be listed
  770             for (File lib : libs) {
  771               urllist.add(lib.toURI().toURL());
  772             }
  773           }
  774           urllist.add(new File(jobCacheDir, "classes").toURI().toURL());
  775           urllist.add(jobCacheDir.toURI().toURL());
  776         }
  777         urllist.add(workDir.toURI().toURL());
  778   
  779         // create a new classloader with the old classloader as its parent
  780         // then set that classloader as the one used by the current jobconf
  781         URL[] urls = urllist.toArray(new URL[urllist.size()]);
  782         URLClassLoader loader = new URLClassLoader(urls, parent);
  783         conf.setClassLoader(loader);
  784       }
  785       
  786       public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf)
  787         throws IOException {
  788         
  789         configureClasspath(conf);
  790         this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
  791         this.umbilical = umbilical;      
  792         this.reduceTask = ReduceTask.this;
  793         this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
  794         this.copyResults = new ArrayList<CopyResult>(100);    
  795         this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
  796         this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
  797         this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);
  798         
  799         //we want to distinguish inmem fs instances for different reduces. Hence,
  800         //append a unique string in the uri for the inmem fs name
  801         URI uri = URI.create("ramfs://mapoutput" + reduceTask.hashCode());
  802         inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf);
  803         LOG.info(reduceTask.getTaskId() + " Created an InMemoryFileSystem, uri: "
  804                  + uri);
  805         ramfsMergeOutputSize = (long)(MAX_INMEM_FILESYS_USE * 
  806                                       inMemFileSys.getFSSize());
  807         localFileSys = FileSystem.getLocal(conf);
  808         //create an instance of the sorter
  809         sorter =
  810           new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(), 
  811                                   conf.getMapOutputValueClass(), conf);
  812         sorter.setProgressable(getReporter(umbilical));
  813         
  814         // hosts -> next contact time
  815         this.penaltyBox = new Hashtable<String, Long>();
  816         
  817         // hostnames
  818         this.uniqueHosts = new HashSet<String>();
  819         
  820         this.lastPollTime = 0;
  821         
  822   
  823         // Seed the random number generator with a reasonably globally unique seed
  824         long randomSeed = System.nanoTime() + 
  825                           (long)Math.pow(this.reduceTask.getPartition(),
  826                                          (this.reduceTask.getPartition()%10)
  827                                         );
  828         this.random = new Random(randomSeed);
  829       }
  830       
  831       public boolean fetchOutputs() throws IOException {
  832         final int      numOutputs = reduceTask.getNumMaps();
  833         List<MapOutputLocation> knownOutputs = 
  834           new ArrayList<MapOutputLocation>(numCopiers);
  835         int            numInFlight = 0, numCopied = 0;
  836         int            lowThreshold = numCopiers*2;
  837         long           bytesTransferred = 0;
  838         DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
  839         Random         backoff = new Random();
  840         final Progress copyPhase = 
  841           reduceTask.getProgress().phase();
  842         
  843         //tweak the probe sample size (make it a function of numCopiers)
  844         probe_sample_size = Math.max(numCopiers*5, 50);
  845         
  846         for (int i = 0; i < numOutputs; i++) {
  847           neededOutputs.add(i);
  848           copyPhase.addPhase();       // add sub-phase per file
  849         }
  850         
  851         copiers = new MapOutputCopier[numCopiers];
  852         
  853         Reporter reporter = getReporter(umbilical);
  854         // start all the copying threads
  855         for (int i=0; i < copiers.length; i++) {
  856           copiers[i] = new MapOutputCopier(reporter);
  857           copiers[i].start();
  858         }
  859         
  860         // start the clock for bandwidth measurement
  861         long startTime = System.currentTimeMillis();
  862         long currentTime = startTime;
  863         IntWritable fromEventId = new IntWritable(0);
  864         
  865         try {
  866           // loop until we get all required outputs
  867           while (!neededOutputs.isEmpty() && mergeThrowable == null) {
  868             
  869             LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
  870             " map output(s)");
  871             
  872             try {
  873               // Put the hash entries for the failed fetches. Entries here
  874               // might be replaced by (mapId) hashkeys from new successful 
  875               // Map executions, if the fetch failures were due to lost tasks.
  876               // The replacements, if at all, will happen when we query the
  877               // tasktracker and put the mapId hashkeys with new 
  878               // MapOutputLocations as values
  879               knownOutputs.addAll(retryFetches);
  880               
  881               // The call getMapCompletionEvents will update fromEventId to
  882               // used for the next call to getMapCompletionEvents
  883               int currentNumKnownMaps = knownOutputs.size();
  884               int currentNumObsoleteMapIds = obsoleteMapIds.size();
  885               getMapCompletionEvents(fromEventId, knownOutputs);
  886   
  887               LOG.info(reduceTask.getTaskId() + ": " +  
  888                        "Got " + (knownOutputs.size()-currentNumKnownMaps) + 
  889                        " new map-outputs & " + 
  890                        (obsoleteMapIds.size()-currentNumObsoleteMapIds) + 
  891                        " obsolete map-outputs from tasktracker and " + 
  892                        retryFetches.size() + " map-outputs from previous failures"
  893                       );
  894   
  895               // clear the "failed" fetches hashmap
  896               retryFetches.clear();
  897             }
  898             catch (IOException ie) {
  899               LOG.warn(reduceTask.getTaskId() +
  900                       " Problem locating map outputs: " +
  901                       StringUtils.stringifyException(ie));
  902             }
  903             
  904             // now walk through the cache and schedule what we can
  905             int numKnown = knownOutputs.size(), numScheduled = 0;
  906             int numSlow = 0, numDups = 0;
  907             
  908             LOG.info(reduceTask.getTaskId() + " Got " + numKnown + 
  909                      " known map output location(s); scheduling...");
  910             
  911             synchronized (scheduledCopies) {
  912               // Randomize the map output locations to prevent 
  913               // all reduce-tasks swamping the same tasktracker
  914               Collections.shuffle(knownOutputs, this.random);
  915               
  916               Iterator locIt = knownOutputs.iterator();
  917               
  918               currentTime = System.currentTimeMillis();
  919               while (locIt.hasNext()) {
  920                 
  921                 MapOutputLocation loc = (MapOutputLocation)locIt.next();
  922                 
  923                 // Do not schedule fetches from OBSOLETE maps
  924                 if (obsoleteMapIds.contains(loc.getMapTaskId())) {
  925                   locIt.remove();
  926                   continue;
  927                 }
  928   
  929                 Long penaltyEnd = penaltyBox.get(loc.getHost());
  930                 boolean penalized = false, duplicate = false;
  931                 
  932                 if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) {
  933                   penalized = true; numSlow++;
  934                 }
  935                 if (uniqueHosts.contains(loc.getHost())) {
  936                   duplicate = true; numDups++;
  937                 }
  938                 
  939                 if (!penalized && !duplicate) {
  940                   uniqueHosts.add(loc.getHost());
  941                   scheduledCopies.add(loc);
  942                   locIt.remove();  // remove from knownOutputs
  943                   numInFlight++; numScheduled++;
  944                 }
  945               }
  946               scheduledCopies.notifyAll();
  947             }
  948             LOG.info(reduceTask.getTaskId() + " Scheduled " + numScheduled +
  949                      " of " + numKnown + " known outputs (" + numSlow +
  950                      " slow hosts and " + numDups + " dup hosts)");
  951             
  952             // if we have no copies in flight and we can't schedule anything
  953             // new, just wait for a bit
  954             try {
  955               if (numInFlight == 0 && numScheduled == 0) {
  956                 // we should indicate progress as we don't want TT to think
  957                 // we're stuck and kill us
  958                 reporter.progress();
  959                 Thread.sleep(5000);
  960               }
  961             } catch (InterruptedException e) { } // IGNORE
  962             
  963             while (numInFlight > 0 && mergeThrowable == null) {
  964               LOG.debug(reduceTask.getTaskId() + " numInFlight = " + 
  965                         numInFlight);
  966               CopyResult cr = getCopyResult();
  967               
  968               if (cr != null) {
  969                 if (cr.getSuccess()) {  // a successful copy
  970                   numCopied++;
  971                   bytesTransferred += cr.getSize();
  972                   
  973                   long secsSinceStart = 
  974                     (System.currentTimeMillis()-startTime)/1000+1;
  975                   float mbs = ((float)bytesTransferred)/(1024*1024);
  976                   float transferRate = mbs/secsSinceStart;
  977                   
  978                   copyPhase.startNextPhase();
  979                   copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs 
  980                                       + " at " +
  981                                       mbpsFormat.format(transferRate) +  " MB/s)");          
  982                 } else if (cr.isObsolete()) {
  983                   //ignore
  984                   LOG.info(reduceTask.getTaskId() + 
  985                            " Ignoring obsolete copy result for Map Task: " + 
  986                            cr.getLocation().getMapTaskId() + " from host: " + 
  987                            cr.getHost());
  988                 } else {
  989                   retryFetches.add(cr.getLocation());
  990                   
  991                   // wait a random amount of time for next contact
  992                   currentTime = System.currentTimeMillis();
  993                   long nextContact = currentTime + 60 * 1000 +
  994                     backoff.nextInt(maxBackoff*1000);
  995                   penaltyBox.put(cr.getHost(), nextContact);          
  996                   LOG.warn(reduceTask.getTaskId() + " adding host " +
  997                            cr.getHost() + " to penalty box, next contact in " +
  998                            ((nextContact-currentTime)/1000) + " seconds");
  999                   
 1000                   // other outputs from the failed host may be present in the
 1001                   // knownOutputs cache, purge them. This is important in case
 1002                   // the failure is due to a lost tasktracker (causes many
 1003                   // unnecessary backoffs). If not, we only take a small hit
 1004                   // polling the tasktracker a few more times
 1005                   Iterator locIt = knownOutputs.iterator();
 1006                   while (locIt.hasNext()) {
 1007                     MapOutputLocation loc = (MapOutputLocation)locIt.next();
 1008                     if (cr.getHost().equals(loc.getHost())) {
 1009                       retryFetches.add(loc);
 1010                       locIt.remove();
 1011                     }
 1012                   }
 1013                 }
 1014                 uniqueHosts.remove(cr.getHost());
 1015                 numInFlight--;
 1016               }
 1017               
 1018               boolean busy = true;
 1019               // ensure we have enough to keep us busy
 1020               if (numInFlight < lowThreshold && (numOutputs-numCopied) > 
 1021                   probe_sample_size) {
 1022                 busy = false;
 1023               }
 1024               //Check whether we have more CopyResult to check. If there is none,
 1025               //and we are not busy enough, break
 1026               synchronized (copyResults) {
 1027                 if (copyResults.size() == 0 && !busy) {
 1028                   break;
 1029                 }
 1030               }
 1031             }
 1032             
 1033           }
 1034           
 1035           // all done, inform the copiers to exit
 1036           synchronized (copiers) {
 1037             synchronized (scheduledCopies) {
 1038               for (int i=0; i < copiers.length; i++) {
 1039                 copiers[i].interrupt();
 1040                 copiers[i] = null;
 1041               }
 1042             }
 1043           }
 1044           
 1045           //Do a merge of in-memory files (if there are any)
 1046           if (mergeThrowable == null) {
 1047             try {
 1048               //wait for an ongoing merge (if it is in flight) to complete
 1049               while (mergeInProgress) {
 1050                 Thread.sleep(200);
 1051               }
 1052               LOG.info(reduceTask.getTaskId() + 
 1053                        " Copying of all map outputs complete. " + 
 1054                        "Initiating the last merge on the remaining files in " + 
 1055                        inMemFileSys.getUri());
 1056               if (mergeThrowable != null) {
 1057                 //this could happen if the merge that
 1058                 //was in progress threw an exception
 1059                 throw mergeThrowable;
 1060               }
 1061               //initiate merge
 1062               Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
 1063               if (inMemClosedFiles.length == 0) {
 1064                 LOG.info(reduceTask.getTaskId() + "Nothing to merge from " + 
 1065                          inMemFileSys.getUri());
 1066                 return neededOutputs.isEmpty();
 1067               }
 1068               //name this output file same as the name of the first file that is 
 1069               //there in the current list of inmem files (this is guaranteed to be
 1070               //absent on the disk currently. So we don't overwrite a prev. 
 1071               //created spill). Also we need to create the output file now since
 1072               //it is not guaranteed that this file will be present after merge
 1073               //is called (we delete empty sequence files as soon as we see them
 1074               //in the merge method)
 1075               int mapId = extractMapIdFromPathName(inMemClosedFiles[0]);
 1076               Path outputPath = mapOutputFile.getInputFileForWrite(mapId, 
 1077                                reduceTask.getTaskId(), ramfsMergeOutputSize);
 1078               SequenceFile.Writer writer = sorter.cloneFileAttributes(
 1079                                                                       inMemFileSys.makeQualified(inMemClosedFiles[0]), 
 1080                                                                       localFileSys.makeQualified(outputPath), null);
 1081               
 1082               SequenceFile.Sorter.RawKeyValueIterator rIter = null;
 1083               try {
 1084                 rIter = sorter.merge(inMemClosedFiles, true, 
 1085                                      inMemClosedFiles.length, 
 1086                                      new Path(reduceTask.getTaskId()));
 1087               } catch (Exception e) { 
 1088                 //make sure that we delete the ondisk file that we created earlier
 1089                 //when we invoked cloneFileAttributes
 1090                 writer.close();
 1091                 localFileSys.delete(inMemClosedFiles[0]);
 1092                 throw new IOException (StringUtils.stringifyException(e));
 1093               }
 1094               sorter.writeFile(rIter, writer);
 1095               writer.close();
 1096               LOG.info(reduceTask.getTaskId() +
 1097                        " Merge of the " +inMemClosedFiles.length +
 1098                        " files in InMemoryFileSystem complete." +
 1099                        " Local file is " + outputPath);
 1100             } catch (Throwable t) {
 1101               LOG.warn(reduceTask.getTaskId() +
 1102                        " Final merge of the inmemory files threw an exception: " + 
 1103                        StringUtils.stringifyException(t));
 1104               return false;
 1105             }
 1106           }
 1107           return mergeThrowable == null && neededOutputs.isEmpty();
 1108         } finally {
 1109           inMemFileSys.close();
 1110         }
 1111       }
 1112       
 1113       
 1114       private CopyResult getCopyResult() {  
 1115         synchronized (copyResults) {
 1116           while (copyResults.isEmpty()) {
 1117             try {
 1118               copyResults.wait();
 1119             } catch (InterruptedException e) { }
 1120           }
 1121           if (copyResults.isEmpty()) {
 1122             return null;
 1123           } else {
 1124             return copyResults.remove(0);
 1125           }
 1126         }    
 1127       }
 1128       
 1129       /** 
 1130        * Queries the task tracker for a set of map-completion events from
 1131        * a given event ID.
 1132        * @param fromEventId the first event ID we want to start from, this is
 1133        *                    modified by the call to this method
 1134        * @param jobClient the job tracker
 1135        * @return a set of locations to copy outputs from
 1136        * @throws IOException
 1137        */  
 1138       private void getMapCompletionEvents(IntWritable fromEventId, 
 1139                                           List<MapOutputLocation> knownOutputs)
 1140         throws IOException {
 1141         
 1142         long currentTime = System.currentTimeMillis();    
 1143         long pollTime = lastPollTime + MIN_POLL_INTERVAL;
 1144         while (currentTime < pollTime) {
 1145           try {
 1146             Thread.sleep(pollTime-currentTime);
 1147           } catch (InterruptedException ie) { } // IGNORE
 1148           currentTime = System.currentTimeMillis();
 1149         }
 1150         
 1151         TaskCompletionEvent events[] = 
 1152           umbilical.getMapCompletionEvents(reduceTask.getJobId(),
 1153                                            fromEventId.get(), probe_sample_size);
 1154         
 1155         // Note the last successful poll time-stamp
 1156         lastPollTime = currentTime;
 1157   
 1158         // Update the last seen event ID
 1159         fromEventId.set(fromEventId.get() + events.length);
 1160           
 1161         // Process the TaskCompletionEvents:
 1162         // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
 1163         // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop fetching
 1164         //    from those maps.
 1165         // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
 1166         //    outputs at all.
 1167         for (TaskCompletionEvent event : events) {
 1168           switch (event.getTaskStatus()) {
 1169             case SUCCEEDED:
 1170             {
 1171               URI u = URI.create(event.getTaskTrackerHttp());
 1172               String host = u.getHost();
 1173               int port = u.getPort();
 1174               String taskId = event.getTaskId();
 1175               int mId = event.idWithinJob();
 1176               knownOutputs.add(new MapOutputLocation(taskId, mId, host, port));
 1177             }
 1178             break;
 1179             case FAILED:
 1180             case KILLED:
 1181             case OBSOLETE:
 1182             {
 1183               obsoleteMapIds.add(event.getTaskId());
 1184               LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + 
 1185                        " map-task: '" + event.getTaskId() + "'");
 1186             }
 1187             break;
 1188             case TIPFAILED:
 1189             {
 1190               neededOutputs.remove(event.idWithinJob());
 1191               LOG.info("Ignoring output of failed map TIP: '" +  
 1192                        event.getTaskId() + "'");
 1193             }
 1194             break;
 1195           }
 1196         }
 1197   
 1198       }
 1199       
 1200       
 1201       private class InMemFSMergeThread extends Thread {
 1202         private InMemoryFileSystem inMemFileSys;
 1203         private LocalFileSystem localFileSys;
 1204         private SequenceFile.Sorter sorter;
 1205         
 1206         public InMemFSMergeThread(InMemoryFileSystem inMemFileSys, 
 1207                                   LocalFileSystem localFileSys, SequenceFile.Sorter sorter) {
 1208           this.inMemFileSys = inMemFileSys;
 1209           this.localFileSys = localFileSys;
 1210           this.sorter = sorter;
 1211         }
 1212         public void run() {
 1213           LOG.info(reduceTask.getTaskId() + " Thread started: " + getName());
 1214           try {
 1215             //initiate merge
 1216             Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
 1217             //Note that the above Path[] could be of length 0 if all copies are 
 1218             //in flight. So we make sure that we have some 'closed' map
 1219             //output files to merge to get the benefit of in-memory merge
 1220             if (inMemClosedFiles.length >= 
 1221                 (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
 1222               //name this output file same as the name of the first file that is 
 1223               //there in the current list of inmem files (this is guaranteed to
 1224               //be absent on the disk currently. So we don't overwrite a prev. 
 1225               //created spill). Also we need to create the output file now since
 1226               //it is not guaranteed that this file will be present after merge
 1227               //is called (we delete empty sequence files as soon as we see them
 1228               //in the merge method)
 1229   
 1230               //figure out the mapId 
 1231               int mapId = extractMapIdFromPathName(inMemClosedFiles[0]);
 1232               
 1233               Path outputPath = mapOutputFile.getInputFileForWrite(mapId, 
 1234                                 reduceTask.getTaskId(), ramfsMergeOutputSize);
 1235   
 1236               SequenceFile.Writer writer = sorter.cloneFileAttributes(
 1237                                                                       inMemFileSys.makeQualified(inMemClosedFiles[0]), 
 1238                                                                       localFileSys.makeQualified(outputPath), null);
 1239               SequenceFile.Sorter.RawKeyValueIterator rIter;
 1240               try {
 1241                 rIter = sorter.merge(inMemClosedFiles, true, 
 1242                                      inMemClosedFiles.length, new Path(reduceTask.getTaskId()));
 1243               } catch (Exception e) { 
 1244                 //make sure that we delete the ondisk file that we created 
 1245                 //earlier when we invoked cloneFileAttributes
 1246                 writer.close();
 1247                 localFileSys.delete(outputPath);
 1248                 throw new IOException (StringUtils.stringifyException(e));
 1249               }
 1250               sorter.writeFile(rIter, writer);
 1251               writer.close();
 1252               LOG.info(reduceTask.getTaskId() + 
 1253                        " Merge of the " +inMemClosedFiles.length +
 1254                        " files in InMemoryFileSystem complete." +
 1255                        " Local file is " + outputPath);
 1256             }
 1257             else {
 1258               LOG.info(reduceTask.getTaskId() + " Nothing to merge from " + 
 1259                        inMemFileSys.getUri());
 1260             }
 1261           } catch (Throwable t) {
 1262             LOG.warn(reduceTask.getTaskId() +
 1263                      " Intermediate Merge of the inmemory files threw an exception: "
 1264                      + StringUtils.stringifyException(t));
 1265             ReduceCopier.this.mergeThrowable = t;
 1266           }
 1267           finally {
 1268             mergeInProgress = false;
 1269           }
 1270         }
 1271       }
 1272       final private PathFilter MAP_OUTPUT_FILTER = new PathFilter() {
 1273           public boolean accept(Path file) {
 1274             return file.toString().endsWith(".out");
 1275           }     
 1276         };
 1277     }
 1278   }

Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » mapred » [javadoc | source]