/**
 * Runs this map task: reconstructs the serialized input split, feeds its
 * records through the job's {@link MapRunnable}, flushes the collected
 * output, and reports completion to the parent via {@code done}.
 *
 * <p>Output goes to a {@code MapOutputBuffer} when the job has at least one
 * reduce task, otherwise to a {@code DirectMapOutputCollector}. Once the
 * collector has been created it is always closed, and the input reader is
 * closed if it was opened, even when an intermediate step fails.
 *
 * @param job the job configuration used to resolve the split class, the
 *     input format and the map runner class
 * @param umbilical channel to the parent process, used for the reporter,
 *     the communication thread and the output collector
 * @throws IOException if the split class cannot be found, the split cannot
 *     be deserialized, or opening, reading, mapping, flushing or closing
 *     fails
 */
public void run(JobConf job,
                TaskUmbilicalProtocol umbilical) throws IOException {
  final Reporter reporter = getReporter(umbilical);
  // start thread that will handle communication with parent
  startCommunicationThread(umbilical);

  // NOTE(review): the reduce count is read from the `conf` field rather than
  // the `job` parameter used everywhere else; presumably the same object --
  // confirm.
  int numReduceTasks = conf.getNumReduceTasks();
  LOG.info("numReduceTasks: " + numReduceTasks);
  MapOutputCollector collector = null;
  if (numReduceTasks > 0) {
    collector = new MapOutputBuffer(umbilical, job, reporter);
  } else {
    collector = new DirectMapOutputCollector(umbilical, job, reporter);
  }

  RecordReader in = null;
  // Everything after the collector is created runs inside this try so that
  // a failure while rebuilding the split or opening the reader still closes
  // the collector.
  try {
    // reinstantiate the split from its class name and serialized bytes
    try {
      instantiatedSplit = (InputSplit)
        ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
    } catch (ClassNotFoundException exp) {
      IOException wrap = new IOException("Split class " + splitClass +
                                         " not found");
      wrap.initCause(exp);
      throw wrap;
    }
    DataInputBuffer splitBuffer = new DataInputBuffer();
    splitBuffer.reset(split.get(), 0, split.getSize());
    instantiatedSplit.readFields(splitBuffer);

    // if it is a file split, we can give more details
    if (instantiatedSplit instanceof FileSplit) {
      FileSplit fileSplit = (FileSplit) instantiatedSplit;
      job.set("map.input.file", fileSplit.getPath().toString());
      job.setLong("map.input.start", fileSplit.getStart());
      job.setLong("map.input.length", fileSplit.getLength());
    }

    final RecordReader rawIn =                  // open input
      job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
    // Wrap the raw reader to publish progress and the input record/byte
    // counters on every successful read.
    in = new RecordReader() {
        public WritableComparable createKey() {
          return rawIn.createKey();
        }

        public Writable createValue() {
          return rawIn.createValue();
        }

        public synchronized boolean next(Writable key, Writable value)
          throws IOException {
          // Progress is published before the read, so it reflects the
          // records consumed by previous calls.
          setProgress(getProgress());
          long beforePos = getPos();
          boolean ret = rawIn.next(key, value);
          if (ret) {
            reporter.incrCounter(MAP_INPUT_RECORDS, 1);
            // Bytes are measured as the change in reader position.
            reporter.incrCounter(MAP_INPUT_BYTES, (getPos() - beforePos));
          }
          return ret;
        }

        public long getPos() throws IOException { return rawIn.getPos(); }

        public void close() throws IOException { rawIn.close(); }

        public float getProgress() throws IOException {
          return rawIn.getProgress();
        }
      };

    MapRunnable runner =
      (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
    runner.run(in, collector, reporter);
    collector.flush();
  } finally {
    // Nested so that an exception from closing the input cannot prevent the
    // collector from being closed.
    try {
      if (in != null) {
        in.close();                             // close input
      }
    } finally {
      collector.close();
    }
  }
  done(umbilical);
}
|