| Method from org.apache.hadoop.mapred.JobClient Detail: |
/**
 * Closes this client.
 *
 * @throws IOException declared for callers; this implementation never throws
 */
public synchronized void close() throws IOException {
  // Intentionally empty: this implementation releases nothing.
}
|
/**
 * Asks the job submission client for the current status of the cluster.
 *
 * @return the status reported by the submission client
 * @throws IOException propagated from the submission client
 */
public ClusterStatus getClusterStatus() throws IOException {
  final ClusterStatus status = jobSubmitClient.getClusterStatus();
  return status;
}
|
/**
 * Builds a Configuration for a job tracker specification.
 *
 * <p>A null spec yields a plain Configuration. A spec containing ':' is
 * stored as "mapred.job.tracker". Any other spec names a classpath resource
 * "hadoop-&lt;spec&gt;.xml", which must exist and is added as a final resource.
 *
 * @param jobTrackerSpec host:port of the job tracker, a config name, or null
 * @return the resulting configuration
 * @throws RuntimeException if the named config resource is not on the classpath
 */
static Configuration getConfiguration(String jobTrackerSpec) {
  Configuration conf = new Configuration();
  if (jobTrackerSpec == null) {
    return conf;
  }
  if (jobTrackerSpec.contains(":")) {
    // Looks like host:port -- use it directly.
    conf.set("mapred.job.tracker", jobTrackerSpec);
    return conf;
  }
  String resourceName = "hadoop-" + jobTrackerSpec + ".xml";
  if (conf.getResource(resourceName) == null) {
    throw new RuntimeException(resourceName + " not found on CLASSPATH");
  }
  conf.addFinalResource(resourceName);
  return conf;
}
|
/**
 * Returns a handle to the filesystem named by the job submission client.
 * The handle is created on first call and cached in the {@code fs} field;
 * the lazy initialization is synchronized on this client.
 *
 * @return the cached filesystem handle
 * @throws IOException if the name cannot be fetched or the filesystem opened
 */
public synchronized FileSystem getFs() throws IOException {
  if (fs != null) {
    return fs;
  }
  String filesystemName = jobSubmitClient.getFilesystemName();
  fs = FileSystem.getNamed(filesystemName, conf);
  return fs;
}
Get a filesystem handle. We need this to prepare jobs
for submission to the MapReduce system. |
/**
 * Looks up a job by id.
 *
 * @param jobid the job's id
 * @return a handle for tracking the job, or null if the submission client
 *         reports no status for that id
 * @throws IOException propagated from the submission client
 */
public RunningJob getJob(String jobid) throws IOException {
  JobStatus jobStatus = jobSubmitClient.getJobStatus(jobid);
  return (jobStatus == null) ? null : new NetworkedJob(jobStatus);
}
Get a RunningJob object to track an ongoing job. Returns
null if the id does not correspond to any known job. |
/**
 * Fetches reports describing the current state of a job's map tasks.
 *
 * @param jobId id of the job
 * @return the reports returned by the submission client
 * @throws IOException propagated from the submission client
 */
public TaskReport[] getMapTaskReports(String jobId) throws IOException {
  TaskReport[] mapReports = jobSubmitClient.getMapTaskReports(jobId);
  return mapReports;
}
Get information about the current state of the map tasks of a job. |
/**
 * Fetches reports describing the current state of a job's reduce tasks.
 *
 * @param jobId id of the job
 * @return the reports returned by the submission client
 * @throws IOException propagated from the submission client
 */
public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
  TaskReport[] reduceReports = jobSubmitClient.getReduceTaskReports(jobId);
  return reduceReports;
}
Get information about the current state of the reduce tasks of a job. |
/**
 * Returns the task output filter stored on this client.
 *
 * @return the current filter (the value of the {@code taskOutputFilter} field)
 */
public JobClient.TaskStatusFilter getTaskOutputFilter() {
  return taskOutputFilter;
}
Returns the task output filter. |
/**
 * Reads the task output filter from a job configuration.
 *
 * @param job the job configuration
 * @return the filter named by "jobclient.output.filter", or FAILED when unset
 * @throws IllegalArgumentException if the stored name is not a filter constant
 */
public static JobClient.TaskStatusFilter getTaskOutputFilter(JobConf job) {
  String filterName = job.get("jobclient.output.filter", "FAILED");
  return TaskStatusFilter.valueOf(filterName);
}
Get the task output filter from the JobConf. |
/**
 * Chooses the job submission client for {@code conf}.
 *
 * <p>When "mapred.job.tracker" is unset or equals "local", an in-process
 * LocalJobRunner is used. Otherwise a proxy to the configured JobTracker
 * address is created.
 *
 * @param conf the job configuration
 * @throws IOException if the proxy or runner cannot be created
 */
public void init(JobConf conf) throws IOException {
  String tracker = conf.get("mapred.job.tracker", "local");
  if (!"local".equals(tracker)) {
    jobSubmitClient = createProxy(JobTracker.getAddress(conf), conf);
    return;
  }
  jobSubmitClient = new LocalJobRunner(conf);
}
|
/**
 * Asks the submission client for the statuses of jobs that have not yet completed.
 *
 * @return the statuses reported by the submission client
 * @throws IOException propagated from the submission client
 */
public JobStatus[] jobsToComplete() throws IOException {
  JobStatus[] pending = jobSubmitClient.jobsToComplete();
  return pending;
}
|
/**
 * Command-line entry point: runs {@code doMain} with a fresh Configuration
 * and exits the JVM with its result.
 *
 * @param argv command-line arguments
 * @throws Exception propagated from {@code doMain}
 */
public static void main(String[] argv) throws Exception {
  JobClient client = new JobClient();
  int exitCode = client.doMain(new Configuration(), argv);
  System.exit(exitCode);
}
|
/**
 * Reads a splits file into an array of raw splits.
 *
 * <p>Expected layout: a header equal to SPLIT_FILE_HEADER, a vint version
 * equal to CURRENT_SPLIT_FILE_VERSION, a vint split count, and then that many
 * serialized RawSplit records.
 *
 * @param in input positioned at the start of the splits file
 * @return the splits, in file order
 * @throws IOException if the header, version or split count is invalid, or
 *         reading fails
 */
static JobClient.RawSplit[] readSplitFile(DataInput in) throws IOException {
  byte[] header = new byte[SPLIT_FILE_HEADER.length];
  in.readFully(header);
  if (!Arrays.equals(SPLIT_FILE_HEADER, header)) {
    throw new IOException("Invalid header on split file");
  }
  int vers = WritableUtils.readVInt(in);
  if (vers != CURRENT_SPLIT_FILE_VERSION) {
    throw new IOException("Unsupported split version " + vers);
  }
  int len = WritableUtils.readVInt(in);
  // A corrupt count would otherwise escape as an unchecked
  // NegativeArraySizeException instead of the IOException callers expect.
  if (len < 0) {
    throw new IOException("Invalid number of splits " + len);
  }
  RawSplit[] result = new RawSplit[len];
  for (int i = 0; i < len; ++i) {
    result[i] = new RawSplit();
    result[i].readFields(in);
  }
  return result;
}
Read a splits file into an array of raw splits. |
/**
 * Command-line driver. Depending on the flags, it submits a job file, prints
 * a job's status, kills a job, or lists a job's task completion events.
 *
 * <p>Each flag consumes the values that follow it. The client is initialized
 * with {@link #init} before any command runs, including {@code -events}.
 *
 * @param argv command-line arguments
 * @return 0 if the requested command succeeded, -1 otherwise
 * @throws RuntimeException with the usage text if fewer than two arguments
 *         are given or a flag is missing one of its values
 * @throws Exception propagated from job submission or the submission client
 */
public int run(String[] argv) throws Exception {
  final String cmd = "JobClient -submit < job > | -status < id > |" +
    " -events < id > < from-event-# > < #-of-events > |" +
    " -kill < id > [-jt < jobtracker:port >|< config >]";
  if (argv.length < 2) {
    throw usageError(cmd);
  }
  // Process args. Each flag's values are checked before they are read, so a
  // trailing flag yields the usage error instead of an
  // ArrayIndexOutOfBoundsException.
  String submitJobFile = null;
  String jobid = null;
  boolean getStatus = false;
  boolean killJob = false;
  String eventsJobId = null;
  int fromEvent = 0;
  int numEvents = 0;
  for (int i = 0; i < argv.length; i++) {
    if ("-submit".equals(argv[i])) {
      checkFlagValues(argv, i, 1, cmd);
      submitJobFile = argv[i+1];
      i++;
    } else if ("-status".equals(argv[i])) {
      checkFlagValues(argv, i, 1, cmd);
      jobid = argv[i+1];
      getStatus = true;
      i++;
    } else if ("-kill".equals(argv[i])) {
      checkFlagValues(argv, i, 1, cmd);
      jobid = argv[i+1];
      killJob = true;
      i++;
    } else if ("-events".equals(argv[i])) {
      checkFlagValues(argv, i, 3, cmd);
      eventsJobId = argv[i+1];
      fromEvent = Integer.parseInt(argv[i+2]);
      numEvents = Integer.parseInt(argv[i+3]);
      i += 3;
    }
  }
  // initialize JobClient
  JobConf conf = (submitJobFile != null)
      ? new JobConf(submitJobFile)
      : new JobConf(getConf());
  init(conf);
  // Submit the request
  int exitCode = -1;
  try {
    if (eventsJobId != null) {
      // Runs only after init(), which is what assigns jobSubmitClient;
      // presumably listEvents queries it.
      listEvents(eventsJobId, fromEvent, numEvents);
    }
    if (submitJobFile != null) {
      RunningJob job = submitJob(conf);
      System.out.println("Created job " + job.getJobID());
      exitCode = 0;
    } else if (getStatus) {
      RunningJob job = getJob(jobid);
      if (job == null) {
        System.out.println("Could not find job " + jobid);
      } else {
        System.out.println();
        System.out.println(job);
        exitCode = 0;
      }
    } else if (killJob) {
      RunningJob job = getJob(jobid);
      if (job == null) {
        System.out.println("Could not find job " + jobid);
      } else {
        job.killJob();
        System.out.println("Killed job " + jobid);
        exitCode = 0;
      }
    } else if (eventsJobId != null) {
      // Event listing was the only command and it completed.
      exitCode = 0;
    }
  } finally {
    close();
  }
  return exitCode;
}

/** Prints the usage text and returns the exception run() throws for bad arguments. */
private static RuntimeException usageError(String cmd) {
  System.out.println(cmd);
  return new RuntimeException("JobClient:" + cmd);
}

/** Throws the usage error unless {@code count} values follow the flag at argv[flag]. */
private static void checkFlagValues(String[] argv, int flag, int count, String cmd) {
  if (flag + count >= argv.length) {
    throw usageError(cmd);
  }
}
|
/**
 * Submits a job, then polls for progress until the job is complete.
 *
 * <p>About once a second, map/reduce progress is logged whenever it changes.
 * Task completion events are logged according to the job's task output
 * filter. Up to MAX_RETRIES consecutive IOExceptions while polling are
 * tolerated, and the count resets after each successful poll. Once the job
 * has been submitted, it is killed if it fails or an exception escapes. The
 * client is always closed.
 *
 * @param job the job configuration to submit
 * @return the completed, successful job
 * @throws IOException if the job fails, or polling fails MAX_RETRIES times
 *         in a row
 * @throws IllegalArgumentException if "jobclient.output.filter" does not
 *         name a TaskStatusFilter constant
 */
public static RunningJob runJob(JobConf job) throws IOException {
  // Validate the filter before creating a client, so that a bad setting
  // does not leave an unclosed JobClient behind.
  TaskStatusFilter filter;
  try {
    filter = getTaskOutputFilter(job);
  } catch (IllegalArgumentException e) {
    LOG.warn("Invalid Output filter : " + e.getMessage() +
             " Valid values are : NONE, FAILED, SUCCEEDED, ALL");
    throw e;
  }
  JobClient jc = new JobClient(job);
  boolean error = true;
  RunningJob running = null;
  String lastReport = null;
  final int MAX_RETRIES = 5;
  int retries = MAX_RETRIES;
  try {
    running = jc.submitJob(job);
    String jobId = running.getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    while (true) {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        // NOTE(review): the interrupt is swallowed and polling continues.
        // Restoring the flag here would make every later sleep() throw at
        // once and turn this loop into a busy-wait.
      }
      try {
        if (running.isComplete()) {
          break;
        }
        RunningJob refreshed = jc.getJob(jobId);
        if (refreshed == null) {
          // getJob returns null for an unknown id. Treat this as a
          // retryable failure, and keep the previous handle so the job can
          // still be killed if polling gives up.
          throw new IOException("Could not find job " + jobId);
        }
        running = refreshed;
        String report =
          (" map " + StringUtils.formatPercent(running.mapProgress(), 0)+
           " reduce " +
           StringUtils.formatPercent(running.reduceProgress(), 0));
        if (!report.equals(lastReport)) {
          LOG.info(report);
          lastReport = report;
        }
        if (filter != TaskStatusFilter.NONE) {
          TaskCompletionEvent[] events =
            running.getTaskCompletionEvents(eventCounter);
          eventCounter += events.length;
          logTaskEvents(filter, events);
        }
        retries = MAX_RETRIES;
      } catch (IOException ie) {
        if (--retries == 0) {
          LOG.warn("Final attempt failed, killing job.");
          throw ie;
        }
        LOG.info("Communication problem with server: " +
                 StringUtils.stringifyException(ie));
      }
    }
    if (!running.isSuccessful()) {
      throw new IOException("Job failed!");
    }
    LOG.info("Job complete: " + jobId);
    running.getCounters().log(LOG);
    error = false;
  } finally {
    try {
      if (error && (running != null)) {
        running.killJob();
      }
    } finally {
      // Close the client even if killJob() throws.
      jc.close();
    }
  }
  return running;
}

/**
 * Logs each event whose status passes {@code filter}. For SUCCEEDED, FAILED
 * and ALL the task's logs are also displayed; KILLED events are only logged.
 */
private static void logTaskEvents(TaskStatusFilter filter,
                                  TaskCompletionEvent[] events)
  throws IOException {
  for (TaskCompletionEvent event : events) {
    switch (filter) {
    case SUCCEEDED:
      if (event.getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
        LOG.info(event.toString());
        displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
      }
      break;
    case FAILED:
      if (event.getTaskStatus() == TaskCompletionEvent.Status.FAILED) {
        LOG.info(event.toString());
        displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
      }
      break;
    case KILLED:
      if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED) {
        LOG.info(event.toString());
      }
      break;
    case ALL:
      LOG.info(event.toString());
      displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
      break;
    }
  }
}
Utility that submits a job, then polls for progress until the job is
complete. |
/**
 * Sets the output filter on this client. Only tasks whose output matches
 * the filter are printed.
 *
 * @param newValue the filter to store
 */
public void setTaskOutputFilter(JobClient.TaskStatusFilter newValue) {
  taskOutputFilter = newValue;
}
Sets the output filter for tasks. Only those tasks whose
output matches the filter are printed. |
/**
 * Stores the task output filter in a job configuration under
 * "jobclient.output.filter".
 *
 * @param job the job configuration to modify
 * @param newValue the filter to store (must not be null)
 */
public static void setTaskOutputFilter(JobConf job,
                                       JobClient.TaskStatusFilter newValue) {
  String filterName = newValue.toString();
  job.set("jobclient.output.filter", filterName);
}
Modify the JobConf to set the task output filter. |
/**
 * Loads a job description from {@code jobFile} and submits it.
 *
 * @param jobFile path of the job configuration file
 * @return a handle for tracking the submitted job
 * @throws IOException if loading or submission fails
 */
public RunningJob submitJob(String jobFile) throws IOException, FileNotFoundException, InvalidJobConfException {
  return submitJob(new JobConf(jobFile));
}
Submit a job to the MR system |
/**
 * Submits a job to the MapReduce system.
 *
 * <p>Obtains a new job id, clears the submission directory
 * (job.getSystemDir()/jobId) on the JobTracker's filesystem, and records
 * DistributedCache timestamps. It then copies the job jar (if any), fills in
 * the user and working directory, validates the input and output specs, and
 * writes the splits (largest first) and the job configuration into that
 * directory. Finally it asks the submission client to launch the job.
 *
 * @param job the job configuration. It is modified in place: jar path, job
 *        name, user, working directory, split file, map count and cache
 *        timestamps.
 * @return a handle for tracking the submitted job
 * @throws IOException if a filesystem step fails or the job could not be launched
 */
public RunningJob submitJob(JobConf job) throws IOException, FileNotFoundException, InvalidJobConfException {
  //
  // First figure out what fs the JobTracker is using.  Copy the
  // job to it, under a temporary name.  This allows DFS to work,
  // and under the local fs also provides UNIX-like object loading
  // semantics.  (that is, if the job file is deleted right after
  // submission, we can still run the submission to completion)
  //
  // Create a number of filenames in the JobTracker's fs namespace
  String jobId = jobSubmitClient.getNewJobId();
  Path submitJobDir = new Path(job.getSystemDir(), jobId);
  FileSystem fs = getFs();
  LOG.debug("default FileSystem: " + fs.getUri());
  fs.delete(submitJobDir);
  Path submitJobFile = new Path(submitJobDir, "job.xml");
  Path submitJarFile = new Path(submitJobDir, "job.jar");
  Path submitSplitFile = new Path(submitJobDir, "job.split");
  // set the timestamps of the archives and files
  URI[] tarchives = DistributedCache.getCacheArchives(job);
  if (tarchives != null) {
    DistributedCache.setArchiveTimestamps(job, joinCacheTimestamps(job, tarchives));
  }
  URI[] tfiles = DistributedCache.getCacheFiles(job);
  if (tfiles != null) {
    DistributedCache.setFileTimestamps(job, joinCacheTimestamps(job, tfiles));
  }
  String originalJarPath = job.getJar();
  short replication = (short)job.getInt("mapred.submit.replication", 10);
  if (originalJarPath != null) {           // copy jar to JobTracker's fs
    // use jar name if job is not named.
    if ("".equals(job.getJobName())) {
      job.setJobName(new Path(originalJarPath).getName());
    }
    job.setJar(submitJarFile.toString());
    fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
    fs.setReplication(submitJarFile, replication);
  }
  // Set the user's name and working directory
  String user = System.getProperty("user.name");
  job.setUser(user != null ? user : "Dr Who");
  if (job.getWorkingDirectory() == null) {
    job.setWorkingDirectory(fs.getWorkingDirectory());
  }
  // Check the input specification
  job.getInputFormat().validateInput(job);
  // Check the output specification
  job.getOutputFormat().checkOutputSpecs(fs, job);
  // Create the splits for the job
  LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
  InputSplit[] splits =
    job.getInputFormat().getSplits(job, job.getNumMapTasks());
  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(splits, new Comparator<InputSplit>() {
    public int compare(InputSplit a, InputSplit b) {
      try {
        long left = a.getLength();
        long right = b.getLength();
        // Descending order by length.
        return (left == right) ? 0 : ((left < right) ? 1 : -1);
      } catch (IOException ie) {
        throw new RuntimeException("Problem getting input split size",
                                   ie);
      }
    }
  });
  // write the splits to a file for the job tracker
  FSDataOutputStream out = fs.create(submitSplitFile);
  try {
    writeSplitsFile(splits, out);
  } finally {
    out.close();
  }
  job.set("mapred.job.split.file", submitSplitFile.toString());
  job.setNumMapTasks(splits.length);
  // Write job file to JobTracker's fs
  out = fs.create(submitJobFile, replication);
  try {
    job.write(out);
  } finally {
    out.close();
  }
  //
  // Now, actually submit the job (using the submit name)
  //
  JobStatus status = jobSubmitClient.submitJob(jobId);
  if (status != null) {
    return new NetworkedJob(status);
  } else {
    throw new IOException("Could not launch job");
  }
}

/**
 * Joins the DistributedCache timestamps of {@code uris} into a
 * comma-separated string, in array order. An empty array yields "".
 */
private static String joinCacheTimestamps(JobConf job, URI[] uris)
  throws IOException {
  StringBuilder timestamps = new StringBuilder();
  for (int i = 0; i < uris.length; i++) {
    if (i > 0) {
      timestamps.append(',');
    }
    timestamps.append(DistributedCache.getTimestamp(job, uris[i]));
  }
  return timestamps.toString();
}
Submit a job to the MR system |