Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » mapred » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one
    3    * or more contributor license agreements.  See the NOTICE file
    4    * distributed with this work for additional information
    5    * regarding copyright ownership.  The ASF licenses this file
    6    * to you under the Apache License, Version 2.0 (the
    7    * "License"); you may not use this file except in compliance
    8    * with the License.  You may obtain a copy of the License at
    9    *
   10    *     http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    * Unless required by applicable law or agreed to in writing, software
   13    * distributed under the License is distributed on an "AS IS" BASIS,
   14    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   15    * See the License for the specific language governing permissions and
   16    * limitations under the License.
   17    */
   18   
   19   package org.apache.hadoop.mapred;
   20   
   21   import java.io.DataInput;
   22   import java.io.DataOutput;
   23   import java.io.DataOutputStream;
   24   import java.io.IOException;
   25   import java.util.ArrayList;
   26   import java.util.List;
   27   
   28   import org.apache.commons.logging.Log;
   29   import org.apache.commons.logging.LogFactory;
   30   import org.apache.hadoop.conf.Configuration;
   31   import org.apache.hadoop.fs.FSDataInputStream;
   32   import org.apache.hadoop.fs.FSDataOutputStream;
   33   import org.apache.hadoop.fs.FileSystem;
   34   import org.apache.hadoop.fs.Path;
   35   import org.apache.hadoop.io.BytesWritable;
   36   import org.apache.hadoop.io.DataInputBuffer;
   37   import org.apache.hadoop.io.DataOutputBuffer;
   38   import org.apache.hadoop.io.SequenceFile;
   39   import org.apache.hadoop.io.Text;
   40   import org.apache.hadoop.io.Writable;
   41   import org.apache.hadoop.io.WritableComparable;
   42   import org.apache.hadoop.io.WritableComparator;
   43   import org.apache.hadoop.io.SequenceFile.CompressionType;
   44   import org.apache.hadoop.io.SequenceFile.Sorter;
   45   import org.apache.hadoop.io.SequenceFile.Writer;
   46   import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
   47   import org.apache.hadoop.io.SequenceFile.Sorter.SegmentDescriptor;
   48   import org.apache.hadoop.io.compress.CompressionCodec;
   49   import org.apache.hadoop.io.compress.DefaultCodec;
   50   import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
   51   import org.apache.hadoop.util.ReflectionUtils;
   52   import org.apache.hadoop.util.StringUtils;
   53   
   54   import static org.apache.hadoop.mapred.Task.Counter.*;
   55   
   56   /** A Map task. */
   57   class MapTask extends Task {
   58   
   /** Serialized form of the input split carried by this task. */
   private BytesWritable split = new BytesWritable();

   /** Name of the class used to re-instantiate the split inside run(). */
   private String splitClass;

   /** The split rebuilt from {@link #split}; stays null until run() creates it. */
   private InputSplit instantiatedSplit;

   /** Per-partition allowance added when estimating output file sizes. */
   private static final int APPROX_HEADER_LENGTH = 150;

   private static final Log LOG = LogFactory.getLog(MapTask.class.getName());

   // instance initializer: every MapTask starts out in the MAP phase
   {
     setPhase(TaskStatus.Phase.MAP);
   }
   69   
   70     public MapTask() {}
   71   
   /**
    * Creates a map task for one input split.
    *
    * @param jobId      forwarded to the Task constructor
    * @param jobFile    forwarded to the Task constructor
    * @param tipId      forwarded to the Task constructor
    * @param taskId     forwarded to the Task constructor
    * @param partition  forwarded to the Task constructor
    * @param splitClass name of the class used to re-instantiate the split
    * @param split      serialized split, stored into this task's own buffer
    * @throws IOException declared by the signature
    */
   public MapTask(String jobId, String jobFile, String tipId, String taskId,
                  int partition, String splitClass, BytesWritable split)
       throws IOException {
     super(jobId, jobFile, tipId, taskId, partition);
     this.split.set(split);
     this.splitClass = splitClass;
   }
   79   
   80     public boolean isMapTask() {
   81       return true;
   82     }
   83   
   84     public void localizeConfiguration(JobConf conf) throws IOException {
   85       super.localizeConfiguration(conf);
   86       Path localSplit = new Path(new Path(getJobFile()).getParent(), 
   87                                  "split.dta");
   88       LOG.debug("Writing local split to " + localSplit);
   89       DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
   90       Text.writeString(out, splitClass);
   91       split.write(out);
   92       out.close();
   93     }
   94     
   95     public TaskRunner createRunner(TaskTracker tracker) {
   96       return new MapTaskRunner(this, tracker, this.conf);
   97     }
   98   
   99     public void write(DataOutput out) throws IOException {
  100       super.write(out);
  101       Text.writeString(out, splitClass);
  102       split.write(out);
  103     }
  104     
  105     public void readFields(DataInput in) throws IOException {
  106       super.readFields(in);
  107       splitClass = Text.readString(in);
  108       split.readFields(in);
  109     }
  110   
  111     InputSplit getInputSplit() throws UnsupportedOperationException {
  112       return instantiatedSplit;
  113     }
  114   
  115     public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  116       throws IOException {
  117   
  118       final Reporter reporter = getReporter(umbilical);
  119   
  120       // start thread that will handle communication with parent
  121       startCommunicationThread(umbilical);
  122   
  123       int numReduceTasks = conf.getNumReduceTasks();
  124       LOG.info("numReduceTasks: " + numReduceTasks);
  125       MapOutputCollector collector = null;
  126       if (numReduceTasks > 0) {
  127         collector = new MapOutputBuffer(umbilical, job, reporter);
  128       } else { 
  129         collector = new DirectMapOutputCollector(umbilical, job, reporter);
  130       }
  131       // reinstantiate the split
  132       try {
  133         instantiatedSplit = (InputSplit) 
  134           ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
  135       } catch (ClassNotFoundException exp) {
  136         IOException wrap = new IOException("Split class " + splitClass + 
  137                                            " not found");
  138         wrap.initCause(exp);
  139         throw wrap;
  140       }
  141       DataInputBuffer splitBuffer = new DataInputBuffer();
  142       splitBuffer.reset(split.get(), 0, split.getSize());
  143       instantiatedSplit.readFields(splitBuffer);
  144       
  145       // if it is a file split, we can give more details
  146       if (instantiatedSplit instanceof FileSplit) {
  147         FileSplit fileSplit = (FileSplit) instantiatedSplit;
  148         job.set("map.input.file", fileSplit.getPath().toString());
  149         job.setLong("map.input.start", fileSplit.getStart());
  150         job.setLong("map.input.length", fileSplit.getLength());
  151       }
  152         
  153       final RecordReader rawIn =                  // open input
  154         job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
  155   
  156       RecordReader in = new RecordReader() {      // wrap in progress reporter
  157   
  158           public WritableComparable createKey() {
  159             return rawIn.createKey();
  160           }
  161             
  162           public Writable createValue() {
  163             return rawIn.createValue();
  164           }
  165            
  166           public synchronized boolean next(Writable key, Writable value)
  167             throws IOException {
  168   
  169             setProgress(getProgress());
  170             long beforePos = getPos();
  171             boolean ret = rawIn.next(key, value);
  172             if (ret) {
  173               reporter.incrCounter(MAP_INPUT_RECORDS, 1);
  174               reporter.incrCounter(MAP_INPUT_BYTES, (getPos() - beforePos));
  175             }
  176             return ret;
  177           }
  178           public long getPos() throws IOException { return rawIn.getPos(); }
  179           public void close() throws IOException { rawIn.close(); }
  180           public float getProgress() throws IOException {
  181             return rawIn.getProgress();
  182           }
  183         };
  184   
  185       MapRunnable runner =
  186         (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
  187   
  188       try {
  189         runner.run(in, collector, reporter);      
  190         collector.flush();
  191       } finally {
  192         //close
  193         in.close();                               // close input
  194         collector.close();
  195       }
  196       done(umbilical);
  197     }
  198   
  199     interface MapOutputCollector extends OutputCollector {
  200   
  201       public void close() throws IOException;
  202       
  203       public void flush() throws IOException;
  204           
  205     }
  206   
  207     class DirectMapOutputCollector implements MapOutputCollector {
  208   
  209       private RecordWriter out = null;
  210   
  211       private Reporter reporter = null;
  212   
  213       public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
  214           JobConf job, Reporter reporter) throws IOException {
  215         this.reporter = reporter;
  216         String finalName = getOutputName(getPartition());
  217         FileSystem fs = FileSystem.get(job);
  218   
  219         out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
  220       }
  221   
  222       public void close() throws IOException {
  223         if (this.out != null) {
  224           out.close(this.reporter);
  225         }
  226   
  227       }
  228   
  229       public void flush() throws IOException {
  230         // TODO Auto-generated method stub
  231         
  232       }
  233   
  234       public void collect(WritableComparable key, Writable value) throws IOException {
  235         this.out.write(key, value);
  236       }
  237       
  238     }
  239     
  240     class MapOutputBuffer implements MapOutputCollector {
  241   
  242       private final int partitions;
  243       private Partitioner partitioner;
  244       private JobConf job;
  245       private Reporter reporter;
  246       final private TaskUmbilicalProtocol umbilical;
  247   
  248       private DataOutputBuffer keyValBuffer; //the buffer where key/val will
  249                                              //be stored before they are 
  250                                              //spilled to disk
  251       private int maxBufferSize; //the max amount of in-memory space after which
  252                                  //we will spill the keyValBuffer to disk
  253       private int numSpills; //maintains the no. of spills to disk done so far
  254       
  255       private FileSystem localFs;
  256       private CompressionCodec codec;
  257       private CompressionType compressionType;
  258       private Class keyClass;
  259       private Class valClass;
  260       private WritableComparator comparator;
  261       private BufferSorter []sortImpl;
  262       private SequenceFile.Writer writer;
  263       private FSDataOutputStream out;
  264       private FSDataOutputStream indexOut;
  265       private long segmentStart;
  266       public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job, 
  267                              Reporter reporter) throws IOException {
  268         this.partitions = job.getNumReduceTasks();
  269         this.partitioner = (Partitioner)ReflectionUtils.newInstance(
  270                                                                     job.getPartitionerClass(), job);
  271         maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024;
  272         keyValBuffer = new DataOutputBuffer();
  273   
  274         this.job = job;
  275         this.reporter = reporter;
  276         this.umbilical = umbilical;
  277         this.comparator = job.getOutputKeyComparator();
  278         this.keyClass = job.getMapOutputKeyClass();
  279         this.valClass = job.getMapOutputValueClass();
  280         this.localFs = FileSystem.getLocal(job);
  281         this.codec = null;
  282         this.compressionType = CompressionType.NONE;
  283         if (job.getCompressMapOutput()) {
  284           // find the kind of compression to do, defaulting to record
  285           compressionType = job.getMapOutputCompressionType();
  286   
  287           // find the right codec
  288           Class codecClass = 
  289             job.getMapOutputCompressorClass(DefaultCodec.class);
  290           codec = (CompressionCodec) 
  291             ReflectionUtils.newInstance(codecClass, job);
  292         }
  293         sortImpl = new BufferSorter[partitions];
  294         for (int i = 0; i < partitions; i++)
  295           sortImpl[i] = (BufferSorter)ReflectionUtils.newInstance(
  296                                                                   job.getClass("map.sort.class", MergeSorter.class,
  297                                                                                BufferSorter.class), job);
  298       }
  299       
  300       private void startPartition(int partNumber) throws IOException {
  301         //We create the sort output as multiple sequence files within a spilled
  302         //file. So we create a writer for each partition. 
  303         segmentStart = out.getPos();
  304         writer =
  305           SequenceFile.createWriter(job, out, job.getMapOutputKeyClass(),
  306                                     job.getMapOutputValueClass(), compressionType, codec);
  307       }
  308       private void endPartition(int partNumber) throws IOException {
  309         //Need to close the file, especially if block compression is in use
  310         //We also update the index file to contain the part offsets per 
  311         //spilled file
  312         writer.close();
  313         indexOut.writeLong(segmentStart);
  314         //we also store 0 length key/val segments to make the merge phase easier.
  315         indexOut.writeLong(out.getPos()-segmentStart);
  316       }
  317       
  318       public void collect(WritableComparable key,
  319                           Writable value) throws IOException {
  320         
  321         if (key.getClass() != keyClass) {
  322           throw new IOException("Type mismatch in key from map: expected "
  323                                 + keyClass.getName() + ", recieved "
  324                                 + key.getClass().getName());
  325         }
  326         if (value.getClass() != valClass) {
  327           throw new IOException("Type mismatch in value from map: expected "
  328                                 + valClass.getName() + ", recieved "
  329                                 + value.getClass().getName());
  330         }
  331         
  332         synchronized (this) {
  333           if (keyValBuffer == null) {
  334             keyValBuffer = new DataOutputBuffer();
  335           }
  336           //dump the key/value to buffer
  337           int keyOffset = keyValBuffer.getLength(); 
  338           key.write(keyValBuffer);
  339           int keyLength = keyValBuffer.getLength() - keyOffset;
  340           value.write(keyValBuffer);
  341           int valLength = keyValBuffer.getLength() - (keyOffset + keyLength);
  342         
  343           int partNumber = partitioner.getPartition(key, value, partitions);
  344           sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
  345   
  346           reporter.incrCounter(MAP_OUTPUT_RECORDS, 1);
  347           reporter.incrCounter(MAP_OUTPUT_BYTES,
  348                                (keyValBuffer.getLength() - keyOffset));
  349   
  350           //now check whether we need to spill to disk
  351           long totalMem = 0;
  352           for (int i = 0; i < partitions; i++)
  353             totalMem += sortImpl[i].getMemoryUtilized();
  354           if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
  355             sortAndSpillToDisk();
  356             //we don't reuse the keyValBuffer. We want to maintain consistency
  357             //in the memory model (for negligible performance loss).
  358             keyValBuffer = null;
  359             for (int i = 0; i < partitions; i++) {
  360               sortImpl[i].close();
  361             }
  362           }
  363         }
  364       }
  365       
  366       //sort, combine and spill to disk
  367       private void sortAndSpillToDisk() throws IOException {
  368         synchronized (this) {
  369           //approximate the length of the output file to be the length of the
  370           //buffer + header lengths for the partitions
  371           long size = keyValBuffer.getLength() + 
  372                       partitions * APPROX_HEADER_LENGTH;
  373           Path filename = mapOutputFile.getSpillFileForWrite(getTaskId(), 
  374                                         numSpills, size);
  375           //we just create the FSDataOutputStream object here.
  376           out = localFs.create(filename);
  377           Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
  378                                getTaskId(), numSpills, partitions * 16);
  379           indexOut = localFs.create(indexFilename);
  380           LOG.debug("opened "+
  381                     mapOutputFile.getSpillFile(getTaskId(), numSpills).getName());
  382             
  383           //invoke the sort
  384           for (int i = 0; i < partitions; i++) {
  385             sortImpl[i].setInputBuffer(keyValBuffer);
  386             sortImpl[i].setProgressable(reporter);
  387             RawKeyValueIterator rIter = sortImpl[i].sort();
  388             
  389             startPartition(i);
  390             if (rIter != null) {
  391               //invoke the combiner if one is defined
  392               if (job.getCombinerClass() != null) {
  393                 //we instantiate and close the combiner for each partition. This
  394                 //is required for streaming where the combiner runs as a separate
  395                 //process and we want to make sure that the combiner process has
  396                 //got all the input key/val, processed, and output the result 
  397                 //key/vals before we write the partition header in the output file
  398                 Reducer combiner = (Reducer)ReflectionUtils.newInstance(
  399                                                                         job.getCombinerClass(), job);
  400                 // make collector
  401                 OutputCollector combineCollector = new OutputCollector() {
  402                     public void collect(WritableComparable key, Writable value)
  403                       throws IOException {
  404                       synchronized (this) {
  405                         writer.append(key, value);
  406                       }
  407                     }
  408                   };
  409                 combineAndSpill(rIter, combiner, combineCollector);
  410                 combiner.close();
  411               }
  412               else //just spill the sorted data
  413                 spill(rIter);
  414             }
  415             endPartition(i);
  416           }
  417           numSpills++;
  418           out.close();
  419           indexOut.close();
  420         }
  421       }
  422       
  423       private void combineAndSpill(RawKeyValueIterator resultIter, 
  424                                    Reducer combiner, OutputCollector combineCollector) throws IOException {
  425         //combine the key/value obtained from the offset & indices arrays.
  426         CombineValuesIterator values = new CombineValuesIterator(resultIter,
  427                                                                  comparator, keyClass, valClass, job, reporter);
  428         while (values.more()) {
  429           combiner.reduce(values.getKey(), values, combineCollector, reporter);
  430           values.nextKey();
  431           reporter.incrCounter(COMBINE_OUTPUT_RECORDS, 1);
  432           // indicate we're making progress
  433           reporter.progress();
  434         }
  435       }
  436       
  437       private void spill(RawKeyValueIterator resultIter) throws IOException {
  438         Writable key = null;
  439         Writable value = null;
  440   
  441         try {
  442           // indicate progress, since constructor may take a while (because of 
  443           // user code) 
  444           reporter.progress();
  445           key = (WritableComparable)ReflectionUtils.newInstance(keyClass, job);
  446           value = (Writable)ReflectionUtils.newInstance(valClass, job);
  447         } catch (Exception e) {
  448           throw new RuntimeException(e);
  449         }
  450   
  451         DataInputBuffer keyIn = new DataInputBuffer();
  452         DataInputBuffer valIn = new DataInputBuffer();
  453         DataOutputBuffer valOut = new DataOutputBuffer();
  454         while (resultIter.next()) {
  455           keyIn.reset(resultIter.getKey().getData(), 
  456                       resultIter.getKey().getLength());
  457           key.readFields(keyIn);
  458           valOut.reset();
  459           (resultIter.getValue()).writeUncompressedBytes(valOut);
  460           valIn.reset(valOut.getData(), valOut.getLength());
  461           value.readFields(valIn);
  462           writer.append(key, value);
  463           reporter.progress();
  464         }
  465       }
  466       
  467       private void mergeParts() throws IOException {
  468         // get the approximate size of the final output/index files
  469         long finalOutFileSize = 0;
  470         long finalIndexFileSize = 0;
  471         Path [] filename = new Path[numSpills];
  472         Path [] indexFileName = new Path[numSpills];
  473         
  474         for(int i = 0; i < numSpills; i++) {
  475           filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
  476           indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
  477           finalOutFileSize += localFs.getLength(filename[i]);
  478         }
  479         //make correction in the length to include the sequence file header
  480         //lengths for each partition
  481         finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
  482         
  483         finalIndexFileSize = partitions * 16;
  484         
  485         Path finalOutputFile = mapOutputFile.getOutputFileForWrite(getTaskId(), 
  486                                finalOutFileSize);
  487         Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
  488                               getTaskId(), finalIndexFileSize);
  489         
  490         if (numSpills == 1) { //the spill is the final output
  491           localFs.rename(filename[0], finalOutputFile);
  492           localFs.rename(indexFileName[0], finalIndexFile);
  493           return;
  494         }
  495         
  496         //The output stream for the final single output file
  497         FSDataOutputStream finalOut = localFs.create(finalOutputFile, true, 
  498                                                      4096);
  499         //The final index file output stream
  500         FSDataOutputStream finalIndexOut = localFs.create(finalIndexFile, true,
  501                                                           4096);
  502         long segmentStart;
  503         
  504         if (numSpills == 0) {
  505           //create dummy files
  506           for (int i = 0; i < partitions; i++) {
  507             segmentStart = finalOut.getPos();
  508             Writer writer = SequenceFile.createWriter(job, finalOut, 
  509                                                       job.getMapOutputKeyClass(), 
  510                                                       job.getMapOutputValueClass(), 
  511                                                       compressionType, codec);
  512             finalIndexOut.writeLong(segmentStart);
  513             finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
  514             writer.close();
  515           }
  516           finalOut.close();
  517           finalIndexOut.close();
  518           return;
  519         }
  520         {
  521           //create a sorter object as we need access to the SegmentDescriptor
  522           //class and merge methods
  523           Sorter sorter = new Sorter(localFs, job.getOutputKeyComparator(), valClass, job);
  524           sorter.setProgressable(reporter);
  525           
  526           for (int parts = 0; parts < partitions; parts++){
  527             List<SegmentDescriptor> segmentList =
  528               new ArrayList<SegmentDescriptor>(numSpills);
  529             for(int i = 0; i < numSpills; i++) {
  530               FSDataInputStream indexIn = localFs.open(indexFileName[i]);
  531               indexIn.seek(parts * 16);
  532               long segmentOffset = indexIn.readLong();
  533               long segmentLength = indexIn.readLong();
  534               indexIn.close();
  535               SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
  536                                                                  segmentLength, filename[i]);
  537               s.preserveInput(true);
  538               s.doSync();
  539               segmentList.add(i, s);
  540             }
  541             segmentStart = finalOut.getPos();
  542             RawKeyValueIterator kvIter = sorter.merge(segmentList, new Path(getTaskId())); 
  543             SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut, 
  544                                                                    job.getMapOutputKeyClass(), job.getMapOutputValueClass(), 
  545                                                                    compressionType, codec);
  546             sorter.writeFile(kvIter, writer);
  547             //close the file - required esp. for block compression to ensure
  548             //partition data don't span partition boundaries
  549             writer.close();
  550             //when we write the offset/length to the final index file, we write
  551             //longs for both. This helps us to reliably seek directly to the
  552             //offset/length for a partition when we start serving the byte-ranges
  553             //to the reduces. We probably waste some space in the file by doing
  554             //this as opposed to writing VLong but it helps us later on.
  555             finalIndexOut.writeLong(segmentStart);
  556             finalIndexOut.writeLong(finalOut.getPos()-segmentStart);
  557           }
  558           finalOut.close();
  559           finalIndexOut.close();
  560           //cleanup
  561           for(int i = 0; i < numSpills; i++) {
  562             localFs.delete(filename[i]);
  563             localFs.delete(indexFileName[i]);
  564           }
  565         }
  566       }
  567       
  568       public void close() throws IOException {
  569         //empty for now
  570       }
  571       
  572       private class CombineValuesIterator extends ValuesIterator {
  573           
  574         public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in, 
  575                                      WritableComparator comparator, Class keyClass,
  576                                      Class valClass, Configuration conf, Reporter reporter) 
  577           throws IOException {
  578           super(in, comparator, keyClass, valClass, conf, reporter);
  579         }
  580         
  581         public Object next() {
  582           reporter.incrCounter(COMBINE_INPUT_RECORDS, 1);
  583           return super.next();
  584         }
  585       }
  586   
  587       public void flush() throws IOException 
  588       {
  589         //check whether the length of the key/value buffer is 0. If not, then
  590         //we need to spill that to disk. Note that we reset the key/val buffer
  591         //upon each spill (so a length > 0 means that we have not spilled yet)
  592         synchronized (this) {
  593           if (keyValBuffer != null && keyValBuffer.getLength() > 0) {
  594             sortAndSpillToDisk();
  595           }
  596         }
  597         mergeParts();
  598       }
  599     }
  600   }

Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » mapred » [javadoc | source]