1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.hadoop.mapred;
20
21 import java.io.DataInput;
22 import java.io.DataOutput;
23 import java.io.File;
24 import java.io.IOException;
25 import java.net.URI;
26 import java.net.URL;
27 import java.net.URLClassLoader;
28 import java.text.DecimalFormat;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.HashSet;
32 import java.util.Hashtable;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Random;
37 import java.util.Set;
38 import java.util.TreeSet;
39 import java.util.concurrent.atomic.AtomicBoolean;
40
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43 import org.apache.hadoop.conf.Configuration;
44 import org.apache.hadoop.fs.FileSystem;
45 import org.apache.hadoop.fs.InMemoryFileSystem;
46 import org.apache.hadoop.fs.LocalFileSystem;
47 import org.apache.hadoop.fs.Path;
48 import org.apache.hadoop.fs.PathFilter;
49 import org.apache.hadoop.io.DataInputBuffer;
50 import org.apache.hadoop.io.DataOutputBuffer;
51 import org.apache.hadoop.io.IntWritable;
52 import org.apache.hadoop.io.SequenceFile;
53 import org.apache.hadoop.io.Writable;
54 import org.apache.hadoop.io.WritableComparable;
55 import org.apache.hadoop.io.WritableComparator;
56 import org.apache.hadoop.io.WritableFactories;
57 import org.apache.hadoop.io.WritableFactory;
58 import org.apache.hadoop.metrics.MetricsContext;
59 import org.apache.hadoop.metrics.MetricsRecord;
60 import org.apache.hadoop.metrics.MetricsUtil;
61 import org.apache.hadoop.metrics.Updater;
62 import org.apache.hadoop.util.Progress;
63 import org.apache.hadoop.util.ReflectionUtils;
64 import org.apache.hadoop.util.StringUtils;
65 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
66
67 import static org.apache.hadoop.mapred.Task.Counter.*;
68
69 /** A Reduce task. */
70 class ReduceTask extends Task {
71
72 static { // register a ctor
73 WritableFactories.setFactory
74 (ReduceTask.class,
75 new WritableFactory() {
76 public Writable newInstance() { return new ReduceTask(); }
77 });
78 }
79
80 private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
81 private int numMaps;
82 private ReduceCopier reduceCopier;
83
84 {
85 getProgress().setStatus("reduce");
86 setPhase(TaskStatus.Phase.SHUFFLE); // phase to start with
87 }
88
89 private Progress copyPhase = getProgress().addPhase("copy");
90 private Progress sortPhase = getProgress().addPhase("sort");
91 private Progress reducePhase = getProgress().addPhase("reduce");
92
/**
 * No-arg constructor used by the registered WritableFactory; the map count
 * is filled in later by {@link #readFields(DataInput)}.
 */
public ReduceTask() {
}

/**
 * Creates a reduce task.
 *
 * @param jobId id of the owning job
 * @param jobFile path of the job file
 * @param tipId id of the task-in-progress
 * @param taskId id of this task attempt
 * @param partition the reduce partition this task handles
 * @param numMaps number of map tasks whose outputs this reduce consumes
 */
public ReduceTask(String jobId, String jobFile, String tipId, String taskId,
                  int partition, int numMaps) {
  super(jobId, jobFile, tipId, taskId, partition);
  this.numMaps = numMaps;
}
100
101 public TaskRunner createRunner(TaskTracker tracker) throws IOException {
102 return new ReduceTaskRunner(this, tracker, this.conf);
103 }
104
105 public boolean isMapTask() {
106 return false;
107 }
108
109 public int getNumMaps() { return numMaps; }
110
111 /**
112 * Localize the given JobConf to be specific for this task.
113 */
114 public void localizeConfiguration(JobConf conf) throws IOException {
115 super.localizeConfiguration(conf);
116 conf.setNumMapTasks(numMaps);
117 }
118
119 public void write(DataOutput out) throws IOException {
120 super.write(out);
121
122 out.writeInt(numMaps); // write the number of maps
123 }
124
125 public void readFields(DataInput in) throws IOException {
126 super.readFields(in);
127
128 numMaps = in.readInt();
129 }
130
131 /** Iterates values while keys match in sorted input. */
132 static class ValuesIterator implements Iterator {
133 private SequenceFile.Sorter.RawKeyValueIterator in; //input iterator
134 private WritableComparable key; // current key
135 private Writable value; // current value
136 private boolean hasNext; // more w/ this key
137 private boolean more; // more in file
138 private WritableComparator comparator;
139 private Class keyClass;
140 private Class valClass;
141 private Configuration conf;
142 private DataOutputBuffer valOut = new DataOutputBuffer();
143 private DataInputBuffer valIn = new DataInputBuffer();
144 private DataInputBuffer keyIn = new DataInputBuffer();
145 protected Reporter reporter;
146
147 public ValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
148 WritableComparator comparator, Class keyClass,
149 Class valClass, Configuration conf,
150 Reporter reporter)
151 throws IOException {
152 this.in = in;
153 this.conf = conf;
154 this.comparator = comparator;
155 this.keyClass = keyClass;
156 this.valClass = valClass;
157 this.reporter = reporter;
158 getNext();
159 }
160
161 /// Iterator methods
162
163 public boolean hasNext() { return hasNext; }
164
165 public Object next() {
166 Object result = value; // save value
167 try {
168 getNext(); // move to next
169 } catch (IOException e) {
170 throw new RuntimeException(e);
171 }
172 reporter.progress();
173 return result; // return saved value
174 }
175
176 public void remove() { throw new RuntimeException("not implemented"); }
177
178 /// Auxiliary methods
179
180 /** Start processing next unique key. */
181 public void nextKey() {
182 while (hasNext) { next(); } // skip any unread
183 hasNext = more;
184 }
185
186 /** True iff more keys remain. */
187 public boolean more() { return more; }
188
189 /** The current key. */
190 public WritableComparable getKey() { return key; }
191
192 private void getNext() throws IOException {
193 Writable lastKey = key; // save previous key
194 try {
195 key = (WritableComparable)ReflectionUtils.newInstance(keyClass, this.conf);
196 value = (Writable)ReflectionUtils.newInstance(valClass, this.conf);
197 } catch (Exception e) {
198 throw new RuntimeException(e);
199 }
200 more = in.next();
201 if (more) {
202 //de-serialize the raw key/value
203 keyIn.reset(in.getKey().getData(), in.getKey().getLength());
204 key.readFields(keyIn);
205 valOut.reset();
206 (in.getValue()).writeUncompressedBytes(valOut);
207 valIn.reset(valOut.getData(), valOut.getLength());
208 value.readFields(valIn);
209
210 if (lastKey == null) {
211 hasNext = true;
212 } else {
213 hasNext = (comparator.compare(key, lastKey) == 0);
214 }
215 } else {
216 hasNext = false;
217 }
218 }
219 }
220 private class ReduceValuesIterator extends ValuesIterator {
221 public ReduceValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
222 WritableComparator comparator, Class keyClass,
223 Class valClass,
224 Configuration conf, Reporter reporter)
225 throws IOException {
226 super(in, comparator, keyClass, valClass, conf, reporter);
227 }
228 public void informReduceProgress() {
229 reducePhase.set(super.in.getProgress().get()); // update progress
230 reporter.progress();
231 }
232 public Object next() {
233 reporter.incrCounter(REDUCE_INPUT_RECORDS, 1);
234 return super.next();
235 }
236 }
237
238 public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
239 throws IOException {
240 Reducer reducer = (Reducer)ReflectionUtils.newInstance(
241 job.getReducerClass(), job);
242
243 // start thread that will handle communication with parent
244 startCommunicationThread(umbilical);
245
246 FileSystem lfs = FileSystem.getLocal(job);
247 if (!job.get("mapred.job.tracker", "local").equals("local")) {
248 reduceCopier = new ReduceCopier(umbilical, job);
249 if (!reduceCopier.fetchOutputs()) {
250 throw new IOException(getTaskId() + "The reduce copier failed");
251 }
252 }
253 copyPhase.complete(); // copy is already complete
254
255
256 // open a file to collect map output
257 // since we don't know how many map outputs got merged in memory, we have
258 // to check whether a given map output exists, and if it does, add it in
259 // the list of files to merge, otherwise not.
260 List<Path> mapFilesList = new ArrayList<Path>();
261 for(int i=0; i < numMaps; i++) {
262 Path f;
263 try {
264 //catch and ignore DiskErrorException, since some map outputs will
265 //really be absent (inmem merge).
266 f = mapOutputFile.getInputFile(i, getTaskId());
267 } catch (DiskErrorException d) {
268 continue;
269 }
270 if (lfs.exists(f))
271 mapFilesList.add(f);
272 }
273 Path[] mapFiles = new Path[mapFilesList.size()];
274 mapFiles = mapFilesList.toArray(mapFiles);
275
276 Path tempDir = new Path(getTaskId());
277
278 SequenceFile.Sorter.RawKeyValueIterator rIter;
279
280 setPhase(TaskStatus.Phase.SORT);
281
282 final Reporter reporter = getReporter(umbilical);
283
284 // sort the input file
285 SequenceFile.Sorter sorter = new SequenceFile.Sorter(lfs,
286 job.getOutputKeyComparator(), job.getMapOutputValueClass(), job);
287 sorter.setProgressable(reporter);
288 rIter = sorter.merge(mapFiles, tempDir,
289 !conf.getKeepFailedTaskFiles()); // sort
290
291 sortPhase.complete(); // sort is complete
292 setPhase(TaskStatus.Phase.REDUCE);
293
294 // make output collector
295 String finalName = getOutputName(getPartition());
296 FileSystem fs = FileSystem.get(job);
297
298 final RecordWriter out =
299 job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
300
301 OutputCollector collector = new OutputCollector() {
302 public void collect(WritableComparable key, Writable value)
303 throws IOException {
304 out.write(key, value);
305 reporter.incrCounter(REDUCE_OUTPUT_RECORDS, 1);
306 // indicate that progress update needs to be sent
307 reporter.progress();
308 }
309 };
310
311 // apply reduce function
312 try {
313 Class keyClass = job.getMapOutputKeyClass();
314 Class valClass = job.getMapOutputValueClass();
315
316 ReduceValuesIterator values = new ReduceValuesIterator(rIter,
317 job.getOutputValueGroupingComparator(), keyClass, valClass,
318 job, reporter);
319 values.informReduceProgress();
320 while (values.more()) {
321 reporter.incrCounter(REDUCE_INPUT_GROUPS, 1);
322 reducer.reduce(values.getKey(), values, collector, reporter);
323 values.nextKey();
324 values.informReduceProgress();
325 }
326
327 //Clean up: repeated in catch block below
328 reducer.close();
329 out.close(reporter);
330 //End of clean up.
331 } catch (IOException ioe) {
332 try {
333 reducer.close();
334 } catch (IOException ignored) {}
335
336 try {
337 out.close(reporter);
338 } catch (IOException ignored) {}
339
340 throw ioe;
341 }
342 done(umbilical);
343 }
344
345 class ReduceCopier implements MRConstants {
346
347 /** Reference to the umbilical object */
348 private TaskUmbilicalProtocol umbilical;
349
350 /** Reference to the task object */
351
352 /** Number of ms before timing out a copy */
353 private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
354
355 /**
356 * our reduce task instance
357 */
358 private ReduceTask reduceTask;
359
360 /**
361 * the list of map outputs currently being copied
362 */
363 private List<MapOutputLocation> scheduledCopies;
364
365 /**
366 * the results of dispatched copy attempts
367 */
368 private List<CopyResult> copyResults;
369
370 /**
371 * the number of outputs to copy in parallel
372 */
373 private int numCopiers;
374
375 /**
376 * the maximum amount of time (less 1 minute) to wait to
377 * contact a host after a copy from it fails. We wait for (1 min +
378 * Random.nextInt(maxBackoff)) seconds.
379 */
380 private int maxBackoff;
381
382 /**
383 * busy hosts from which copies are being backed off
384 * Map of host -> next contact time
385 */
386 private Map<String, Long> penaltyBox;
387
388 /**
389 * the set of unique hosts from which we are copying
390 */
391 private Set<String> uniqueHosts;
392
393 /**
394 * the last time we polled the job tracker
395 */
396 private long lastPollTime;
397
398 /**
399 * A reference to the in memory file system for writing the map outputs to.
400 */
401 private InMemoryFileSystem inMemFileSys;
402
403 /**
404 * A reference to the local file system for writing the map outputs to.
405 */
406 private FileSystem localFileSys;
407
408 /**
409 * An instance of the sorter used for doing merge
410 */
411 private SequenceFile.Sorter sorter;
412
413 /**
414 * A reference to the throwable object (if merge throws an exception)
415 */
416 private volatile Throwable mergeThrowable;
417
418 /**
419 * A flag to indicate that merge is in progress
420 */
421 private volatile boolean mergeInProgress = false;
422
423 /**
424 * When we accumulate mergeThreshold number of files in ram, we merge/spill
425 */
426 private int mergeThreshold = 500;
427
428 /**
429 * The threads for fetching the files.
430 */
431 private MapOutputCopier[] copiers = null;
432
433 /**
434 * The object for metrics reporting.
435 */
436 private ShuffleClientMetrics shuffleClientMetrics = null;
437
438 /**
439 * the minimum interval between tasktracker polls
440 */
441 private static final long MIN_POLL_INTERVAL = 1000;
442
443 /**
444 * the number of map output locations to poll for at one time
445 */
446 private int probe_sample_size = 100;
447
448 /**
449 * a list of map output locations for fetch retrials
450 */
451 private List<MapOutputLocation> retryFetches =
452 new ArrayList<MapOutputLocation>();
453
454 /**
455 * The set of required map outputs
456 */
457 private Set <Integer> neededOutputs =
458 Collections.synchronizedSet(new TreeSet<Integer>());
459
460 /**
461 * The set of obsolete map taskids.
462 */
463 private Set <String> obsoleteMapIds =
464 Collections.synchronizedSet(new TreeSet<String>());
465
466 private Random random = null;
467
468 /**
469 * the max size of the merge output from ramfs
470 */
471 private long ramfsMergeOutputSize;
472
473 /**
474 * This class contains the methods that should be used for metrics-reporting
475 * the specific metrics for shuffle. This class actually reports the
476 * metrics for the shuffle client (the ReduceTask), and hence the name
477 * ShuffleClientMetrics.
478 */
479 class ShuffleClientMetrics implements Updater {
480 private MetricsRecord shuffleMetrics = null;
481 private int numFailedFetches = 0;
482 private int numSuccessFetches = 0;
483 private long numBytes = 0;
484 private int numThreadsBusy = 0;
485 ShuffleClientMetrics(JobConf conf) {
486 MetricsContext metricsContext = MetricsUtil.getContext("mapred");
487 this.shuffleMetrics =
488 MetricsUtil.createRecord(metricsContext, "shuffleInput");
489 this.shuffleMetrics.setTag("user", conf.getUser());
490 this.shuffleMetrics.setTag("jobName", conf.getJobName());
491 this.shuffleMetrics.setTag("jobId", ReduceTask.this.getJobId());
492 this.shuffleMetrics.setTag("taskId", getTaskId());
493 this.shuffleMetrics.setTag("sessionId", conf.getSessionId());
494 metricsContext.registerUpdater(this);
495 }
496 public synchronized void inputBytes(long numBytes) {
497 this.numBytes += numBytes;
498 }
499 public synchronized void failedFetch() {
500 ++numFailedFetches;
501 }
502 public synchronized void successFetch() {
503 ++numSuccessFetches;
504 }
505 public synchronized void threadBusy() {
506 ++numThreadsBusy;
507 }
508 public synchronized void threadFree() {
509 --numThreadsBusy;
510 }
511 public void doUpdates(MetricsContext unused) {
512 synchronized (this) {
513 shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
514 shuffleMetrics.incrMetric("shuffle_failed_fetches",
515 numFailedFetches);
516 shuffleMetrics.incrMetric("shuffle_success_fetches",
517 numSuccessFetches);
518 if (numCopiers != 0) {
519 shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
520 100*((float)numThreadsBusy/numCopiers));
521 } else {
522 shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
523 }
524 numBytes = 0;
525 numSuccessFetches = 0;
526 numFailedFetches = 0;
527 }
528 shuffleMetrics.update();
529 }
530 }
531
532 /** Represents the result of an attempt to copy a map output */
533 private class CopyResult {
534
535 // the map output location against which a copy attempt was made
536 private final MapOutputLocation loc;
537
538 // the size of the file copied, -1 if the transfer failed
539 private final long size;
540
541 //a flag signifying whether a copy result is obsolete
542 private static final int OBSOLETE = -2;
543
544 CopyResult(MapOutputLocation loc, long size) {
545 this.loc = loc;
546 this.size = size;
547 }
548
549 public int getMapId() { return loc.getMapId(); }
550 public boolean getSuccess() { return size >= 0; }
551 public boolean isObsolete() {
552 return size == OBSOLETE;
553 }
554 public long getSize() { return size; }
555 public String getHost() { return loc.getHost(); }
556 public MapOutputLocation getLocation() { return loc; }
557 }
558
559 private int extractMapIdFromPathName(Path pathname) {
560 //all paths end with map_<id>.out
561 String firstPathName = pathname.getName();
562 int beginIndex = firstPathName.lastIndexOf("map_");
563 int endIndex = firstPathName.lastIndexOf(".out");
564 return Integer.parseInt(firstPathName.substring(beginIndex +
565 "map_".length(), endIndex));
566 }
567
568 private int nextMapOutputCopierId = 0;
569
570 /** Copies map outputs as they become available */
571 private class MapOutputCopier extends Thread {
572
573 private MapOutputLocation currentLocation = null;
574 private int id = nextMapOutputCopierId++;
575 private Reporter reporter;
576
577 public MapOutputCopier(Reporter reporter) {
578 setName("MapOutputCopier " + reduceTask.getTaskId() + "." + id);
579 LOG.debug(getName() + " created");
580 this.reporter = reporter;
581 }
582
583 /**
584 * Fail the current file that we are fetching
585 * @return were we currently fetching?
586 */
587 public synchronized boolean fail() {
588 if (currentLocation != null) {
589 finish(-1);
590 return true;
591 } else {
592 return false;
593 }
594 }
595
596 /**
597 * Get the current map output location.
598 */
599 public synchronized MapOutputLocation getLocation() {
600 return currentLocation;
601 }
602
603 private synchronized void start(MapOutputLocation loc) {
604 currentLocation = loc;
605 }
606
607 private synchronized void finish(long size) {
608 if (currentLocation != null) {
609 LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
610 synchronized (copyResults) {
611 copyResults.add(new CopyResult(currentLocation, size));
612 copyResults.notify();
613 }
614 currentLocation = null;
615 }
616 }
617
618 /** Loop forever and fetch map outputs as they become available.
619 * The thread exits when it is interrupted by {@link ReduceTaskRunner}
620 */
621 public void run() {
622 while (true) {
623 try {
624 MapOutputLocation loc = null;
625 long size = -1;
626
627 synchronized (scheduledCopies) {
628 while (scheduledCopies.isEmpty()) {
629 scheduledCopies.wait();
630 }
631 loc = scheduledCopies.remove(0);
632 }
633
634 try {
635 shuffleClientMetrics.threadBusy();
636 start(loc);
637 size = copyOutput(loc);
638 shuffleClientMetrics.successFetch();
639 } catch (IOException e) {
640 LOG.warn(reduceTask.getTaskId() + " copy failed: " +
641 loc.getMapTaskId() + " from " + loc.getHost());
642 LOG.warn(StringUtils.stringifyException(e));
643 shuffleClientMetrics.failedFetch();
644
645 // Reset
646 size = -1;
647 } finally {
648 shuffleClientMetrics.threadFree();
649 finish(size);
650 }
651 } catch (InterruptedException e) {
652 return; // ALL DONE
653 } catch (Throwable th) {
654 LOG.error("Map output copy failure: " +
655 StringUtils.stringifyException(th));
656 }
657 }
658 }
659
660 /** Copies a a map output from a remote host, via HTTP.
661 * @param currentLocation the map output location to be copied
662 * @return the path (fully qualified) of the copied file
663 * @throws IOException if there is an error copying the file
664 * @throws InterruptedException if the copier should give up
665 */
666 private long copyOutput(MapOutputLocation loc
667 ) throws IOException, InterruptedException {
668 // check if we still need to copy the output from this location
669 if (!neededOutputs.contains(loc.getMapId()) ||
670 obsoleteMapIds.contains(loc.getMapTaskId())) {
671 return CopyResult.OBSOLETE;
672 }
673
674 String reduceId = reduceTask.getTaskId();
675 LOG.info(reduceId + " Copying " + loc.getMapTaskId() +
676 " output from " + loc.getHost() + ".");
677 // a temp filename. If this file gets created in ramfs, we're fine,
678 // else, we will check the localFS to find a suitable final location
679 // for this path
680 Path filename = new Path("/" + reduceId + "/map_" +
681 loc.getMapId() + ".out");
682 // a working filename that will be unique to this attempt
683 Path tmpFilename = new Path(filename + "-" + id);
684 // this copies the map output file
685 tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleClientMetrics,
686 tmpFilename, lDirAlloc,
687 conf, reduceTask.getPartition(),
688 STALLED_COPY_TIMEOUT, reporter);
689 if (!neededOutputs.contains(loc.getMapId())) {
690 if (tmpFilename != null) {
691 FileSystem fs = tmpFilename.getFileSystem(conf);
692 fs.delete(tmpFilename);
693 }
694 return CopyResult.OBSOLETE;
695 }
696 if (tmpFilename == null)
697 throw new IOException("File " + filename + "-" + id +
698 " not created");
699 long bytes = -1;
700 // lock the ReduceTask while we do the rename
701 synchronized (ReduceTask.this) {
702 // This file could have been created in the inmemory
703 // fs or the localfs. So need to get the filesystem owning the path.
704 FileSystem fs = tmpFilename.getFileSystem(conf);
705 if (!neededOutputs.contains(loc.getMapId())) {
706 fs.delete(tmpFilename);
707 return CopyResult.OBSOLETE;
708 }
709
710 bytes = fs.getLength(tmpFilename);
711 //resolve the final filename against the directory where the tmpFile
712 //got created
713 filename = new Path(tmpFilename.getParent(), filename.getName());
714 // if we can't rename the file, something is broken (and IOException
715 // will be thrown).
716 if (!fs.rename(tmpFilename, filename)) {
717 fs.delete(tmpFilename);
718 bytes = -1;
719 throw new IOException("failure to rename map output " +
720 tmpFilename);
721 }
722
723 LOG.info(reduceId + " done copying " + loc.getMapTaskId() +
724 " output from " + loc.getHost() + ".");
725 //Create a thread to do merges. Synchronize access/update to
726 //mergeInProgress
727 if (!mergeInProgress &&
728 (inMemFileSys.getPercentUsed() >= MAX_INMEM_FILESYS_USE ||
729 (mergeThreshold > 0 &&
730 inMemFileSys.getNumFiles(MAP_OUTPUT_FILTER) >=
731 mergeThreshold))&&
732 mergeThrowable == null) {
733 LOG.info(reduceId + " InMemoryFileSystem " +
734 inMemFileSys.getUri().toString() +
735 " is " + inMemFileSys.getPercentUsed() +
736 " full. Triggering merge");
737 InMemFSMergeThread m = new InMemFSMergeThread(inMemFileSys,
738 (LocalFileSystem)localFileSys, sorter);
739 m.setName("Thread for merging in memory files");
740 m.setDaemon(true);
741 mergeInProgress = true;
742 m.start();
743 }
744 neededOutputs.remove(loc.getMapId());
745 }
746 return bytes;
747 }
748
749 }
750
751 private void configureClasspath(JobConf conf)
752 throws IOException {
753
754 // get the task and the current classloader which will become the parent
755 Task task = ReduceTask.this;
756 ClassLoader parent = conf.getClassLoader();
757
758 // get the work directory which holds the elements we are dynamically
759 // adding to the classpath
760 File workDir = new File(task.getJobFile()).getParentFile();
761 File jobCacheDir = new File(workDir.getParent(), "work");
762 ArrayList<URL> urllist = new ArrayList<URL>();
763
764 // add the jars and directories to the classpath
765 String jar = conf.getJar();
766 if (jar != null) {
767 File[] libs = new File(jobCacheDir, "lib").listFiles();
768 if (libs != null) {
769 for (int i = 0; i < libs.length; i++) {
770 urllist.add(libs[i].toURL());
771 }
772 }
773 urllist.add(new File(jobCacheDir, "classes").toURL());
774 urllist.add(jobCacheDir.toURL());
775
776 }
777 urllist.add(workDir.toURL());
778
779 // create a new classloader with the old classloader as its parent
780 // then set that classloader as the one used by the current jobconf
781 URL[] urls = urllist.toArray(new URL[urllist.size()]);
782 URLClassLoader loader = new URLClassLoader(urls, parent);
783 conf.setClassLoader(loader);
784 }
785
786 public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf)
787 throws IOException {
788
789 configureClasspath(conf);
790 this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
791 this.umbilical = umbilical;
792 this.reduceTask = ReduceTask.this;
793 this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
794 this.copyResults = new ArrayList<CopyResult>(100);
795 this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
796 this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
797 this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);
798
799 //we want to distinguish inmem fs instances for different reduces. Hence,
800 //append a unique string in the uri for the inmem fs name
801 URI uri = URI.create("ramfs://mapoutput" + reduceTask.hashCode());
802 inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf);
803 LOG.info(reduceTask.getTaskId() + " Created an InMemoryFileSystem, uri: "
804 + uri);
805 ramfsMergeOutputSize = (long)(MAX_INMEM_FILESYS_USE *
806 inMemFileSys.getFSSize());
807 localFileSys = FileSystem.getLocal(conf);
808 //create an instance of the sorter
809 sorter =
810 new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(),
811 conf.getMapOutputValueClass(), conf);
812 sorter.setProgressable(getReporter(umbilical));
813
814 // hosts -> next contact time
815 this.penaltyBox = new Hashtable<String, Long>();
816
817 // hostnames
818 this.uniqueHosts = new HashSet<String>();
819
820 this.lastPollTime = 0;
821
822
823 // Seed the random number generator with a reasonably globally unique seed
824 long randomSeed = System.nanoTime() +
825 (long)Math.pow(this.reduceTask.getPartition(),
826 (this.reduceTask.getPartition()%10)
827 );
828 this.random = new Random(randomSeed);
829 }
830
831 public boolean fetchOutputs() throws IOException {
832 final int numOutputs = reduceTask.getNumMaps();
833 List<MapOutputLocation> knownOutputs =
834 new ArrayList<MapOutputLocation>(numCopiers);
835 int numInFlight = 0, numCopied = 0;
836 int lowThreshold = numCopiers*2;
837 long bytesTransferred = 0;
838 DecimalFormat mbpsFormat = new DecimalFormat("0.00");
839 Random backoff = new Random();
840 final Progress copyPhase =
841 reduceTask.getProgress().phase();
842
843 //tweak the probe sample size (make it a function of numCopiers)
844 probe_sample_size = Math.max(numCopiers*5, 50);
845
846 for (int i = 0; i < numOutputs; i++) {
847 neededOutputs.add(i);
848 copyPhase.addPhase(); // add sub-phase per file
849 }
850
851 copiers = new MapOutputCopier[numCopiers];
852
853 Reporter reporter = getReporter(umbilical);
854 // start all the copying threads
855 for (int i=0; i < copiers.length; i++) {
856 copiers[i] = new MapOutputCopier(reporter);
857 copiers[i].start();
858 }
859
860 // start the clock for bandwidth measurement
861 long startTime = System.currentTimeMillis();
862 long currentTime = startTime;
863 IntWritable fromEventId = new IntWritable(0);
864
865 try {
866 // loop until we get all required outputs
867 while (!neededOutputs.isEmpty() && mergeThrowable == null) {
868
869 LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
870 " map output(s)");
871
872 try {
873 // Put the hash entries for the failed fetches. Entries here
874 // might be replaced by (mapId) hashkeys from new successful
875 // Map executions, if the fetch failures were due to lost tasks.
876 // The replacements, if at all, will happen when we query the
877 // tasktracker and put the mapId hashkeys with new
878 // MapOutputLocations as values
879 knownOutputs.addAll(retryFetches);
880
881 // The call getMapCompletionEvents will update fromEventId to
882 // used for the next call to getMapCompletionEvents
883 int currentNumKnownMaps = knownOutputs.size();
884 int currentNumObsoleteMapIds = obsoleteMapIds.size();
885 getMapCompletionEvents(fromEventId, knownOutputs);
886
887 LOG.info(reduceTask.getTaskId() + ": " +
888 "Got " + (knownOutputs.size()-currentNumKnownMaps) +
889 " new map-outputs & " +
890 (obsoleteMapIds.size()-currentNumObsoleteMapIds) +
891 " obsolete map-outputs from tasktracker and " +
892 retryFetches.size() + " map-outputs from previous failures"
893 );
894
895 // clear the "failed" fetches hashmap
896 retryFetches.clear();
897 }
898 catch (IOException ie) {
899 LOG.warn(reduceTask.getTaskId() +
900 " Problem locating map outputs: " +
901 StringUtils.stringifyException(ie));
902 }
903
904 // now walk through the cache and schedule what we can
905 int numKnown = knownOutputs.size(), numScheduled = 0;
906 int numSlow = 0, numDups = 0;
907
908 LOG.info(reduceTask.getTaskId() + " Got " + numKnown +
909 " known map output location(s); scheduling...");
910
911 synchronized (scheduledCopies) {
912 // Randomize the map output locations to prevent
913 // all reduce-tasks swamping the same tasktracker
914 Collections.shuffle(knownOutputs, this.random);
915
916 Iterator locIt = knownOutputs.iterator();
917
918 currentTime = System.currentTimeMillis();
919 while (locIt.hasNext()) {
920
921 MapOutputLocation loc = (MapOutputLocation)locIt.next();
922
923 // Do not schedule fetches from OBSOLETE maps
924 if (obsoleteMapIds.contains(loc.getMapTaskId())) {
925 locIt.remove();
926 continue;
927 }
928
929 Long penaltyEnd = penaltyBox.get(loc.getHost());
930 boolean penalized = false, duplicate = false;
931
932 if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) {
933 penalized = true; numSlow++;
934 }
935 if (uniqueHosts.contains(loc.getHost())) {
936 duplicate = true; numDups++;
937 }
938
939 if (!penalized && !duplicate) {
940 uniqueHosts.add(loc.getHost());
941 scheduledCopies.add(loc);
942 locIt.remove(); // remove from knownOutputs
943 numInFlight++; numScheduled++;
944 }
945 }
946 scheduledCopies.notifyAll();
947 }
948 LOG.info(reduceTask.getTaskId() + " Scheduled " + numScheduled +
949 " of " + numKnown + " known outputs (" + numSlow +
950 " slow hosts and " + numDups + " dup hosts)");
951
952 // if we have no copies in flight and we can't schedule anything
953 // new, just wait for a bit
954 try {
955 if (numInFlight == 0 && numScheduled == 0) {
956 // we should indicate progress as we don't want TT to think
957 // we're stuck and kill us
958 reporter.progress();
959 Thread.sleep(5000);
960 }
961 } catch (InterruptedException e) { } // IGNORE
962
963 while (numInFlight > 0 && mergeThrowable == null) {
964 LOG.debug(reduceTask.getTaskId() + " numInFlight = " +
965 numInFlight);
966 CopyResult cr = getCopyResult();
967
968 if (cr != null) {
969 if (cr.getSuccess()) { // a successful copy
970 numCopied++;
971 bytesTransferred += cr.getSize();
972
973 long secsSinceStart =
974 (System.currentTimeMillis()-startTime)/1000+1;
975 float mbs = ((float)bytesTransferred)/(1024*1024);
976 float transferRate = mbs/secsSinceStart;
977
978 copyPhase.startNextPhase();
979 copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs
980 + " at " +
981 mbpsFormat.format(transferRate) + " MB/s)");
982 } else if (cr.isObsolete()) {
983 //ignore
984 LOG.info(reduceTask.getTaskId() +
985 " Ignoring obsolete copy result for Map Task: " +
986 cr.getLocation().getMapTaskId() + " from host: " +
987 cr.getHost());
988 } else {
989 retryFetches.add(cr.getLocation());
990
991 // wait a random amount of time for next contact
992 currentTime = System.currentTimeMillis();
993 long nextContact = currentTime + 60 * 1000 +
994 backoff.nextInt(maxBackoff*1000);
995 penaltyBox.put(cr.getHost(), nextContact);
996 LOG.warn(reduceTask.getTaskId() + " adding host " +
997 cr.getHost() + " to penalty box, next contact in " +
998 ((nextContact-currentTime)/1000) + " seconds");
999
1000 // other outputs from the failed host may be present in the
1001 // knownOutputs cache, purge them. This is important in case
1002 // the failure is due to a lost tasktracker (causes many
1003 // unnecessary backoffs). If not, we only take a small hit
1004 // polling the tasktracker a few more times
1005 Iterator locIt = knownOutputs.iterator();
1006 while (locIt.hasNext()) {
1007 MapOutputLocation loc = (MapOutputLocation)locIt.next();
1008 if (cr.getHost().equals(loc.getHost())) {
1009 retryFetches.add(loc);
1010 locIt.remove();
1011 }
1012 }
1013 }
1014 uniqueHosts.remove(cr.getHost());
1015 numInFlight--;
1016 }
1017
1018 boolean busy = true;
1019 // ensure we have enough to keep us busy
1020 if (numInFlight < lowThreshold && (numOutputs-numCopied) >
1021 probe_sample_size) {
1022 busy = false;
1023 }
1024 //Check whether we have more CopyResult to check. If there is none,
1025 //and we are not busy enough, break
1026 synchronized (copyResults) {
1027 if (copyResults.size() == 0 && !busy) {
1028 break;
1029 }
1030 }
1031 }
1032
1033 }
1034
1035 // all done, inform the copiers to exit
1036 synchronized (copiers) {
1037 synchronized (scheduledCopies) {
1038 for (int i=0; i < copiers.length; i++) {
1039 copiers[i].interrupt();
1040 copiers[i] = null;
1041 }
1042 }
1043 }
1044
1045 //Do a merge of in-memory files (if there are any)
1046 if (mergeThrowable == null) {
1047 try {
1048 //wait for an ongoing merge (if it is in flight) to complete
1049 while (mergeInProgress) {
1050 Thread.sleep(200);
1051 }
1052 LOG.info(reduceTask.getTaskId() +
1053 " Copying of all map outputs complete. " +
1054 "Initiating the last merge on the remaining files in " +
1055 inMemFileSys.getUri());
1056 if (mergeThrowable != null) {
1057 //this could happen if the merge that
1058 //was in progress threw an exception
1059 throw mergeThrowable;
1060 }
1061 //initiate merge
1062 Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
1063 if (inMemClosedFiles.length == 0) {
1064 LOG.info(reduceTask.getTaskId() + "Nothing to merge from " +
1065 inMemFileSys.getUri());
1066 return neededOutputs.isEmpty();
1067 }
1068 //name this output file same as the name of the first file that is
1069 //there in the current list of inmem files (this is guaranteed to be
1070 //absent on the disk currently. So we don't overwrite a prev.
1071 //created spill). Also we need to create the output file now since
1072 //it is not guaranteed that this file will be present after merge
1073 //is called (we delete empty sequence files as soon as we see them
1074 //in the merge method)
1075 int mapId = extractMapIdFromPathName(inMemClosedFiles[0]);
1076 Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
1077 reduceTask.getTaskId(), ramfsMergeOutputSize);
1078 SequenceFile.Writer writer = sorter.cloneFileAttributes(
1079 inMemFileSys.makeQualified(inMemClosedFiles[0]),
1080 localFileSys.makeQualified(outputPath), null);
1081
1082 SequenceFile.Sorter.RawKeyValueIterator rIter = null;
1083 try {
1084 rIter = sorter.merge(inMemClosedFiles, true,
1085 inMemClosedFiles.length,
1086 new Path(reduceTask.getTaskId()));
1087 } catch (Exception e) {
1088 //make sure that we delete the ondisk file that we created earlier
1089 //when we invoked cloneFileAttributes
1090 writer.close();
1091 localFileSys.delete(inMemClosedFiles[0]);
1092 throw new IOException (StringUtils.stringifyException(e));
1093 }
1094 sorter.writeFile(rIter, writer);
1095 writer.close();
1096 LOG.info(reduceTask.getTaskId() +
1097 " Merge of the " +inMemClosedFiles.length +
1098 " files in InMemoryFileSystem complete." +
1099 " Local file is " + outputPath);
1100 } catch (Throwable t) {
1101 LOG.warn(reduceTask.getTaskId() +
1102 " Final merge of the inmemory files threw an exception: " +
1103 StringUtils.stringifyException(t));
1104 return false;
1105 }
1106 }
1107 return mergeThrowable == null && neededOutputs.isEmpty();
1108 } finally {
1109 inMemFileSys.close();
1110 }
1111 }
1112
1113
1114 private CopyResult getCopyResult() {
1115 synchronized (copyResults) {
1116 while (copyResults.isEmpty()) {
1117 try {
1118 copyResults.wait();
1119 } catch (InterruptedException e) { }
1120 }
1121 if (copyResults.isEmpty()) {
1122 return null;
1123 } else {
1124 return copyResults.remove(0);
1125 }
1126 }
1127 }
1128
1129 /**
1130 * Queries the task tracker for a set of map-completion events from
1131 * a given event ID.
1132 * @param fromEventId the first event ID we want to start from, this is
1133 * modified by the call to this method
1134 * @param jobClient the job tracker
1135 * @return a set of locations to copy outputs from
1136 * @throws IOException
1137 */
1138 private void getMapCompletionEvents(IntWritable fromEventId,
1139 List<MapOutputLocation> knownOutputs)
1140 throws IOException {
1141
1142 long currentTime = System.currentTimeMillis();
1143 long pollTime = lastPollTime + MIN_POLL_INTERVAL;
1144 while (currentTime < pollTime) {
1145 try {
1146 Thread.sleep(pollTime-currentTime);
1147 } catch (InterruptedException ie) { } // IGNORE
1148 currentTime = System.currentTimeMillis();
1149 }
1150
1151 TaskCompletionEvent events[] =
1152 umbilical.getMapCompletionEvents(reduceTask.getJobId(),
1153 fromEventId.get(), probe_sample_size);
1154
1155 // Note the last successful poll time-stamp
1156 lastPollTime = currentTime;
1157
1158 // Update the last seen event ID
1159 fromEventId.set(fromEventId.get() + events.length);
1160
1161 // Process the TaskCompletionEvents:
1162 // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
1163 // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop fetching
1164 // from those maps.
1165 // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
1166 // outputs at all.
1167 for (TaskCompletionEvent event : events) {
1168 switch (event.getTaskStatus()) {
1169 case SUCCEEDED:
1170 {
1171 URI u = URI.create(event.getTaskTrackerHttp());
1172 String host = u.getHost();
1173 int port = u.getPort();
1174 String taskId = event.getTaskId();
1175 int mId = event.idWithinJob();
1176 knownOutputs.add(new MapOutputLocation(taskId, mId, host, port));
1177 }
1178 break;
1179 case FAILED:
1180 case KILLED:
1181 case OBSOLETE:
1182 {
1183 obsoleteMapIds.add(event.getTaskId());
1184 LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
1185 " map-task: '" + event.getTaskId() + "'");
1186 }
1187 break;
1188 case TIPFAILED:
1189 {
1190 neededOutputs.remove(event.idWithinJob());
1191 LOG.info("Ignoring output of failed map TIP: '" +
1192 event.getTaskId() + "'");
1193 }
1194 break;
1195 }
1196 }
1197
1198 }
1199
1200
1201 private class InMemFSMergeThread extends Thread {
1202 private InMemoryFileSystem inMemFileSys;
1203 private LocalFileSystem localFileSys;
1204 private SequenceFile.Sorter sorter;
1205
1206 public InMemFSMergeThread(InMemoryFileSystem inMemFileSys,
1207 LocalFileSystem localFileSys, SequenceFile.Sorter sorter) {
1208 this.inMemFileSys = inMemFileSys;
1209 this.localFileSys = localFileSys;
1210 this.sorter = sorter;
1211 }
1212 public void run() {
1213 LOG.info(reduceTask.getTaskId() + " Thread started: " + getName());
1214 try {
1215 //initiate merge
1216 Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
1217 //Note that the above Path[] could be of length 0 if all copies are
1218 //in flight. So we make sure that we have some 'closed' map
1219 //output files to merge to get the benefit of in-memory merge
1220 if (inMemClosedFiles.length >=
1221 (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
1222 //name this output file same as the name of the first file that is
1223 //there in the current list of inmem files (this is guaranteed to
1224 //be absent on the disk currently. So we don't overwrite a prev.
1225 //created spill). Also we need to create the output file now since
1226 //it is not guaranteed that this file will be present after merge
1227 //is called (we delete empty sequence files as soon as we see them
1228 //in the merge method)
1229
1230 //figure out the mapId
1231 int mapId = extractMapIdFromPathName(inMemClosedFiles[0]);
1232
1233 Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
1234 reduceTask.getTaskId(), ramfsMergeOutputSize);
1235
1236 SequenceFile.Writer writer = sorter.cloneFileAttributes(
1237 inMemFileSys.makeQualified(inMemClosedFiles[0]),
1238 localFileSys.makeQualified(outputPath), null);
1239 SequenceFile.Sorter.RawKeyValueIterator rIter;
1240 try {
1241 rIter = sorter.merge(inMemClosedFiles, true,
1242 inMemClosedFiles.length, new Path(reduceTask.getTaskId()));
1243 } catch (Exception e) {
1244 //make sure that we delete the ondisk file that we created
1245 //earlier when we invoked cloneFileAttributes
1246 writer.close();
1247 localFileSys.delete(outputPath);
1248 throw new IOException (StringUtils.stringifyException(e));
1249 }
1250 sorter.writeFile(rIter, writer);
1251 writer.close();
1252 LOG.info(reduceTask.getTaskId() +
1253 " Merge of the " +inMemClosedFiles.length +
1254 " files in InMemoryFileSystem complete." +
1255 " Local file is " + outputPath);
1256 }
1257 else {
1258 LOG.info(reduceTask.getTaskId() + " Nothing to merge from " +
1259 inMemFileSys.getUri());
1260 }
1261 } catch (Throwable t) {
1262 LOG.warn(reduceTask.getTaskId() +
1263 " Intermediate Merge of the inmemory files threw an exception: "
1264 + StringUtils.stringifyException(t));
1265 ReduceCopier.this.mergeThrowable = t;
1266 }
1267 finally {
1268 mergeInProgress = false;
1269 }
1270 }
1271 }
1272 final private PathFilter MAP_OUTPUT_FILTER = new PathFilter() {
1273 public boolean accept(Path file) {
1274 return file.toString().endsWith(".out");
1275 }
1276 };
1277 }
1278 }