1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.mapred;
19
20 import org.apache.commons.logging;
21
22 import org.apache.hadoop.fs;
23 import org.apache.hadoop.io;
24 import org.apache.hadoop.io.retry;
25 import org.apache.hadoop.ipc;
26 import org.apache.hadoop.conf;
27 import org.apache.hadoop.util;
28 import org.apache.hadoop.filecache;
29 import java.io;
30 import java.net;
31 import java.util;
32
33 /*******************************************************
34 * JobClient interacts with the JobTracker network interface.
35 * This object implements the job-control interface, and
36 * should be the primary method by which user programs interact
37 * with the networked job system.
38 *
39 *******************************************************/
40 public class JobClient extends ToolBase implements MRConstants {
41 private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient");
42 public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
43 private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
44
45 static long MAX_JOBPROFILE_AGE = 1000 * 2;
46
47 /**
48 * A NetworkedJob is an implementation of RunningJob. It holds
49 * a JobProfile object to provide some info, and interacts with the
50 * remote service to provide certain functionality.
51 */
52 class NetworkedJob implements RunningJob {
53 JobProfile profile;
54 JobStatus status;
55 long statustime;
56
57 /**
58 * We store a JobProfile and a timestamp for when we last
59 * acquired the job profile. If the job is null, then we cannot
60 * perform any of the tasks. The job might be null if the JobTracker
61 * has completely forgotten about the job. (eg, 24 hours after the
62 * job completes.)
63 */
64 public NetworkedJob(JobStatus job) throws IOException {
65 this.status = job;
66 this.profile = jobSubmitClient.getJobProfile(job.getJobId());
67 this.statustime = System.currentTimeMillis();
68 }
69
70 /**
71 * Some methods rely on having a recent job profile object. Refresh
72 * it, if necessary
73 */
74 synchronized void ensureFreshStatus() throws IOException {
75 if (System.currentTimeMillis() - statustime > MAX_JOBPROFILE_AGE) {
76 this.status = jobSubmitClient.getJobStatus(profile.getJobId());
77 this.statustime = System.currentTimeMillis();
78 }
79 }
80
81 /**
82 * An identifier for the job
83 */
84 public String getJobID() {
85 return profile.getJobId();
86 }
87
88 /**
89 * The user-specified job name
90 */
91 public String getJobName() {
92 return profile.getJobName();
93 }
94
95 /**
96 * The name of the job file
97 */
98 public String getJobFile() {
99 return profile.getJobFile();
100 }
101
102 /**
103 * A URL where the job's status can be seen
104 */
105 public String getTrackingURL() {
106 return profile.getURL().toString();
107 }
108
109 /**
110 * A float between 0.0 and 1.0, indicating the % of map work
111 * completed.
112 */
113 public float mapProgress() throws IOException {
114 ensureFreshStatus();
115 return status.mapProgress();
116 }
117
118 /**
119 * A float between 0.0 and 1.0, indicating the % of reduce work
120 * completed.
121 */
122 public float reduceProgress() throws IOException {
123 ensureFreshStatus();
124 return status.reduceProgress();
125 }
126
127 /**
128 * Returns immediately whether the whole job is done yet or not.
129 */
130 public synchronized boolean isComplete() throws IOException {
131 ensureFreshStatus();
132 return (status.getRunState() == JobStatus.SUCCEEDED ||
133 status.getRunState() == JobStatus.FAILED);
134 }
135
136 /**
137 * True iff job completed successfully.
138 */
139 public synchronized boolean isSuccessful() throws IOException {
140 ensureFreshStatus();
141 return status.getRunState() == JobStatus.SUCCEEDED;
142 }
143
144 /**
145 * Blocks until the job is finished
146 */
147 public void waitForCompletion() throws IOException {
148 while (!isComplete()) {
149 try {
150 Thread.sleep(5000);
151 } catch (InterruptedException ie) {
152 }
153 }
154 }
155
156 /**
157 * Tells the service to terminate the current job.
158 */
159 public synchronized void killJob() throws IOException {
160 jobSubmitClient.killJob(getJobID());
161 }
162 /**
163 * Fetch task completion events from jobtracker for this job.
164 */
165 public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
166 int startFrom) throws IOException{
167 return jobSubmitClient.getTaskCompletionEvents(
168 getJobID(), startFrom, 10);
169 }
170
171 /**
172 * Dump stats to screen
173 */
174 public String toString() {
175 try {
176 ensureFreshStatus();
177 } catch (IOException e) {
178 }
179 return "Job: " + profile.getJobId() + "\n" +
180 "file: " + profile.getJobFile() + "\n" +
181 "tracking URL: " + profile.getURL() + "\n" +
182 "map() completion: " + status.mapProgress() + "\n" +
183 "reduce() completion: " + status.reduceProgress();
184 }
185
186 /**
187 * Returns the counters for this job
188 */
189 public Counters getCounters() throws IOException {
190 return jobSubmitClient.getJobCounters(getJobID());
191 }
192 }
193
194 JobSubmissionProtocol jobSubmitClient;
195 FileSystem fs = null;
196
197 static Random r = new Random();
198
/**
 * Build an unconnected job client.  Nothing is contacted here; the
 * submission endpoint is only set up once init(JobConf) runs.
 */
public JobClient() {
  super();
}
204
/**
 * Build a job client from a job configuration: the configuration is
 * stored on this object and then used to set up the submission endpoint.
 *
 * @param conf the configuration to store and initialize from
 * @throws IOException if initialization fails
 */
public JobClient(JobConf conf) throws IOException {
  this.setConf(conf);
  this.init(conf);
}
209
210 public void init(JobConf conf) throws IOException {
211 String tracker = conf.get("mapred.job.tracker", "local");
212 if ("local".equals(tracker)) {
213 this.jobSubmitClient = new LocalJobRunner(conf);
214 } else {
215 this.jobSubmitClient = createProxy(JobTracker.getAddress(conf), conf);
216 }
217 }
218
219 /**
220 * Create a proxy JobSubmissionProtocol that retries timeouts.
221 * @param addr the address to connect to
222 * @param conf the server's configuration
223 * @return a proxy object that will retry timeouts
224 * @throws IOException
225 */
226 private JobSubmissionProtocol createProxy(InetSocketAddress addr,
227 Configuration conf
228 ) throws IOException {
229 JobSubmissionProtocol raw = (JobSubmissionProtocol)
230 RPC.getProxy(JobSubmissionProtocol.class,
231 JobSubmissionProtocol.versionID, addr, conf);
232 RetryPolicy backoffPolicy =
233 RetryPolicies.retryUpToMaximumCountWithProportionalSleep
234 (5, 10, java.util.concurrent.TimeUnit.SECONDS);
235 Map<Class<? extends Exception>, RetryPolicy> handlers =
236 new HashMap<Class<? extends Exception>, RetryPolicy>();
237 handlers.put(SocketTimeoutException.class, backoffPolicy);
238 RetryPolicy backoffTimeOuts =
239 RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,handlers);
240 return (JobSubmissionProtocol)
241 RetryProxy.create(JobSubmissionProtocol.class, raw, backoffTimeOuts);
242 }
243
/**
 * Build a job client, connect to the indicated job tracker.
 * The configuration is also stored on this object, as the
 * JobConf-based constructor does, because getFs() later reads it
 * when opening the job tracker's file system.
 *
 * @param jobTrackAddr address of the job tracker to connect to
 * @param conf configuration used for the RPC proxy and kept for later use
 * @throws IOException if the proxy cannot be created
 */
public JobClient(InetSocketAddress jobTrackAddr,
                 Configuration conf) throws IOException {
  setConf(conf);
  jobSubmitClient = createProxy(jobTrackAddr, conf);
}
251
252 /**
253 */
254 public synchronized void close() throws IOException {
255 }
256
257 /**
258 * Get a filesystem handle. We need this to prepare jobs
259 * for submission to the MapReduce system.
260 */
261 public synchronized FileSystem getFs() throws IOException {
262 if (this.fs == null) {
263 String fsName = jobSubmitClient.getFilesystemName();
264 this.fs = FileSystem.getNamed(fsName, this.conf);
265 }
266 return fs;
267 }
268
269 /**
270 * Submit a job to the MR system
271 */
272 public RunningJob submitJob(String jobFile) throws FileNotFoundException,
273 InvalidJobConfException, IOException {
274 // Load in the submitted job details
275 JobConf job = new JobConf(jobFile);
276 return submitJob(job);
277 }
278
279
280 /**
281 * Submit a job to the MR system
282 */
283 public RunningJob submitJob(JobConf job) throws FileNotFoundException,
284 InvalidJobConfException, IOException {
285 //
286 // First figure out what fs the JobTracker is using. Copy the
287 // job to it, under a temporary name. This allows DFS to work,
288 // and under the local fs also provides UNIX-like object loading
289 // semantics. (that is, if the job file is deleted right after
290 // submission, we can still run the submission to completion)
291 //
292
293 // Create a number of filenames in the JobTracker's fs namespace
294 String jobId = jobSubmitClient.getNewJobId();
295 Path submitJobDir = new Path(job.getSystemDir(), jobId);
296 FileSystem fs = getFs();
297 LOG.debug("default FileSystem: " + fs.getUri());
298 fs.delete(submitJobDir);
299 Path submitJobFile = new Path(submitJobDir, "job.xml");
300 Path submitJarFile = new Path(submitJobDir, "job.jar");
301 Path submitSplitFile = new Path(submitJobDir, "job.split");
302
303 // set the timestamps of the archives and files
304 URI[] tarchives = DistributedCache.getCacheArchives(job);
305 if (tarchives != null) {
306 StringBuffer archiveTimestamps =
307 new StringBuffer(String.valueOf(DistributedCache.getTimestamp(job, tarchives[0])));
308 for (int i = 1; i < tarchives.length; i++) {
309 archiveTimestamps.append(",");
310 archiveTimestamps.append(String.valueOf(DistributedCache.getTimestamp(job, tarchives[i])));
311 }
312 DistributedCache.setArchiveTimestamps(job, archiveTimestamps.toString());
313 }
314
315 URI[] tfiles = DistributedCache.getCacheFiles(job);
316 if (tfiles != null) {
317 StringBuffer fileTimestamps =
318 new StringBuffer(String.valueOf(DistributedCache.getTimestamp(job, tfiles[0])));
319 for (int i = 1; i < tfiles.length; i++) {
320 fileTimestamps.append(",");
321 fileTimestamps.append(String.valueOf(DistributedCache.getTimestamp(job, tfiles[i])));
322 }
323 DistributedCache.setFileTimestamps(job, fileTimestamps.toString());
324 }
325
326 String originalJarPath = job.getJar();
327 short replication = (short)job.getInt("mapred.submit.replication", 10);
328
329 if (originalJarPath != null) { // copy jar to JobTracker's fs
330 // use jar name if job is not named.
331 if ("".equals(job.getJobName())){
332 job.setJobName(new Path(originalJarPath).getName());
333 }
334 job.setJar(submitJarFile.toString());
335 fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
336 fs.setReplication(submitJarFile, replication);
337 }
338
339 // Set the user's name and working directory
340 String user = System.getProperty("user.name");
341 job.setUser(user != null ? user : "Dr Who");
342 if (job.getWorkingDirectory() == null) {
343 job.setWorkingDirectory(fs.getWorkingDirectory());
344 }
345
346 // Check the input specification
347 job.getInputFormat().validateInput(job);
348
349 // Check the output specification
350 job.getOutputFormat().checkOutputSpecs(fs, job);
351
352 // Create the splits for the job
353 LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
354 InputSplit[] splits =
355 job.getInputFormat().getSplits(job, job.getNumMapTasks());
356 // sort the splits into order based on size, so that the biggest
357 // go first
358 Arrays.sort(splits, new Comparator<InputSplit>() {
359 public int compare(InputSplit a, InputSplit b) {
360 try {
361 long left = a.getLength();
362 long right = b.getLength();
363 if (left == right) {
364 return 0;
365 } else if (left < right) {
366 return 1;
367 } else {
368 return -1;
369 }
370 } catch (IOException ie) {
371 throw new RuntimeException("Problem getting input split size",
372 ie);
373 }
374 }
375 });
376 // write the splits to a file for the job tracker
377 FSDataOutputStream out = fs.create(submitSplitFile);
378 try {
379 writeSplitsFile(splits, out);
380 } finally {
381 out.close();
382 }
383 job.set("mapred.job.split.file", submitSplitFile.toString());
384 job.setNumMapTasks(splits.length);
385
386 // Write job file to JobTracker's fs
387 out = fs.create(submitJobFile, replication);
388 try {
389 job.write(out);
390 } finally {
391 out.close();
392 }
393
394 //
395 // Now, actually submit the job (using the submit name)
396 //
397 JobStatus status = jobSubmitClient.submitJob(jobId);
398 if (status != null) {
399 return new NetworkedJob(status);
400 } else {
401 throw new IOException("Could not launch job");
402 }
403 }
404
405 static class RawSplit implements Writable {
406 private String splitClass;
407 private BytesWritable bytes = new BytesWritable();
408 private String[] locations;
409
410 public void setBytes(byte[] data, int offset, int length) {
411 bytes.set(data, offset, length);
412 }
413
414 public void setClassName(String className) {
415 splitClass = className;
416 }
417
418 public String getClassName() {
419 return splitClass;
420 }
421
422 public BytesWritable getBytes() {
423 return bytes;
424 }
425
426 public void setLocations(String[] locations) {
427 this.locations = locations;
428 }
429
430 public String[] getLocations() {
431 return locations;
432 }
433
434 public void readFields(DataInput in) throws IOException {
435 splitClass = Text.readString(in);
436 bytes.readFields(in);
437 int len = WritableUtils.readVInt(in);
438 locations = new String[len];
439 for(int i=0; i < len; ++i) {
440 locations[i] = Text.readString(in);
441 }
442 }
443
444 public void write(DataOutput out) throws IOException {
445 Text.writeString(out, splitClass);
446 bytes.write(out);
447 WritableUtils.writeVInt(out, locations.length);
448 for(int i = 0; i < locations.length; i++) {
449 Text.writeString(out, locations[i]);
450 }
451 }
452 }
453
454 private static final int CURRENT_SPLIT_FILE_VERSION = 0;
455 private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
456
457 /** Create the list of input splits and write them out in a file for
458 *the JobTracker. The format is:
459 * <format version>
460 * <numSplits>
461 * for each split:
462 * <RawSplit>
463 * @param splits the input splits to write out
464 * @param out the stream to write to
465 */
466 private void writeSplitsFile(InputSplit[] splits, FSDataOutputStream out) throws IOException {
467 out.write(SPLIT_FILE_HEADER);
468 WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
469 WritableUtils.writeVInt(out, splits.length);
470 DataOutputBuffer buffer = new DataOutputBuffer();
471 RawSplit rawSplit = new RawSplit();
472 for(InputSplit split: splits) {
473 rawSplit.setClassName(split.getClass().getName());
474 buffer.reset();
475 split.write(buffer);
476 rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
477 rawSplit.setLocations(split.getLocations());
478 rawSplit.write(out);
479 }
480 }
481
482 /**
483 * Read a splits file into a list of raw splits
484 * @param in the stream to read from
485 * @return the complete list of splits
486 * @throws IOException
487 */
488 static RawSplit[] readSplitFile(DataInput in) throws IOException {
489 byte[] header = new byte[SPLIT_FILE_HEADER.length];
490 in.readFully(header);
491 if (!Arrays.equals(SPLIT_FILE_HEADER, header)) {
492 throw new IOException("Invalid header on split file");
493 }
494 int vers = WritableUtils.readVInt(in);
495 if (vers != CURRENT_SPLIT_FILE_VERSION) {
496 throw new IOException("Unsupported split version " + vers);
497 }
498 int len = WritableUtils.readVInt(in);
499 RawSplit[] result = new RawSplit[len];
500 for(int i=0; i < len; ++i) {
501 result[i] = new RawSplit();
502 result[i].readFields(in);
503 }
504 return result;
505 }
506
507 /**
508 * Get an RunningJob object to track an ongoing job. Returns
509 * null if the id does not correspond to any known job.
510 */
511 public RunningJob getJob(String jobid) throws IOException {
512 JobStatus status = jobSubmitClient.getJobStatus(jobid);
513 if (status != null) {
514 return new NetworkedJob(status);
515 } else {
516 return null;
517 }
518 }
519
520 /**
521 * Get the information of the current state of the map tasks of a job.
522 * @param jobId the job to query
523 * @return the list of all of the map tips
524 */
525 public TaskReport[] getMapTaskReports(String jobId) throws IOException {
526 return jobSubmitClient.getMapTaskReports(jobId);
527 }
528
529 /**
530 * Get the information of the current state of the reduce tasks of a job.
531 * @param jobId the job to query
532 * @return the list of all of the map tips
533 */
534 public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
535 return jobSubmitClient.getReduceTaskReports(jobId);
536 }
537
538 public ClusterStatus getClusterStatus() throws IOException {
539 return jobSubmitClient.getClusterStatus();
540 }
541
542 public JobStatus[] jobsToComplete() throws IOException {
543 return jobSubmitClient.jobsToComplete();
544 }
545
546 /** Utility that submits a job, then polls for progress until the job is
547 * complete. */
548 public static RunningJob runJob(JobConf job) throws IOException {
549 JobClient jc = new JobClient(job);
550 boolean error = true;
551 RunningJob running = null;
552 String lastReport = null;
553 final int MAX_RETRIES = 5;
554 int retries = MAX_RETRIES;
555 TaskStatusFilter filter;
556 try {
557 filter = getTaskOutputFilter(job);
558 } catch(IllegalArgumentException e) {
559 LOG.warn("Invalid Output filter : " + e.getMessage() +
560 " Valid values are : NONE, FAILED, SUCCEEDED, ALL");
561 throw e;
562 }
563 try {
564 running = jc.submitJob(job);
565 String jobId = running.getJobID();
566 LOG.info("Running job: " + jobId);
567 int eventCounter = 0;
568
569 while (true) {
570 try {
571 Thread.sleep(1000);
572 } catch (InterruptedException e) {}
573 try {
574 if (running.isComplete()) {
575 break;
576 }
577 running = jc.getJob(jobId);
578 String report =
579 (" map " + StringUtils.formatPercent(running.mapProgress(), 0)+
580 " reduce " +
581 StringUtils.formatPercent(running.reduceProgress(), 0));
582 if (!report.equals(lastReport)) {
583 LOG.info(report);
584 lastReport = report;
585 }
586
587 if (filter != TaskStatusFilter.NONE){
588 TaskCompletionEvent[] events =
589 running.getTaskCompletionEvents(eventCounter);
590 eventCounter += events.length;
591 for(TaskCompletionEvent event : events){
592 switch(filter){
593 case SUCCEEDED:
594 if (event.getTaskStatus() ==
595 TaskCompletionEvent.Status.SUCCEEDED){
596 LOG.info(event.toString());
597 displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
598 }
599 break;
600 case FAILED:
601 if (event.getTaskStatus() ==
602 TaskCompletionEvent.Status.FAILED){
603 LOG.info(event.toString());
604 displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
605 }
606 break;
607 case KILLED:
608 if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED){
609 LOG.info(event.toString());
610 }
611 break;
612 case ALL:
613 LOG.info(event.toString());
614 displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
615 break;
616 }
617 }
618 }
619 retries = MAX_RETRIES;
620 } catch (IOException ie) {
621 if (--retries == 0) {
622 LOG.warn("Final attempt failed, killing job.");
623 throw ie;
624 }
625 LOG.info("Communication problem with server: " +
626 StringUtils.stringifyException(ie));
627 }
628 }
629 if (!running.isSuccessful()) {
630 throw new IOException("Job failed!");
631 }
632 LOG.info("Job complete: " + jobId);
633 running.getCounters().log(LOG);
634 error = false;
635 } finally {
636 if (error && (running != null)) {
637 running.killJob();
638 }
639 jc.close();
640 }
641 return running;
642 }
643
644 private static void displayTaskLogs(String taskId, String baseUrl)
645 throws IOException {
646 // The tasktracker for a 'failed/killed' job might not be around...
647 if (baseUrl != null) {
648 // Copy tasks's stdout of the JobClient
649 getTaskLogs(taskId, new URL(baseUrl+"&filter=stdout"), System.out);
650
651 // Copy task's stderr to stderr of the JobClient
652 getTaskLogs(taskId, new URL(baseUrl+"&filter=stderr"), System.err);
653 }
654 }
655
656 private static void getTaskLogs(String taskId, URL taskLogUrl,
657 OutputStream out) {
658 try {
659 URLConnection connection = taskLogUrl.openConnection();
660 BufferedReader input =
661 new BufferedReader(new InputStreamReader(connection.getInputStream()));
662 BufferedWriter output =
663 new BufferedWriter(new OutputStreamWriter(out));
664 try {
665 String logData = null;
666 while ((logData = input.readLine()) != null) {
667 if (logData.length() > 0) {
668 output.write(taskId + ": " + logData + "\n");
669 output.flush();
670 }
671 }
672 } finally {
673 input.close();
674 }
675 }catch(IOException ioe){
676 LOG.warn("Error reading task output" + ioe.getMessage());
677 }
678 }
679
680 static Configuration getConfiguration(String jobTrackerSpec)
681 {
682 Configuration conf = new Configuration();
683 if (jobTrackerSpec != null) {
684 if (jobTrackerSpec.indexOf(":") >= 0) {
685 conf.set("mapred.job.tracker", jobTrackerSpec);
686 } else {
687 String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
688 URL validate = conf.getResource(classpathFile);
689 if (validate == null) {
690 throw new RuntimeException(classpathFile + " not found on CLASSPATH");
691 }
692 conf.addFinalResource(classpathFile);
693 }
694 }
695 return conf;
696 }
697
698 /**
699 * Sets the output filter for tasks. only those tasks are printed whose
700 * output matches the filter.
701 * @param newValue task filter.
702 */
703 @Deprecated
704 public void setTaskOutputFilter(TaskStatusFilter newValue){
705 this.taskOutputFilter = newValue;
706 }
707
708 /**
709 * Get the task output filter out of the JobConf
710 * @param job the JobConf to examine
711 * @return the filter level
712 */
713 public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
714 return TaskStatusFilter.valueOf(job.get("jobclient.output.filter",
715 "FAILED"));
716 }
717
718 /**
719 * Modify the JobConf to set the task output filter
720 * @param job the JobConf to modify
721 * @param newValue the value to set
722 */
723 public static void setTaskOutputFilter(JobConf job,
724 TaskStatusFilter newValue) {
725 job.set("jobclient.output.filter", newValue.toString());
726 }
727
728 /**
729 * Returns task output filter.
730 * @return task filter.
731 */
732 @Deprecated
733 public TaskStatusFilter getTaskOutputFilter(){
734 return this.taskOutputFilter;
735 }
736
737 public int run(String[] argv) throws Exception {
738 if (argv.length < 2) {
739 String cmd = "JobClient -submit <job> | -status <id> |" +
740 " -events <id> |" +
741 " -kill <id> [-jt <jobtracker:port>|<config>]";
742 System.out.println(cmd);
743 throw new RuntimeException("JobClient:" + cmd);
744 }
745
746 // Process args
747 String submitJobFile = null;
748 String jobid = null;
749 boolean getStatus = false;
750 boolean killJob = false;
751
752 for (int i = 0; i < argv.length; i++) {
753 if ("-submit".equals(argv[i])) {
754 submitJobFile = argv[i+1];
755 i++;
756 } else if ("-status".equals(argv[i])) {
757 jobid = argv[i+1];
758 getStatus = true;
759 i++;
760 } else if ("-kill".equals(argv[i])) {
761 jobid = argv[i+1];
762 killJob = true;
763 i++;
764 } else if ("-events".equals(argv[i])) {
765 listEvents(argv[i+1], Integer.parseInt(argv[i+2]),
766 Integer.parseInt(argv[i+3]));
767 i += 3;
768 }
769 }
770
771 // initialize JobClient
772 JobConf conf = null;
773 if (submitJobFile != null) {
774 conf = new JobConf(submitJobFile);
775 } else {
776 conf = new JobConf(getConf());
777 }
778 init(conf);
779
780 // Submit the request
781 int exitCode = -1;
782 try {
783 if (submitJobFile != null) {
784 RunningJob job = submitJob(conf);
785 System.out.println("Created job " + job.getJobID());
786 } else if (getStatus) {
787 RunningJob job = getJob(jobid);
788 if (job == null) {
789 System.out.println("Could not find job " + jobid);
790 } else {
791 System.out.println();
792 System.out.println(job);
793 exitCode = 0;
794 }
795 } else if (killJob) {
796 RunningJob job = getJob(jobid);
797 if (job == null) {
798 System.out.println("Could not find job " + jobid);
799 } else {
800 job.killJob();
801 System.out.println("Killed job " + jobid);
802 exitCode = 0;
803 }
804 }
805 } finally {
806 close();
807 }
808 return exitCode;
809 }
810
811 /**
812 * List the events for the given job
813 * @param jobId the job id for the job's events to list
814 * @throws IOException
815 */
816 private void listEvents(String jobId, int fromEventId, int numEvents)
817 throws IOException {
818 TaskCompletionEvent[] events =
819 jobSubmitClient.getTaskCompletionEvents(jobId, fromEventId, numEvents);
820 System.out.println("Task completion events for " + jobId);
821 System.out.println("Number of events (from " + fromEventId +
822 ") are: " + events.length);
823 for(TaskCompletionEvent event: events) {
824 System.out.println(event.getTaskStatus() + " " + event.getTaskId() +
825 " " + event.getTaskTrackerHttp());
826 }
827 }
828
829 /**
830 */
831 public static void main(String argv[]) throws Exception {
832 int res = new JobClient().doMain(new Configuration(), argv);
833 System.exit(res);
834 }
835 }
836