1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.mapred;
20
21 import java.io.DataInput;
22 import java.io.DataOutput;
23 import java.io.DataOutputStream;
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FSDataInputStream;
32 import org.apache.hadoop.fs.FSDataOutputStream;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.io.BytesWritable;
36 import org.apache.hadoop.io.DataInputBuffer;
37 import org.apache.hadoop.io.DataOutputBuffer;
38 import org.apache.hadoop.io.SequenceFile;
39 import org.apache.hadoop.io.Text;
40 import org.apache.hadoop.io.Writable;
41 import org.apache.hadoop.io.WritableComparable;
42 import org.apache.hadoop.io.WritableComparator;
43 import org.apache.hadoop.io.SequenceFile.CompressionType;
44 import org.apache.hadoop.io.SequenceFile.Sorter;
45 import org.apache.hadoop.io.SequenceFile.Writer;
46 import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
47 import org.apache.hadoop.io.SequenceFile.Sorter.SegmentDescriptor;
48 import org.apache.hadoop.io.compress.CompressionCodec;
49 import org.apache.hadoop.io.compress.DefaultCodec;
50 import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
51 import org.apache.hadoop.util.ReflectionUtils;
52 import org.apache.hadoop.util.StringUtils;
53
54 import static org.apache.hadoop.mapred.Task.Counter.*;
55
56 /** A Map task. */
57 class MapTask extends Task {
58
/** Serialized form of the input split assigned to this task. */
private BytesWritable split = new BytesWritable();

/** Name of the class used to rebuild the split from its serialized form. */
private String splitClass;

/** The rebuilt split; stays null until run() has instantiated it. */
private InputSplit instantiatedSplit;

/**
 * Approximate per-partition header size, added when estimating the size
 * of spill and final output files.
 */
private static final int APPROX_HEADER_LENGTH = 150;

private static final Log LOG = LogFactory.getLog(MapTask.class.getName());

// Instance initializer: runs for every constructor, so each new task
// starts out in the MAP phase.
{
  setPhase(TaskStatus.Phase.MAP);
}

/** Creates a task with an empty split and no split class set. */
public MapTask() {
}
71
/**
 * Creates a map task for one input split.
 *
 * @param jobId      passed through to the Task constructor
 * @param jobFile    passed through to the Task constructor
 * @param tipId      passed through to the Task constructor
 * @param taskId     passed through to the Task constructor
 * @param partition  passed through to the Task constructor
 * @param splitClass name of the class used later to re-instantiate the split
 * @param split      serialized split; its bytes are copied into this task
 * @throws IOException declared by the original signature
 */
public MapTask(String jobId, String jobFile, String tipId, String taskId,
               int partition, String splitClass, BytesWritable split)
    throws IOException {
  super(jobId, jobFile, tipId, taskId, partition);
  // copy the bytes rather than keeping a reference to the caller's buffer
  this.split.set(split);
  this.splitClass = splitClass;
}
79
80 public boolean isMapTask() {
81 return true;
82 }
83
84 public void localizeConfiguration(JobConf conf) throws IOException {
85 super.localizeConfiguration(conf);
86 Path localSplit = new Path(new Path(getJobFile()).getParent(),
87 "split.dta");
88 LOG.debug("Writing local split to " + localSplit);
89 DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
90 Text.writeString(out, splitClass);
91 split.write(out);
92 out.close();
93 }
94
95 public TaskRunner createRunner(TaskTracker tracker) {
96 return new MapTaskRunner(this, tracker, this.conf);
97 }
98
99 public void write(DataOutput out) throws IOException {
100 super.write(out);
101 Text.writeString(out, splitClass);
102 split.write(out);
103 }
104
105 public void readFields(DataInput in) throws IOException {
106 super.readFields(in);
107 splitClass = Text.readString(in);
108 split.readFields(in);
109 }
110
111 InputSplit getInputSplit() throws UnsupportedOperationException {
112 return instantiatedSplit;
113 }
114
115 public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
116 throws IOException {
117
118 final Reporter reporter = getReporter(umbilical);
119
120 // start thread that will handle communication with parent
121 startCommunicationThread(umbilical);
122
123 int numReduceTasks = conf.getNumReduceTasks();
124 LOG.info("numReduceTasks: " + numReduceTasks);
125 MapOutputCollector collector = null;
126 if (numReduceTasks > 0) {
127 collector = new MapOutputBuffer(umbilical, job, reporter);
128 } else {
129 collector = new DirectMapOutputCollector(umbilical, job, reporter);
130 }
131 // reinstantiate the split
132 try {
133 instantiatedSplit = (InputSplit)
134 ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
135 } catch (ClassNotFoundException exp) {
136 IOException wrap = new IOException("Split class " + splitClass +
137 " not found");
138 wrap.initCause(exp);
139 throw wrap;
140 }
141 DataInputBuffer splitBuffer = new DataInputBuffer();
142 splitBuffer.reset(split.get(), 0, split.getSize());
143 instantiatedSplit.readFields(splitBuffer);
144
145 // if it is a file split, we can give more details
146 if (instantiatedSplit instanceof FileSplit) {
147 FileSplit fileSplit = (FileSplit) instantiatedSplit;
148 job.set("map.input.file", fileSplit.getPath().toString());
149 job.setLong("map.input.start", fileSplit.getStart());
150 job.setLong("map.input.length", fileSplit.getLength());
151 }
152
153 final RecordReader rawIn = // open input
154 job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
155
156 RecordReader in = new RecordReader() { // wrap in progress reporter
157
158 public WritableComparable createKey() {
159 return rawIn.createKey();
160 }
161
162 public Writable createValue() {
163 return rawIn.createValue();
164 }
165
166 public synchronized boolean next(Writable key, Writable value)
167 throws IOException {
168
169 setProgress(getProgress());
170 long beforePos = getPos();
171 boolean ret = rawIn.next(key, value);
172 if (ret) {
173 reporter.incrCounter(MAP_INPUT_RECORDS, 1);
174 reporter.incrCounter(MAP_INPUT_BYTES, (getPos() - beforePos));
175 }
176 return ret;
177 }
178 public long getPos() throws IOException { return rawIn.getPos(); }
179 public void close() throws IOException { rawIn.close(); }
180 public float getProgress() throws IOException {
181 return rawIn.getProgress();
182 }
183 };
184
185 MapRunnable runner =
186 (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
187
188 try {
189 runner.run(in, collector, reporter);
190 collector.flush();
191 } finally {
192 //close
193 in.close(); // close input
194 collector.close();
195 }
196 done(umbilical);
197 }
198
199 interface MapOutputCollector extends OutputCollector {
200
201 public void close() throws IOException;
202
203 public void flush() throws IOException;
204
205 }
206
207 class DirectMapOutputCollector implements MapOutputCollector {
208
209 private RecordWriter out = null;
210
211 private Reporter reporter = null;
212
213 public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
214 JobConf job, Reporter reporter) throws IOException {
215 this.reporter = reporter;
216 String finalName = getOutputName(getPartition());
217 FileSystem fs = FileSystem.get(job);
218
219 out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
220 }
221
222 public void close() throws IOException {
223 if (this.out != null) {
224 out.close(this.reporter);
225 }
226
227 }
228
229 public void flush() throws IOException {
230 // TODO Auto-generated method stub
231
232 }
233
234 public void collect(WritableComparable key, Writable value) throws IOException {
235 this.out.write(key, value);
236 }
237
238 }
239
240 class MapOutputBuffer implements MapOutputCollector {
241
242 private final int partitions;
243 private Partitioner partitioner;
244 private JobConf job;
245 private Reporter reporter;
246 final private TaskUmbilicalProtocol umbilical;
247
248 private DataOutputBuffer keyValBuffer; //the buffer where key/val will
249 //be stored before they are
250 //spilled to disk
251 private int maxBufferSize; //the max amount of in-memory space after which
252 //we will spill the keyValBuffer to disk
253 private int numSpills; //maintains the no. of spills to disk done so far
254
255 private FileSystem localFs;
256 private CompressionCodec codec;
257 private CompressionType compressionType;
258 private Class keyClass;
259 private Class valClass;
260 private WritableComparator comparator;
261 private BufferSorter []sortImpl;
262 private SequenceFile.Writer writer;
263 private FSDataOutputStream out;
264 private FSDataOutputStream indexOut;
265 private long segmentStart;
266 public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
267 Reporter reporter) throws IOException {
268 this.partitions = job.getNumReduceTasks();
269 this.partitioner = (Partitioner)ReflectionUtils.newInstance(
270 job.getPartitionerClass(), job);
271 maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024;
272 keyValBuffer = new DataOutputBuffer();
273
274 this.job = job;
275 this.reporter = reporter;
276 this.umbilical = umbilical;
277 this.comparator = job.getOutputKeyComparator();
278 this.keyClass = job.getMapOutputKeyClass();
279 this.valClass = job.getMapOutputValueClass();
280 this.localFs = FileSystem.getLocal(job);
281 this.codec = null;
282 this.compressionType = CompressionType.NONE;
283 if (job.getCompressMapOutput()) {
284 // find the kind of compression to do, defaulting to record
285 compressionType = job.getMapOutputCompressionType();
286
287 // find the right codec
288 Class codecClass =
289 job.getMapOutputCompressorClass(DefaultCodec.class);
290 codec = (CompressionCodec)
291 ReflectionUtils.newInstance(codecClass, job);
292 }
293 sortImpl = new BufferSorter[partitions];
294 for (int i = 0; i < partitions; i++)
295 sortImpl[i] = (BufferSorter)ReflectionUtils.newInstance(
296 job.getClass("map.sort.class", MergeSorter.class,
297 BufferSorter.class), job);
298 }
299
300 private void startPartition(int partNumber) throws IOException {
301 //We create the sort output as multiple sequence files within a spilled
302 //file. So we create a writer for each partition.
303 segmentStart = out.getPos();
304 writer =
305 SequenceFile.createWriter(job, out, job.getMapOutputKeyClass(),
306 job.getMapOutputValueClass(), compressionType, codec);
307 }
308 private void endPartition(int partNumber) throws IOException {
309 //Need to close the file, especially if block compression is in use
310 //We also update the index file to contain the part offsets per
311 //spilled file
312 writer.close();
313 indexOut.writeLong(segmentStart);
314 //we also store 0 length key/val segments to make the merge phase easier.
315 indexOut.writeLong(out.getPos()-segmentStart);
316 }
317
318 public void collect(WritableComparable key,
319 Writable value) throws IOException {
320
321 if (key.getClass() != keyClass) {
322 throw new IOException("Type mismatch in key from map: expected "
323 + keyClass.getName() + ", recieved "
324 + key.getClass().getName());
325 }
326 if (value.getClass() != valClass) {
327 throw new IOException("Type mismatch in value from map: expected "
328 + valClass.getName() + ", recieved "
329 + value.getClass().getName());
330 }
331
332 synchronized (this) {
333 if (keyValBuffer == null) {
334 keyValBuffer = new DataOutputBuffer();
335 }
336 //dump the key/value to buffer
337 int keyOffset = keyValBuffer.getLength();
338 key.write(keyValBuffer);
339 int keyLength = keyValBuffer.getLength() - keyOffset;
340 value.write(keyValBuffer);
341 int valLength = keyValBuffer.getLength() - (keyOffset + keyLength);
342
343 int partNumber = partitioner.getPartition(key, value, partitions);
344 sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
345
346 reporter.incrCounter(MAP_OUTPUT_RECORDS, 1);
347 reporter.incrCounter(MAP_OUTPUT_BYTES,
348 (keyValBuffer.getLength() - keyOffset));
349
350 //now check whether we need to spill to disk
351 long totalMem = 0;
352 for (int i = 0; i < partitions; i++)
353 totalMem += sortImpl[i].getMemoryUtilized();
354 if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
355 sortAndSpillToDisk();
356 //we don't reuse the keyValBuffer. We want to maintain consistency
357 //in the memory model (for negligible performance loss).
358 keyValBuffer = null;
359 for (int i = 0; i < partitions; i++) {
360 sortImpl[i].close();
361 }
362 }
363 }
364 }
365
366 //sort, combine and spill to disk
367 private void sortAndSpillToDisk() throws IOException {
368 synchronized (this) {
369 //approximate the length of the output file to be the length of the
370 //buffer + header lengths for the partitions
371 long size = keyValBuffer.getLength() +
372 partitions * APPROX_HEADER_LENGTH;
373 Path filename = mapOutputFile.getSpillFileForWrite(getTaskId(),
374 numSpills, size);
375 //we just create the FSDataOutputStream object here.
376 out = localFs.create(filename);
377 Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
378 getTaskId(), numSpills, partitions * 16);
379 indexOut = localFs.create(indexFilename);
380 LOG.debug("opened "+
381 mapOutputFile.getSpillFile(getTaskId(), numSpills).getName());
382
383 //invoke the sort
384 for (int i = 0; i < partitions; i++) {
385 sortImpl[i].setInputBuffer(keyValBuffer);
386 sortImpl[i].setProgressable(reporter);
387 RawKeyValueIterator rIter = sortImpl[i].sort();
388
389 startPartition(i);
390 if (rIter != null) {
391 //invoke the combiner if one is defined
392 if (job.getCombinerClass() != null) {
393 //we instantiate and close the combiner for each partition. This
394 //is required for streaming where the combiner runs as a separate
395 //process and we want to make sure that the combiner process has
396 //got all the input key/val, processed, and output the result
397 //key/vals before we write the partition header in the output file
398 Reducer combiner = (Reducer)ReflectionUtils.newInstance(
399 job.getCombinerClass(), job);
400 // make collector
401 OutputCollector combineCollector = new OutputCollector() {
402 public void collect(WritableComparable key, Writable value)
403 throws IOException {
404 synchronized (this) {
405 writer.append(key, value);
406 }
407 }
408 };
409 combineAndSpill(rIter, combiner, combineCollector);
410 combiner.close();
411 }
412 else //just spill the sorted data
413 spill(rIter);
414 }
415 endPartition(i);
416 }
417 numSpills++;
418 out.close();
419 indexOut.close();
420 }
421 }
422
423 private void combineAndSpill(RawKeyValueIterator resultIter,
424 Reducer combiner, OutputCollector combineCollector) throws IOException {
425 //combine the key/value obtained from the offset & indices arrays.
426 CombineValuesIterator values = new CombineValuesIterator(resultIter,
427 comparator, keyClass, valClass, job, reporter);
428 while (values.more()) {
429 combiner.reduce(values.getKey(), values, combineCollector, reporter);
430 values.nextKey();
431 reporter.incrCounter(COMBINE_OUTPUT_RECORDS, 1);
432 // indicate we're making progress
433 reporter.progress();
434 }
435 }
436
437 private void spill(RawKeyValueIterator resultIter) throws IOException {
438 Writable key = null;
439 Writable value = null;
440
441 try {
442 // indicate progress, since constructor may take a while (because of
443 // user code)
444 reporter.progress();
445 key = (WritableComparable)ReflectionUtils.newInstance(keyClass, job);
446 value = (Writable)ReflectionUtils.newInstance(valClass, job);
447 } catch (Exception e) {
448 throw new RuntimeException(e);
449 }
450
451 DataInputBuffer keyIn = new DataInputBuffer();
452 DataInputBuffer valIn = new DataInputBuffer();
453 DataOutputBuffer valOut = new DataOutputBuffer();
454 while (resultIter.next()) {
455 keyIn.reset(resultIter.getKey().getData(),
456 resultIter.getKey().getLength());
457 key.readFields(keyIn);
458 valOut.reset();
459 (resultIter.getValue()).writeUncompressedBytes(valOut);
460 valIn.reset(valOut.getData(), valOut.getLength());
461 value.readFields(valIn);
462 writer.append(key, value);
463 reporter.progress();
464 }
465 }
466
467 private void mergeParts() throws IOException {
468 // get the approximate size of the final output/index files
469 long finalOutFileSize = 0;
470 long finalIndexFileSize = 0;
471 Path [] filename = new Path[numSpills];
472 Path [] indexFileName = new Path[numSpills];
473
474 for(int i = 0; i < numSpills; i++) {
475 filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
476 indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
477 finalOutFileSize += localFs.getLength(filename[i]);
478 }
479 //make correction in the length to include the sequence file header
480 //lengths for each partition
481 finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
482
483 finalIndexFileSize = partitions * 16;
484
485 Path finalOutputFile = mapOutputFile.getOutputFileForWrite(getTaskId(),
486 finalOutFileSize);
487 Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
488 getTaskId(), finalIndexFileSize);
489
490 if (numSpills == 1) { //the spill is the final output
491 localFs.rename(filename[0], finalOutputFile);
492 localFs.rename(indexFileName[0], finalIndexFile);
493 return;
494 }
495
496 //The output stream for the final single output file
497 FSDataOutputStream finalOut = localFs.create(finalOutputFile, true,
498 4096);
499 //The final index file output stream
500 FSDataOutputStream finalIndexOut = localFs.create(finalIndexFile, true,
501 4096);
502 long segmentStart;
503
504 if (numSpills == 0) {
505 //create dummy files
506 for (int i = 0; i < partitions; i++) {
507 segmentStart = finalOut.getPos();
508 Writer writer = SequenceFile.createWriter(job, finalOut,
509 job.getMapOutputKeyClass(),
510 job.getMapOutputValueClass(),
511 compressionType, codec);
512 finalIndexOut.writeLong(segmentStart);
513 finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
514 writer.close();
515 }
516 finalOut.close();
517 finalIndexOut.close();
518 return;
519 }
520 {
521 //create a sorter object as we need access to the SegmentDescriptor
522 //class and merge methods
523 Sorter sorter = new Sorter(localFs, job.getOutputKeyComparator(), valClass, job);
524 sorter.setProgressable(reporter);
525
526 for (int parts = 0; parts < partitions; parts++){
527 List<SegmentDescriptor> segmentList =
528 new ArrayList<SegmentDescriptor>(numSpills);
529 for(int i = 0; i < numSpills; i++) {
530 FSDataInputStream indexIn = localFs.open(indexFileName[i]);
531 indexIn.seek(parts * 16);
532 long segmentOffset = indexIn.readLong();
533 long segmentLength = indexIn.readLong();
534 indexIn.close();
535 SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
536 segmentLength, filename[i]);
537 s.preserveInput(true);
538 s.doSync();
539 segmentList.add(i, s);
540 }
541 segmentStart = finalOut.getPos();
542 RawKeyValueIterator kvIter = sorter.merge(segmentList, new Path(getTaskId()));
543 SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut,
544 job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
545 compressionType, codec);
546 sorter.writeFile(kvIter, writer);
547 //close the file - required esp. for block compression to ensure
548 //partition data don't span partition boundaries
549 writer.close();
550 //when we write the offset/length to the final index file, we write
551 //longs for both. This helps us to reliably seek directly to the
552 //offset/length for a partition when we start serving the byte-ranges
553 //to the reduces. We probably waste some space in the file by doing
554 //this as opposed to writing VLong but it helps us later on.
555 finalIndexOut.writeLong(segmentStart);
556 finalIndexOut.writeLong(finalOut.getPos()-segmentStart);
557 }
558 finalOut.close();
559 finalIndexOut.close();
560 //cleanup
561 for(int i = 0; i < numSpills; i++) {
562 localFs.delete(filename[i]);
563 localFs.delete(indexFileName[i]);
564 }
565 }
566 }
567
568 public void close() throws IOException {
569 //empty for now
570 }
571
572 private class CombineValuesIterator extends ValuesIterator {
573
574 public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in,
575 WritableComparator comparator, Class keyClass,
576 Class valClass, Configuration conf, Reporter reporter)
577 throws IOException {
578 super(in, comparator, keyClass, valClass, conf, reporter);
579 }
580
581 public Object next() {
582 reporter.incrCounter(COMBINE_INPUT_RECORDS, 1);
583 return super.next();
584 }
585 }
586
587 public void flush() throws IOException
588 {
589 //check whether the length of the key/value buffer is 0. If not, then
590 //we need to spill that to disk. Note that we reset the key/val buffer
591 //upon each spill (so a length > 0 means that we have not spilled yet)
592 synchronized (this) {
593 if (keyValBuffer != null && keyValBuffer.getLength() > 0) {
594 sortAndSpillToDisk();
595 }
596 }
597 mergeParts();
598 }
599 }
600 }