| Method from org.apache.hadoop.mapred.TaskTracker Detail: |
/**
 * Tears down this tracker's running state.
 *
 * Steps, in order: tells every known task that its job has finished, starts
 * a background thread that stops the task report RPC server, clears the
 * {@code running} flag, cleans up map output storage and interrupts the
 * map-events fetcher thread. The RPC-stopping thread is started but not
 * joined, so this method may return before the server has stopped.
 *
 * @throws IOException declared by the signature; propagated from the calls made here
 */
public synchronized void close() throws IOException {
  // Iterate over a private snapshot rather than 'tasks' itself, because
  // calling jobHasFinished() may result in an edit to 'tasks'.
  TreeMap<String, TaskInProgress> snapshot =
    new TreeMap<String, TaskInProgress>();
  snapshot.putAll(tasks);
  for (TaskInProgress doomed : snapshot.values()) {
    doomed.jobHasFinished(false);
  }

  // Stop the local RPC server on its own thread: RPC servers can take a
  // long time to shut down (they need to wait a full RPC timeout, which
  // might be 10-30 seconds).
  Thread rpcStopper = new Thread("RPC shutdown") {
    @Override
    public void run() {
      if (taskReportServer == null) {
        return;
      }
      taskReportServer.stop();
      taskReportServer = null;
    }
  };
  rpcStopper.start();

  running = false;

  // Clear local storage
  mapOutputFile.cleanupStorage();

  // Shutdown the fetcher thread
  mapEventsFetcher.interrupt();
}
Close down the TaskTracker and all its components. We must also shut down
any running tasks or threads, and clean up disk space. A new TaskTracker
within the same process space might be restarted, so everything must be
clean. |
/**
 * Records that the given child task reports itself done.
 *
 * If the task id is not present in {@code tasks}, a warning is logged and
 * the call is otherwise ignored.
 *
 * @param taskid id of the reporting task
 * @throws IOException declared by the signature
 */
public synchronized void done(String taskid) throws IOException {
  TaskInProgress known = tasks.get(taskid);
  if (known == null) {
    LOG.warn("Unknown child task done: " + taskid + ". Ignored.");
    return;
  }
  known.reportDone();
}
|
/**
 * Handles a filesystem error reported by a child task: logs it, attaches the
 * message to the task as diagnostic info and purges the task.
 *
 * The task is looked up in {@code runningTasks}. If no such task is found
 * (for example the id is unknown or the task is no longer running), a
 * warning is logged and the call is ignored instead of failing with a
 * {@link NullPointerException}, matching how the other child-facing
 * callbacks in this class treat unknown task ids.
 *
 * @param taskId  id of the task that hit the error
 * @param message description of the filesystem error
 * @throws IOException declared by the signature
 */
public synchronized void fsError(String taskId,
                                 String message) throws IOException {
  LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
  TaskInProgress tip = runningTasks.get(taskId);
  if (tip == null) {
    // Nothing to purge: the lookup found no running task with this id.
    LOG.warn("FSError from unknown child task: " + taskId + ". Ignored.");
    return;
  }
  tip.reportDiagnosticInfo("FSError: " + message);
  purgeTask(tip, true);
}
A child task had a local filesystem error. Kill the task. |
/**
 * Builds the cache sub-directory path by joining {@code TaskTracker.SUBDIR}
 * and {@code TaskTracker.CACHEDIR} with {@code Path.SEPARATOR}.
 *
 * @return the joined path string
 */
static String getCacheSubdir() {
  String parent = TaskTracker.SUBDIR;
  return parent + Path.SEPARATOR + TaskTracker.CACHEDIR;
}
|
/**
 * Returns the filesystem handle held in the {@code fs} field.
 *
 * @return the current value of {@code fs}
 */
public FileSystem getFileSystem() {
  return this.fs;
}
Return the DFS filesystem |
/**
 * Builds the job-cache sub-directory path by joining
 * {@code TaskTracker.SUBDIR} and {@code TaskTracker.JOBCACHE} with
 * {@code Path.SEPARATOR}.
 *
 * @return the joined path string
 */
static String getJobCacheSubdir() {
  String parent = TaskTracker.SUBDIR;
  return parent + Path.SEPARATOR + TaskTracker.JOBCACHE;
}
|
/**
 * Returns the {@code InterTrackerProtocol} handle stored in
 * {@code jobClient}.
 *
 * @return the current value of {@code jobClient}
 */
public InterTrackerProtocol getJobClient() {
  return this.jobClient;
}
The connection to the JobTracker, used by the TaskRunner
for locating remote files. |
/**
 * Returns the job configuration held in {@code fConf}.
 *
 * @return the current value of {@code fConf}
 */
JobConf getJobConf() {
  return this.fConf;
}
Get the default job conf for this tracker. |
/**
 * Looks up map completion events for a job.
 *
 * The job is looked up in {@code runningJobs} while holding that map's
 * monitor; the job's fetch status is then read while additionally holding
 * the job's own monitor. If the job is not present, or it has no fetch
 * status, {@code TaskCompletionEvent.EMPTY_ARRAY} is returned.
 *
 * @param jobId       id of the job to query
 * @param fromEventId passed through to {@code FetchStatus.getMapEvents}
 * @param maxLocs     passed through to {@code FetchStatus.getMapEvents}
 * @return the events supplied by the job's fetch status, or the empty array
 * @throws IOException declared by the signature
 */
public TaskCompletionEvent[] getMapCompletionEvents(String jobId,
                                                    int fromEventId,
                                                    int maxLocs) throws IOException {
  synchronized (runningJobs) {
    RunningJob job = runningJobs.get(jobId);
    if (job == null) {
      return TaskCompletionEvent.EMPTY_ARRAY;
    }
    synchronized (job) {
      FetchStatus fetchStatus = job.getFetchStatus();
      if (fetchStatus == null) {
        return TaskCompletionEvent.EMPTY_ARRAY;
      }
      return fetchStatus.getMapEvents(fromEventId, maxLocs);
    }
  }
}
|
/**
 * Returns the tracker name held in {@code taskTrackerName}.
 *
 * @return the current value of {@code taskTrackerName}
 */
String getName() {
  return this.taskTrackerName;
}
Get the name for this task tracker. |
/**
 * Collects a status object for every task that is in {@code tasks} but
 * whose id is not a key of {@code runningTasks}.
 *
 * @return a new list of {@code TaskStatus} objects, one per such task
 */
synchronized List getNonRunningTasks() {
  List<TaskStatus> stored = new ArrayList<TaskStatus>(tasks.size());
  for (Map.Entry<String, TaskInProgress> entry : tasks.entrySet()) {
    if (runningTasks.containsKey(entry.getKey())) {
      continue;
    }
    TaskInProgress idleTask = entry.getValue();
    stored.add(idleTask.createStatus());
  }
  return stored;
}
Get the list of stored tasks on this task tracker. |
/**
 * Reports the protocol version for the requested protocol name.
 *
 * Only the class name of {@code TaskUmbilicalProtocol} is accepted; any
 * other name is rejected.
 *
 * @param protocol      fully-qualified protocol class name requested
 * @param clientVersion not consulted by this implementation
 * @return {@code TaskUmbilicalProtocol.versionID}
 * @throws IOException if {@code protocol} is not the
 *         {@code TaskUmbilicalProtocol} class name
 */
public long getProtocolVersion(String protocol,
                               long clientVersion) throws IOException {
  String supported = TaskUmbilicalProtocol.class.getName();
  if (!protocol.equals(supported)) {
    throw new IOException("Unknown protocol for task tracker: " +
                          protocol);
  }
  return TaskUmbilicalProtocol.versionID;
}
|
/**
 * Collects a status object for every task in {@code runningTasks}.
 *
 * @return a new list holding one {@code TaskStatus} per running task
 */
synchronized List getRunningTaskStatuses() {
  int expected = runningTasks.size();
  List<TaskStatus> statuses = new ArrayList<TaskStatus>(expected);
  for (TaskInProgress active : runningTasks.values()) {
    TaskStatus snapshot = active.createStatus();
    statuses.add(snapshot);
  }
  return statuses;
}
Get the list of tasks that will be reported back to the
job tracker in the next heartbeat cycle. |
/**
 * Fetches the {@code Task} for the given task id.
 *
 * @param taskid id to look up in {@code tasks}
 * @return the task of the matching entry, or {@code null} when the id is
 *         not present
 * @throws IOException declared by the signature
 */
public synchronized Task getTask(String taskid) throws IOException {
  TaskInProgress known = tasks.get(taskid);
  return (known == null) ? null : (Task) known.getTask();
}
Called upon startup by the child process, to fetch Task data. |
/**
 * Returns the port number held in {@code taskReportPort}.
 *
 * @return the current value of {@code taskReportPort}
 */
public synchronized int getTaskTrackerReportPort() {
  return this.taskReportPort;
}
Return the port to which the tasktracker is bound |
/**
 * Sets up (or re-sets-up) this tracker's runtime state.
 *
 * In order, this method: resolves the local hostname, checks the local
 * directories, clears the task/job state tables and counters, reads the
 * disk-space and copier settings from {@code fConf}, starts the task report
 * RPC server (retrying on successive ports when a bind fails), derives the
 * tracker name from hostname and port, purges cached and temporary files,
 * obtains the RPC proxy stored in {@code jobClient}, marks the tracker as
 * running, and starts the map-events fetcher daemon thread.
 *
 * @throws IOException declared by the signature; propagated from the calls made here
 */
synchronized void initialize() throws IOException {
// use configured nameserver & interface to get local hostname
this.localHostname =
DNS.getDefaultHost
(fConf.get("mapred.tasktracker.dns.interface","default"),
fConf.get("mapred.tasktracker.dns.nameserver","default"));
//check local disk
checkLocalDirs(this.fConf.getLocalDirs());
fConf.deleteLocalFiles(SUBDIR);
// Clear out state tables
this.tasks.clear();
this.runningTasks = new TreeMap< String, TaskInProgress >();
this.runningJobs = new TreeMap< String, RunningJob >();
this.mapTotal = 0;
this.reduceTotal = 0;
this.acceptNewTasks = true;
this.status = null;
// Disk-space settings read from the configuration; both default to 0.
this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
int numCopiers = this.fConf.getInt("mapred.reduce.parallel.copies", 5);
//tweak the probe sample size (make it a function of numCopiers)
// Never drops below 50, whatever numCopiers is.
probe_sample_size = Math.max(numCopiers*5, 50);
this.myMetrics = new TaskTrackerMetrics();
// port numbers
this.taskReportPort = this.fConf.getInt("mapred.task.tracker.report.port", 50050);
// bind address
this.taskReportBindAddress = this.fConf.get("mapred.task.tracker.report.bindAddress", "0.0.0.0");
// RPC initialization
// Loop until the server starts: each BindException moves on to the next
// port number. There is no upper bound on the number of attempts.
while (true) {
try {
this.taskReportServer = RPC.getServer(this, this.taskReportBindAddress, this.taskReportPort, maxCurrentTasks, false, this.fConf);
this.taskReportServer.start();
break;
} catch (BindException e) {
LOG.info("Could not open report server at " + this.taskReportPort + ", trying new port");
this.taskReportPort++;
}
}
// The rpc-server port can be ephemeral...
// ... ensure we have the correct info
this.taskReportPort = taskReportServer.getListenerAddress().getPort();
// Write the actual port back into the configuration.
this.fConf.setInt("mapred.task.tracker.report.port", this.taskReportPort);
LOG.info("TaskTracker up at: " + this.taskReportPort);
this.taskTrackerName = "tracker_" +
localHostname + ":" + taskReportPort;
LOG.info("Starting tracker " + taskTrackerName);
// Clear out temporary files that might be lying around
DistributedCache.purgeCache(this.fConf);
this.mapOutputFile.cleanupStorage();
this.justStarted = true;
// Obtain the RPC proxy for the address in jobTrackAddr.
this.jobClient = (InterTrackerProtocol)
RPC.waitForProxy(InterTrackerProtocol.class,
InterTrackerProtocol.versionID,
jobTrackAddr, this.fConf);
this.running = true;
// start the thread that will fetch map task completion events
this.mapEventsFetcher = new MapEventsFetcherThread();
mapEventsFetcher.setDaemon(true);
mapEventsFetcher.setName(
"Map-events fetcher for all reduce tasks " + "on " +
taskTrackerName);
mapEventsFetcher.start();
}
Do the real constructor work here. It's in a separate method
so we can call it again and "recycle" the object after calling
close(). |
/**
 * Tells whether this tracker currently has nothing to do.
 *
 * @return {@code true} only when both {@code tasks} and
 *         {@code tasksToCleanup} are empty
 */
public synchronized boolean isIdle() {
  if (!tasks.isEmpty()) {
    return false;
  }
  return tasksToCleanup.isEmpty();
}
Is this task tracker idle? |
/**
 * Command-line entry point.
 *
 * Accepts no arguments: if any are given, a usage line is printed and the
 * process exits with status -1. Otherwise a {@code TaskTracker} is built
 * from a fresh {@code JobConf} and run. Any {@code Throwable} raised while
 * doing so is logged and the process exits with status -1.
 *
 * @param argv command-line arguments; must be empty
 * @throws Exception declared by the signature
 */
public static void main(String[] argv) throws Exception {
  StringUtils.startupShutdownMessage(TaskTracker.class, argv, LOG);
  if (argv.length != 0) {
    System.out.println("usage: TaskTracker");
    System.exit(-1);
  }
  try {
    JobConf conf = new JobConf();
    // enable the server to track time spent waiting on locks
    boolean traceContention =
      conf.getBoolean("tasktracker.contention.tracking", false);
    ReflectionUtils.setContentionTracing(traceContention);
    TaskTracker tracker = new TaskTracker(conf);
    tracker.run();
  } catch (Throwable e) {
    String cause = StringUtils.stringifyException(e);
    LOG.error("Can not start task tracker because " + cause);
    System.exit(-1);
  }
}
Start the TaskTracker, point toward the indicated JobTracker |
/**
 * Forwards a lost-map-output report to the named task.
 *
 * If the task id is not present in {@code tasks}, a warning is logged and
 * the call is otherwise ignored.
 *
 * @param taskid   id of the task whose output is reported lost
 * @param errorMsg message passed on to the task
 * @throws IOException declared by the signature
 */
public synchronized void mapOutputLost(String taskid,
                                       String errorMsg) throws IOException {
  TaskInProgress known = tasks.get(taskid);
  if (known == null) {
    LOG.warn("Unknown child with bad map output: " + taskid + ". Ignored.");
    return;
  }
  known.mapOutputLost(errorMsg);
}
A completed map task's output has been lost. |
/**
 * Main service loop: repeatedly heartbeats to the JobTracker and acts on
 * the directives it returns, for as long as {@code running} is set and
 * {@code shuttingDown} is not.
 *
 * Each iteration waits out the remainder of {@code HEARTBEAT_INTERVAL}
 * (waking early if a task finish is signalled through
 * {@code finishedCount}), sends a heartbeat, launches any
 * {@code LaunchTaskAction}s, queues every other action on
 * {@code tasksToCleanup}, and then runs the unresponsive-task and
 * overflowing-task checks.
 *
 * @return {@code State.STALE} when {@code reinitTaskTracker} asks for a
 *         reinitialization or a {@code DiskErrorException} occurs;
 *         {@code State.INTERRUPTED} when the thread is interrupted;
 *         {@code State.DENIED} when the JobTracker rejects this tracker
 *         with a {@code DisallowedTaskTrackerException};
 *         {@code State.NORMAL} when the loop condition becomes false
 * @throws Exception declared by the signature; exceptions raised before the
 *         loop (such as while obtaining the filesystem) propagate
 */
TaskTracker.State offerService() throws Exception {
  long lastHeartbeat = 0;
  this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf);
  while (running && !shuttingDown) {
    try {
      long now = System.currentTimeMillis();
      long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat);
      if (waitTime > 0) {
        // sleeps for the wait time, wakes up if a task is finished.
        synchronized(finishedCount) {
          if (finishedCount[0] == 0) {
            finishedCount.wait(waitTime);
          }
          finishedCount[0] = 0;
        }
      }
      // Send the heartbeat and process the jobtracker's directives
      HeartbeatResponse heartbeatResponse = transmitHeartBeat();
      TaskTrackerAction[] actions = heartbeatResponse.getActions();
      LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +
                heartbeatResponse.getResponseId() + " and " +
                ((actions != null) ? actions.length : 0) + " actions");
      if (reinitTaskTracker(actions)) {
        return State.STALE;
      }
      // Only a successfully processed heartbeat advances the timer.
      lastHeartbeat = now;
      justStarted = false;
      if (actions != null){
        for(TaskTrackerAction action: actions) {
          if (action instanceof LaunchTaskAction) {
            startNewTask((LaunchTaskAction) action);
          } else {
            tasksToCleanup.put(action);
          }
        }
      }
      markUnresponsiveTasks();
      killOverflowingTasks();
      //we've cleaned up, resume normal operation
      if (!acceptNewTasks && isIdle()) {
        acceptNewTasks=true;
      }
    } catch (InterruptedException ie) {
      LOG.info("Interrupted. Closing down.");
      return State.INTERRUPTED;
    } catch (DiskErrorException de) {
      String msg = "Exiting task tracker for disk error:\n" +
        StringUtils.stringifyException(de);
      LOG.error(msg);
      synchronized (this) {
        jobClient.reportTaskTrackerError(taskTrackerName,
                                         "DiskErrorException", msg);
      }
      return State.STALE;
    } catch (RemoteException re) {
      String reClass = re.getClassName();
      if (DisallowedTaskTrackerException.class.getName().equals(reClass)) {
        LOG.info("Tasktracker disallowed by JobTracker.");
        return State.DENIED;
      }
      // Any other remote failure is not fatal to the loop, but it must not
      // vanish silently: log it the same way as the generic handler below.
      LOG.error("Caught exception: " +
                StringUtils.stringifyException(re));
    } catch (Exception except) {
      String msg = "Caught exception: " +
        StringUtils.stringifyException(except);
      LOG.error(msg);
    }
  }
  return State.NORMAL;
}
Main service loop. Will stay in this loop forever. |
/**
 * Liveness check called by a child task.
 *
 * @param taskid id of the calling task
 * @return {@code true} if {@code tasks} holds a non-null entry for the id
 * @throws IOException declared by the signature
 */
public synchronized boolean ping(String taskid) throws IOException {
  Object known = tasks.get(taskid);
  return known != null;
}
Child checking to see if we're alive. Normally does nothing. |
/**
 * Passes a progress report from a child task on to its
 * {@code TaskInProgress}.
 *
 * @param taskid   id of the reporting task
 * @param progress progress value forwarded to the task
 * @param state    state string forwarded to the task
 * @param phase    phase forwarded to the task
 * @param counters counters forwarded to the task
 * @return {@code true} if the task id was found in {@code tasks};
 *         {@code false} (after logging a warning) otherwise
 * @throws IOException declared by the signature
 */
public synchronized boolean progress(String taskid,
                                     float progress,
                                     String state,
                                     TaskStatus.Phase phase,
                                     Counters counters) throws IOException {
  TaskInProgress known = tasks.get(taskid);
  if (known == null) {
    LOG.warn("Progress from unknown child task: " + taskid);
    return false;
  }
  known.reportProgress(progress, state, phase, counters);
  return true;
}
Called periodically to report Task progress, from 0.0 to 1.0. |
/**
 * Passes diagnostic text from a child task on to its
 * {@code TaskInProgress}.
 *
 * If the task id is not present in {@code tasks}, a warning is logged and
 * the call is otherwise ignored.
 *
 * @param taskid id of the reporting task
 * @param info   diagnostic text to attach to the task
 * @throws IOException declared by the signature
 */
public synchronized void reportDiagnosticInfo(String taskid,
                                              String info) throws IOException {
  TaskInProgress known = tasks.get(taskid);
  if (known == null) {
    LOG.warn("Error from unknown child task: " + taskid + ". Ignored.");
    return;
  }
  known.reportDiagnosticInfo(info);
}
Called when the task dies before completion, and we want to report back
diagnostic info |
/**
 * Notes that a task is no longer running and signals any thread waiting on
 * {@code finishedCount}.
 *
 * The lookup in {@code tasks} is done while holding this object's monitor;
 * the task callback and the notification happen after it is released. If
 * the id is unknown, a warning is logged and nothing else happens.
 *
 * @param taskid id of the task that finished
 */
void reportTaskFinished(String taskid) {
  TaskInProgress finished;
  synchronized (this) {
    finished = tasks.get(taskid);
  }
  if (finished == null) {
    LOG.warn("Unknown child task finshed: " + taskid + ". Ignored.");
    return;
  }
  finished.taskFinished();
  // Bump the counter and signal one thread waiting on finishedCount.
  synchronized (finishedCount) {
    finishedCount[0]++;
    finishedCount.notify();
  }
}
The task is no longer running. It may not have completed successfully |
/**
 * Server retry loop.
 *
 * The outer loop runs while the tracker is running, not shutting down and
 * not denied. Inside it, offerService() is called repeatedly until it
 * reports a stale or denied state (or the running/shutdown flags change);
 * exceptions from offerService() are logged and followed by a 5 second
 * pause unless a shutdown is in progress. When the inner loop ends, close()
 * is always called; then, unless shutting down, initialize() is called to
 * rebuild local state before the outer loop condition is re-evaluated.
 * If the tracker was denied, shutdown() is called once the outer loop ends.
 * An IOException escaping close(), initialize() or shutdown() is logged
 * and ends the method.
 */
public void run() {
try {
boolean denied = false;
while (running && !shuttingDown && !denied) {
boolean staleState = false;
try {
// This while-loop attempts reconnects if we get network errors
while (running && !staleState && !shuttingDown && !denied) {
try {
State osState = offerService();
// Only STALE and DENIED change the loop flags; any other state
// simply causes another offerService() call.
if (osState == State.STALE) {
staleState = true;
} else if (osState == State.DENIED) {
denied = true;
}
} catch (Exception ex) {
if (!shuttingDown) {
LOG.info("Lost connection to JobTracker [" +
jobTrackAddr + "]. Retrying...", ex);
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
// Interruption of the back-off sleep is ignored here; the loop
// conditions are simply re-evaluated.
}
}
}
}
} finally {
// Runs however the inner loop exits, including on an unchecked error.
close();
}
if (shuttingDown) { return; }
LOG.warn("Reinitializing local state");
initialize();
}
if (denied) {
shutdown();
}
} catch (IOException iex) {
LOG.error("Got fatal exception while reinitializing TaskTracker: " +
StringUtils.stringifyException(iex));
return;
}
}
The server retry loop.
This while-loop attempts to connect to the JobTracker. It only
loops when the old TaskTracker has gone bad (its state is
stale somehow) and we need to reinitialize everything. |
/**
 * Shuts the tracker down for good: sets {@code shuttingDown}, calls
 * {@link #close()}, and stops the status HTTP server if one is present.
 *
 * An {@link InterruptedException} raised while stopping the HTTP server is
 * reported through the class logger (rather than printed to standard
 * error) and does not propagate.
 *
 * @throws IOException declared by the signature; propagated from {@code close()}
 */
public synchronized void shutdown() throws IOException {
  shuttingDown = true;
  close();
  if (this.server != null) {
    try {
      LOG.info("Shutting down StatusHttpServer");
      this.server.stop();
    } catch (InterruptedException ex) {
      // Route the failure through the logger like every other error path
      // in this class, so it ends up in the tracker's log.
      LOG.warn("Interrupted while shutting down StatusHttpServer: " +
               StringUtils.stringifyException(ex));
    }
  }
}
|