Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » mapred » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one
    3    * or more contributor license agreements.  See the NOTICE file
    4    * distributed with this work for additional information
    5    * regarding copyright ownership.  The ASF licenses this file
    6    * to you under the Apache License, Version 2.0 (the
    7    * "License"); you may not use this file except in compliance
    8    * with the License.  You may obtain a copy of the License at
    9    *
   10    *     http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    * Unless required by applicable law or agreed to in writing, software
   13    * distributed under the License is distributed on an "AS IS" BASIS,
   14    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   15    * See the License for the specific language governing permissions and
   16    * limitations under the License.
   17    */
   18   package org.apache.hadoop.mapred;
   19   
   20   import org.apache.commons.logging;
   21   
   22   import org.apache.hadoop.fs;
   23   import org.apache.hadoop.io;
   24   import org.apache.hadoop.io.retry;
   25   import org.apache.hadoop.ipc;
   26   import org.apache.hadoop.conf;
   27   import org.apache.hadoop.util;
   28   import org.apache.hadoop.filecache;
   29   import java.io;
   30   import java.net;
   31   import java.util;
   32   
   33   /*******************************************************
   34    * JobClient interacts with the JobTracker network interface.
   35    * This object implements the job-control interface, and
   36    * should be the primary method by which user programs interact
   37    * with the networked job system.
   38    *
   39    *******************************************************/
   40   public class JobClient extends ToolBase implements MRConstants  {
   41     private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient");
   42     public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
   43     private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
   44   
   45     static long MAX_JOBPROFILE_AGE = 1000 * 2;
   46   
   47     /**
   48      * A NetworkedJob is an implementation of RunningJob.  It holds
   49      * a JobProfile object to provide some info, and interacts with the
   50      * remote service to provide certain functionality.
   51      */
   52     class NetworkedJob implements RunningJob {
   53       JobProfile profile;
   54       JobStatus status;
   55       long statustime;
   56   
   57       /**
   58        * We store a JobProfile and a timestamp for when we last
   59        * acquired the job profile.  If the job is null, then we cannot
   60        * perform any of the tasks.  The job might be null if the JobTracker
   61        * has completely forgotten about the job.  (eg, 24 hours after the
   62        * job completes.)
   63        */
   64       public NetworkedJob(JobStatus job) throws IOException {
   65         this.status = job;
   66         this.profile = jobSubmitClient.getJobProfile(job.getJobId());
   67         this.statustime = System.currentTimeMillis();
   68       }
   69   
   70       /**
   71        * Some methods rely on having a recent job profile object.  Refresh
   72        * it, if necessary
   73        */
   74       synchronized void ensureFreshStatus() throws IOException {
   75         if (System.currentTimeMillis() - statustime > MAX_JOBPROFILE_AGE) {
   76           this.status = jobSubmitClient.getJobStatus(profile.getJobId());
   77           this.statustime = System.currentTimeMillis();
   78         }
   79       }
   80   
   81       /**
   82        * An identifier for the job
   83        */
   84       public String getJobID() {
   85         return profile.getJobId();
   86       }
   87       
   88       /**
   89        * The user-specified job name
   90        */
   91       public String getJobName() {
   92         return profile.getJobName();
   93       }
   94   
   95       /**
   96        * The name of the job file
   97        */
   98       public String getJobFile() {
   99         return profile.getJobFile();
  100       }
  101   
  102       /**
  103        * A URL where the job's status can be seen
  104        */
  105       public String getTrackingURL() {
  106         return profile.getURL().toString();
  107       }
  108   
  109       /**
  110        * A float between 0.0 and 1.0, indicating the % of map work
  111        * completed.
  112        */
  113       public float mapProgress() throws IOException {
  114         ensureFreshStatus();
  115         return status.mapProgress();
  116       }
  117   
  118       /**
  119        * A float between 0.0 and 1.0, indicating the % of reduce work
  120        * completed.
  121        */
  122       public float reduceProgress() throws IOException {
  123         ensureFreshStatus();
  124         return status.reduceProgress();
  125       }
  126   
  127       /**
  128        * Returns immediately whether the whole job is done yet or not.
  129        */
  130       public synchronized boolean isComplete() throws IOException {
  131         ensureFreshStatus();
  132         return (status.getRunState() == JobStatus.SUCCEEDED ||
  133                 status.getRunState() == JobStatus.FAILED);
  134       }
  135   
  136       /**
  137        * True iff job completed successfully.
  138        */
  139       public synchronized boolean isSuccessful() throws IOException {
  140         ensureFreshStatus();
  141         return status.getRunState() == JobStatus.SUCCEEDED;
  142       }
  143   
  144       /**
  145        * Blocks until the job is finished
  146        */
  147       public void waitForCompletion() throws IOException {
  148         while (!isComplete()) {
  149           try {
  150             Thread.sleep(5000);
  151           } catch (InterruptedException ie) {
  152           }
  153         }
  154       }
  155   
  156       /**
  157        * Tells the service to terminate the current job.
  158        */
  159       public synchronized void killJob() throws IOException {
  160         jobSubmitClient.killJob(getJobID());
  161       }
  162       /**
  163        * Fetch task completion events from jobtracker for this job. 
  164        */
  165       public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
  166                                                                         int startFrom) throws IOException{
  167         return jobSubmitClient.getTaskCompletionEvents(
  168                                                        getJobID(), startFrom, 10); 
  169       }
  170   
  171       /**
  172        * Dump stats to screen
  173        */
  174       public String toString() {
  175         try {
  176           ensureFreshStatus();
  177         } catch (IOException e) {
  178         }
  179         return "Job: " + profile.getJobId() + "\n" + 
  180           "file: " + profile.getJobFile() + "\n" + 
  181           "tracking URL: " + profile.getURL() + "\n" + 
  182           "map() completion: " + status.mapProgress() + "\n" + 
  183           "reduce() completion: " + status.reduceProgress();
  184       }
  185           
  186       /**
  187        * Returns the counters for this job
  188        */
  189       public Counters getCounters() throws IOException {
  190         return jobSubmitClient.getJobCounters(getJobID());
  191       }
  192     }
  193   
  194     JobSubmissionProtocol jobSubmitClient;
  195     FileSystem fs = null;
  196   
  197     static Random r = new Random();
  198   
  199     /**
  200      * Build a job client, connect to the default job tracker
  201      */
  202     public JobClient() {
  203     }
  204       
  /**
   * Build a job client that is configured from, and connected according
   * to, the given job configuration.
   *
   * @param conf the configuration to store and to pass to {@link #init(JobConf)}
   * @throws IOException if {@link #init(JobConf)} fails
   */
  public JobClient(JobConf conf) throws IOException {
    this.setConf(conf);
    this.init(conf);
  }
  209       
  210     public void init(JobConf conf) throws IOException {
  211       String tracker = conf.get("mapred.job.tracker", "local");
  212       if ("local".equals(tracker)) {
  213         this.jobSubmitClient = new LocalJobRunner(conf);
  214       } else {
  215         this.jobSubmitClient = createProxy(JobTracker.getAddress(conf), conf);
  216       }        
  217     }
  218   
  219     /**
  220      * Create a proxy JobSubmissionProtocol that retries timeouts.
  221      * @param addr the address to connect to
  222      * @param conf the server's configuration
  223      * @return a proxy object that will retry timeouts
  224      * @throws IOException
  225      */
  226     private JobSubmissionProtocol createProxy(InetSocketAddress addr,
  227                                               Configuration conf
  228                                               ) throws IOException {
  229       JobSubmissionProtocol raw = (JobSubmissionProtocol) 
  230         RPC.getProxy(JobSubmissionProtocol.class,
  231                      JobSubmissionProtocol.versionID, addr, conf);
  232       RetryPolicy backoffPolicy =
  233         RetryPolicies.retryUpToMaximumCountWithProportionalSleep
  234         (5, 10, java.util.concurrent.TimeUnit.SECONDS);
  235       Map<Class<? extends Exception>, RetryPolicy> handlers = 
  236         new HashMap<Class<? extends Exception>, RetryPolicy>();
  237       handlers.put(SocketTimeoutException.class, backoffPolicy);
  238       RetryPolicy backoffTimeOuts = 
  239         RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,handlers);
  240       return (JobSubmissionProtocol)
  241         RetryProxy.create(JobSubmissionProtocol.class, raw, backoffTimeOuts);
  242     }
  243   
  /**
   * Build a job client, connect to the indicated job tracker.
   * The configuration is also stored on this client, as the
   * {@link #JobClient(JobConf)} constructor does, so that
   * {@link #getFs()} has a configuration to resolve the file system with.
   *
   * @param jobTrackAddr the address of the job tracker to connect to
   * @param conf the configuration used for the connection
   * @throws IOException if the proxy cannot be created
   */
  public JobClient(InetSocketAddress jobTrackAddr, 
                   Configuration conf) throws IOException {
    setConf(conf);
    jobSubmitClient = createProxy(jobTrackAddr, conf);
  }
  251   
  252     /**
  253      */
  254     public synchronized void close() throws IOException {
  255     }
  256   
  257     /**
  258      * Get a filesystem handle.  We need this to prepare jobs
  259      * for submission to the MapReduce system.
  260      */
  261     public synchronized FileSystem getFs() throws IOException {
  262       if (this.fs == null) {
  263         String fsName = jobSubmitClient.getFilesystemName();
  264         this.fs = FileSystem.getNamed(fsName, this.conf);
  265       }
  266       return fs;
  267     }
  268   
  269     /**
  270      * Submit a job to the MR system
  271      */
  272     public RunningJob submitJob(String jobFile) throws FileNotFoundException, 
  273                                                        InvalidJobConfException, IOException {
  274       // Load in the submitted job details
  275       JobConf job = new JobConf(jobFile);
  276       return submitJob(job);
  277     }
  278       
  279      
  280     /**
  281      * Submit a job to the MR system
  282      */
  283     public RunningJob submitJob(JobConf job) throws FileNotFoundException, 
  284                                                     InvalidJobConfException, IOException {
  285       //
  286       // First figure out what fs the JobTracker is using.  Copy the
  287       // job to it, under a temporary name.  This allows DFS to work,
  288       // and under the local fs also provides UNIX-like object loading 
  289       // semantics.  (that is, if the job file is deleted right after
  290       // submission, we can still run the submission to completion)
  291       //
  292   
  293       // Create a number of filenames in the JobTracker's fs namespace
  294       String jobId = jobSubmitClient.getNewJobId();
  295       Path submitJobDir = new Path(job.getSystemDir(), jobId);
  296       FileSystem fs = getFs();
  297       LOG.debug("default FileSystem: " + fs.getUri());
  298       fs.delete(submitJobDir);    
  299       Path submitJobFile = new Path(submitJobDir, "job.xml");
  300       Path submitJarFile = new Path(submitJobDir, "job.jar");
  301       Path submitSplitFile = new Path(submitJobDir, "job.split");
  302           
  303       // set the timestamps of the archives and files
  304       URI[] tarchives = DistributedCache.getCacheArchives(job);
  305       if (tarchives != null) {
  306         StringBuffer archiveTimestamps = 
  307           new StringBuffer(String.valueOf(DistributedCache.getTimestamp(job, tarchives[0])));
  308         for (int i = 1; i < tarchives.length; i++) {
  309           archiveTimestamps.append(",");
  310           archiveTimestamps.append(String.valueOf(DistributedCache.getTimestamp(job, tarchives[i])));
  311         }
  312         DistributedCache.setArchiveTimestamps(job, archiveTimestamps.toString());
  313       }
  314   
  315       URI[] tfiles = DistributedCache.getCacheFiles(job);
  316       if (tfiles != null) {
  317         StringBuffer fileTimestamps = 
  318           new StringBuffer(String.valueOf(DistributedCache.getTimestamp(job, tfiles[0])));
  319         for (int i = 1; i < tfiles.length; i++) {
  320           fileTimestamps.append(",");
  321           fileTimestamps.append(String.valueOf(DistributedCache.getTimestamp(job, tfiles[i])));
  322         }
  323         DistributedCache.setFileTimestamps(job, fileTimestamps.toString());
  324       }
  325          
  326       String originalJarPath = job.getJar();
  327       short replication = (short)job.getInt("mapred.submit.replication", 10);
  328   
  329       if (originalJarPath != null) {           // copy jar to JobTracker's fs
  330         // use jar name if job is not named. 
  331         if ("".equals(job.getJobName())){
  332           job.setJobName(new Path(originalJarPath).getName());
  333         }
  334         job.setJar(submitJarFile.toString());
  335         fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
  336         fs.setReplication(submitJarFile, replication);
  337       }
  338   
  339       // Set the user's name and working directory
  340       String user = System.getProperty("user.name");
  341       job.setUser(user != null ? user : "Dr Who");
  342       if (job.getWorkingDirectory() == null) {
  343         job.setWorkingDirectory(fs.getWorkingDirectory());          
  344       }
  345   
  346       // Check the input specification 
  347       job.getInputFormat().validateInput(job);
  348   
  349       // Check the output specification
  350       job.getOutputFormat().checkOutputSpecs(fs, job);
  351   
  352       // Create the splits for the job
  353       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
  354       InputSplit[] splits = 
  355         job.getInputFormat().getSplits(job, job.getNumMapTasks());
  356       // sort the splits into order based on size, so that the biggest
  357       // go first
  358       Arrays.sort(splits, new Comparator<InputSplit>() {
  359         public int compare(InputSplit a, InputSplit b) {
  360           try {
  361             long left = a.getLength();
  362             long right = b.getLength();
  363             if (left == right) {
  364               return 0;
  365             } else if (left < right) {
  366               return 1;
  367             } else {
  368               return -1;
  369             }
  370           } catch (IOException ie) {
  371             throw new RuntimeException("Problem getting input split size",
  372                                        ie);
  373           }
  374         }
  375       });
  376       // write the splits to a file for the job tracker
  377       FSDataOutputStream out = fs.create(submitSplitFile);
  378       try {
  379         writeSplitsFile(splits, out);
  380       } finally {
  381         out.close();
  382       }
  383       job.set("mapred.job.split.file", submitSplitFile.toString());
  384       job.setNumMapTasks(splits.length);
  385           
  386       // Write job file to JobTracker's fs        
  387       out = fs.create(submitJobFile, replication);
  388       try {
  389         job.write(out);
  390       } finally {
  391         out.close();
  392       }
  393   
  394       //
  395       // Now, actually submit the job (using the submit name)
  396       //
  397       JobStatus status = jobSubmitClient.submitJob(jobId);
  398       if (status != null) {
  399         return new NetworkedJob(status);
  400       } else {
  401         throw new IOException("Could not launch job");
  402       }
  403     }
  404   
  405     static class RawSplit implements Writable {
  406       private String splitClass;
  407       private BytesWritable bytes = new BytesWritable();
  408       private String[] locations;
  409         
  410       public void setBytes(byte[] data, int offset, int length) {
  411         bytes.set(data, offset, length);
  412       }
  413   
  414       public void setClassName(String className) {
  415         splitClass = className;
  416       }
  417         
  418       public String getClassName() {
  419         return splitClass;
  420       }
  421         
  422       public BytesWritable getBytes() {
  423         return bytes;
  424       }
  425         
  426       public void setLocations(String[] locations) {
  427         this.locations = locations;
  428       }
  429         
  430       public String[] getLocations() {
  431         return locations;
  432       }
  433         
  434       public void readFields(DataInput in) throws IOException {
  435         splitClass = Text.readString(in);
  436         bytes.readFields(in);
  437         int len = WritableUtils.readVInt(in);
  438         locations = new String[len];
  439         for(int i=0; i < len; ++i) {
  440           locations[i] = Text.readString(in);
  441         }
  442       }
  443         
  444       public void write(DataOutput out) throws IOException {
  445         Text.writeString(out, splitClass);
  446         bytes.write(out);
  447         WritableUtils.writeVInt(out, locations.length);
  448         for(int i = 0; i < locations.length; i++) {
  449           Text.writeString(out, locations[i]);
  450         }        
  451       }
  452     }
  453       
  454     private static final int CURRENT_SPLIT_FILE_VERSION = 0;
  455     private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
  456       
  457     /** Create the list of input splits and write them out in a file for
  458      *the JobTracker. The format is:
  459      * <format version>
  460      * <numSplits>
  461      * for each split:
  462      *    <RawSplit>
  463      * @param splits the input splits to write out
  464      * @param out the stream to write to
  465      */
  466     private void writeSplitsFile(InputSplit[] splits, FSDataOutputStream out) throws IOException {
  467       out.write(SPLIT_FILE_HEADER);
  468       WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
  469       WritableUtils.writeVInt(out, splits.length);
  470       DataOutputBuffer buffer = new DataOutputBuffer();
  471       RawSplit rawSplit = new RawSplit();
  472       for(InputSplit split: splits) {
  473         rawSplit.setClassName(split.getClass().getName());
  474         buffer.reset();
  475         split.write(buffer);
  476         rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
  477         rawSplit.setLocations(split.getLocations());
  478         rawSplit.write(out);
  479       }
  480     }
  481   
  482     /**
  483      * Read a splits file into a list of raw splits
  484      * @param in the stream to read from
  485      * @return the complete list of splits
  486      * @throws IOException
  487      */
  488     static RawSplit[] readSplitFile(DataInput in) throws IOException {
  489       byte[] header = new byte[SPLIT_FILE_HEADER.length];
  490       in.readFully(header);
  491       if (!Arrays.equals(SPLIT_FILE_HEADER, header)) {
  492         throw new IOException("Invalid header on split file");
  493       }
  494       int vers = WritableUtils.readVInt(in);
  495       if (vers != CURRENT_SPLIT_FILE_VERSION) {
  496         throw new IOException("Unsupported split version " + vers);
  497       }
  498       int len = WritableUtils.readVInt(in);
  499       RawSplit[] result = new RawSplit[len];
  500       for(int i=0; i < len; ++i) {
  501         result[i] = new RawSplit();
  502         result[i].readFields(in);
  503       }
  504       return result;
  505     }
  506       
  507     /**
  508      * Get an RunningJob object to track an ongoing job.  Returns
  509      * null if the id does not correspond to any known job.
  510      */
  511     public RunningJob getJob(String jobid) throws IOException {
  512       JobStatus status = jobSubmitClient.getJobStatus(jobid);
  513       if (status != null) {
  514         return new NetworkedJob(status);
  515       } else {
  516         return null;
  517       }
  518     }
  519   
  520     /**
  521      * Get the information of the current state of the map tasks of a job.
  522      * @param jobId the job to query
  523      * @return the list of all of the map tips
  524      */
  525     public TaskReport[] getMapTaskReports(String jobId) throws IOException {
  526       return jobSubmitClient.getMapTaskReports(jobId);
  527     }
  528       
  529     /**
  530      * Get the information of the current state of the reduce tasks of a job.
  531      * @param jobId the job to query
  532      * @return the list of all of the map tips
  533      */    
  534     public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
  535       return jobSubmitClient.getReduceTaskReports(jobId);
  536     }
  537       
  538     public ClusterStatus getClusterStatus() throws IOException {
  539       return jobSubmitClient.getClusterStatus();
  540     }
  541       
  542     public JobStatus[] jobsToComplete() throws IOException {
  543       return jobSubmitClient.jobsToComplete();
  544     }
  545       
  546     /** Utility that submits a job, then polls for progress until the job is
  547      * complete. */
  548     public static RunningJob runJob(JobConf job) throws IOException {
  549       JobClient jc = new JobClient(job);
  550       boolean error = true;
  551       RunningJob running = null;
  552       String lastReport = null;
  553       final int MAX_RETRIES = 5;
  554       int retries = MAX_RETRIES;
  555       TaskStatusFilter filter;
  556       try {
  557         filter = getTaskOutputFilter(job);
  558       } catch(IllegalArgumentException e) {
  559         LOG.warn("Invalid Output filter : " + e.getMessage() + 
  560                  " Valid values are : NONE, FAILED, SUCCEEDED, ALL");
  561         throw e;
  562       }
  563       try {
  564         running = jc.submitJob(job);
  565         String jobId = running.getJobID();
  566         LOG.info("Running job: " + jobId);
  567         int eventCounter = 0; 
  568           
  569         while (true) {
  570           try {
  571             Thread.sleep(1000);
  572           } catch (InterruptedException e) {}
  573           try {
  574             if (running.isComplete()) {
  575               break;
  576             }
  577             running = jc.getJob(jobId);
  578             String report = 
  579               (" map " + StringUtils.formatPercent(running.mapProgress(), 0)+
  580                " reduce " + 
  581                StringUtils.formatPercent(running.reduceProgress(), 0));
  582             if (!report.equals(lastReport)) {
  583               LOG.info(report);
  584               lastReport = report;
  585             }
  586               
  587             if (filter  != TaskStatusFilter.NONE){
  588               TaskCompletionEvent[] events = 
  589                 running.getTaskCompletionEvents(eventCounter); 
  590               eventCounter += events.length;
  591               for(TaskCompletionEvent event : events){
  592                 switch(filter){
  593                 case SUCCEEDED:
  594                   if (event.getTaskStatus() == 
  595                       TaskCompletionEvent.Status.SUCCEEDED){
  596                     LOG.info(event.toString());
  597                     displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
  598                   }
  599                   break; 
  600                 case FAILED:
  601                   if (event.getTaskStatus() == 
  602                       TaskCompletionEvent.Status.FAILED){
  603                     LOG.info(event.toString());
  604                     displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
  605                   }
  606                   break; 
  607                 case KILLED:
  608                   if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED){
  609                     LOG.info(event.toString());
  610                   }
  611                   break; 
  612                 case ALL:
  613                   LOG.info(event.toString());
  614                   displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
  615                   break;
  616                 }
  617               }
  618             }
  619             retries = MAX_RETRIES;
  620           } catch (IOException ie) {
  621             if (--retries == 0) {
  622               LOG.warn("Final attempt failed, killing job.");
  623               throw ie;
  624             }
  625             LOG.info("Communication problem with server: " +
  626                      StringUtils.stringifyException(ie));
  627           }
  628         }
  629         if (!running.isSuccessful()) {
  630           throw new IOException("Job failed!");
  631         }
  632         LOG.info("Job complete: " + jobId);
  633         running.getCounters().log(LOG);
  634         error = false;
  635       } finally {
  636         if (error && (running != null)) {
  637           running.killJob();
  638         }
  639         jc.close();
  640       }
  641       return running;
  642     }
  643   
  644     private static void displayTaskLogs(String taskId, String baseUrl)
  645       throws IOException {
  646       // The tasktracker for a 'failed/killed' job might not be around...
  647       if (baseUrl != null) {
  648         // Copy tasks's stdout of the JobClient
  649         getTaskLogs(taskId, new URL(baseUrl+"&filter=stdout"), System.out);
  650           
  651         // Copy task's stderr to stderr of the JobClient 
  652         getTaskLogs(taskId, new URL(baseUrl+"&filter=stderr"), System.err);
  653       }
  654     }
  655       
  656     private static void getTaskLogs(String taskId, URL taskLogUrl, 
  657                                     OutputStream out) {
  658       try {
  659         URLConnection connection = taskLogUrl.openConnection();
  660         BufferedReader input = 
  661           new BufferedReader(new InputStreamReader(connection.getInputStream()));
  662         BufferedWriter output = 
  663           new BufferedWriter(new OutputStreamWriter(out));
  664         try {
  665           String logData = null;
  666           while ((logData = input.readLine()) != null) {
  667             if (logData.length() > 0) {
  668               output.write(taskId + ": " + logData + "\n");
  669               output.flush();
  670             }
  671           }
  672         } finally {
  673           input.close();
  674         }
  675       }catch(IOException ioe){
  676         LOG.warn("Error reading task output" + ioe.getMessage()); 
  677       }
  678     }    
  679   
  680     static Configuration getConfiguration(String jobTrackerSpec)
  681     {
  682       Configuration conf = new Configuration();
  683       if (jobTrackerSpec != null) {        
  684         if (jobTrackerSpec.indexOf(":") >= 0) {
  685           conf.set("mapred.job.tracker", jobTrackerSpec);
  686         } else {
  687           String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
  688           URL validate = conf.getResource(classpathFile);
  689           if (validate == null) {
  690             throw new RuntimeException(classpathFile + " not found on CLASSPATH");
  691           }
  692           conf.addFinalResource(classpathFile);
  693         }
  694       }
  695       return conf;
  696     }
  697   
  698     /**
  699      * Sets the output filter for tasks. only those tasks are printed whose
  700      * output matches the filter. 
  701      * @param newValue task filter.
  702      */
  703     @Deprecated
  704     public void setTaskOutputFilter(TaskStatusFilter newValue){
  705       this.taskOutputFilter = newValue;
  706     }
  707       
  708     /**
  709      * Get the task output filter out of the JobConf
  710      * @param job the JobConf to examine
  711      * @return the filter level
  712      */
  713     public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
  714       return TaskStatusFilter.valueOf(job.get("jobclient.output.filter", 
  715                                               "FAILED"));
  716     }
  717       
  718     /**
  719      * Modify the JobConf to set the task output filter
  720      * @param job the JobConf to modify
  721      * @param newValue the value to set
  722      */
  723     public static void setTaskOutputFilter(JobConf job, 
  724                                            TaskStatusFilter newValue) {
  725       job.set("jobclient.output.filter", newValue.toString());
  726     }
  727       
  728     /**
  729      * Returns task output filter.
  730      * @return task filter. 
  731      */
  732     @Deprecated
  733     public TaskStatusFilter getTaskOutputFilter(){
  734       return this.taskOutputFilter; 
  735     }
  736       
  737     public int run(String[] argv) throws Exception {
  738       if (argv.length < 2) {
  739         String cmd = "JobClient -submit <job> | -status <id> |" + 
  740                      " -events <id> |" +
  741                      " -kill <id> [-jt <jobtracker:port>|<config>]";
  742         System.out.println(cmd);
  743         throw new RuntimeException("JobClient:" + cmd);
  744       }
  745   
  746       // Process args
  747       String submitJobFile = null;
  748       String jobid = null;
  749       boolean getStatus = false;
  750       boolean killJob = false;
  751   
  752       for (int i = 0; i < argv.length; i++) {
  753         if ("-submit".equals(argv[i])) {
  754           submitJobFile = argv[i+1];
  755           i++;
  756         } else if ("-status".equals(argv[i])) {
  757           jobid = argv[i+1];
  758           getStatus = true;
  759           i++;
  760         } else if ("-kill".equals(argv[i])) {
  761           jobid = argv[i+1];
  762           killJob = true;
  763           i++;
  764         } else if ("-events".equals(argv[i])) {
  765           listEvents(argv[i+1], Integer.parseInt(argv[i+2]), 
  766                      Integer.parseInt(argv[i+3]));
  767           i += 3;
  768         }
  769       }
  770   
  771       // initialize JobClient
  772       JobConf conf = null;
  773       if (submitJobFile != null) {
  774         conf = new JobConf(submitJobFile);
  775       } else {
  776         conf = new JobConf(getConf());
  777       }
  778       init(conf);
  779           
  780       // Submit the request
  781       int exitCode = -1;
  782       try {
  783         if (submitJobFile != null) {
  784           RunningJob job = submitJob(conf);
  785           System.out.println("Created job " + job.getJobID());
  786         } else if (getStatus) {
  787           RunningJob job = getJob(jobid);
  788           if (job == null) {
  789             System.out.println("Could not find job " + jobid);
  790           } else {
  791             System.out.println();
  792             System.out.println(job);
  793             exitCode = 0;
  794           }
  795         } else if (killJob) {
  796           RunningJob job = getJob(jobid);
  797           if (job == null) {
  798             System.out.println("Could not find job " + jobid);
  799           } else {
  800             job.killJob();
  801             System.out.println("Killed job " + jobid);
  802             exitCode = 0;
  803           }
  804         }
  805       } finally {
  806         close();
  807       }
  808       return exitCode;
  809     }
  810       
  811     /**
  812      * List the events for the given job
  813      * @param jobId the job id for the job's events to list
  814      * @throws IOException
  815      */
  816     private void listEvents(String jobId, int fromEventId, int numEvents)
  817       throws IOException {
  818       TaskCompletionEvent[] events = 
  819         jobSubmitClient.getTaskCompletionEvents(jobId, fromEventId, numEvents);
  820       System.out.println("Task completion events for " + jobId);
  821       System.out.println("Number of events (from " + fromEventId + 
  822                          ") are: " + events.length);
  823       for(TaskCompletionEvent event: events) {
  824         System.out.println(event.getTaskStatus() + " " + event.getTaskId() + 
  825                            " " + event.getTaskTrackerHttp());
  826       }
  827     }
  828       
  829     /**
  830      */
  831     public static void main(String argv[]) throws Exception {
  832       int res = new JobClient().doMain(new Configuration(), argv);
  833       System.exit(res);
  834     }
  835   }
  836   

Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » mapred » [javadoc | source]