Save This Page
Home » nutch-1.0 » org.apache.nutch » crawl » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *     http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   
   18   package org.apache.nutch.crawl;
   19   
   20   import java.io;
   21   import java.util;
   22   
   23   // Commons Logging imports
   24   import org.apache.commons.logging.Log;
   25   import org.apache.commons.logging.LogFactory;
   26   
   27   import org.apache.hadoop.io;
   28   import org.apache.hadoop.fs;
   29   import org.apache.hadoop.conf;
   30   import org.apache.hadoop.mapred;
   31   import org.apache.hadoop.util;
   32   
   33   import org.apache.nutch.net;
   34   import org.apache.nutch.scoring.ScoringFilterException;
   35   import org.apache.nutch.scoring.ScoringFilters;
   36   import org.apache.nutch.util.NutchConfiguration;
   37   import org.apache.nutch.util.NutchJob;
   38   
   39   /** This class takes a flat file of URLs and adds them to the of pages to be
   40    * crawled.  Useful for bootstrapping the system. */
   41   public class Injector extends Configured implements Tool {
   42     public static final Log LOG = LogFactory.getLog(Injector.class);
   43   
   44   
   45     /** Normalize and filter injected urls. */
   46     public static class InjectMapper implements Mapper<WritableComparable, Text, Text, CrawlDatum> {
   47       private URLNormalizers urlNormalizers;
   48       private int interval;
   49       private float scoreInjected;
   50       private JobConf jobConf;
   51       private URLFilters filters;
   52       private ScoringFilters scfilters;
   53       private long curTime;
   54   
   55       public void configure(JobConf job) {
   56         this.jobConf = job;
   57         urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_INJECT);
   58         interval = jobConf.getInt("db.fetch.interval.default", 2592000);
   59         filters = new URLFilters(jobConf);
   60         scfilters = new ScoringFilters(jobConf);
   61         scoreInjected = jobConf.getFloat("db.score.injected", 1.0f);
   62         curTime = job.getLong("injector.current.time", System.currentTimeMillis());
   63       }
   64   
   65       public void close() {}
   66   
   67       public void map(WritableComparable key, Text value,
   68                       OutputCollector<Text, CrawlDatum> output, Reporter reporter)
   69         throws IOException {
   70         String url = value.toString();              // value is line of text
   71         try {
   72           url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
   73           url = filters.filter(url);             // filter the url
   74         } catch (Exception e) {
   75           if (LOG.isWarnEnabled()) { LOG.warn("Skipping " +url+":"+e); }
   76           url = null;
   77         }
   78         if (url != null) {                          // if it passes
   79           value.set(url);                           // collect it
   80           CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, interval);
   81           datum.setFetchTime(curTime);
   82           datum.setScore(scoreInjected);
   83           try {
   84             scfilters.injectedScore(value, datum);
   85           } catch (ScoringFilterException e) {
   86             if (LOG.isWarnEnabled()) {
   87               LOG.warn("Cannot filter injected score for url " + url +
   88                        ", using default (" + e.getMessage() + ")");
   89             }
   90             datum.setScore(scoreInjected);
   91           }
   92           output.collect(value, datum);
   93         }
   94       }
   95     }
   96   
   97     /** Combine multiple new entries for a url. */
   98     public static class InjectReducer implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
   99       public void configure(JobConf job) {}    
  100       public void close() {}
  101   
  102       private CrawlDatum old = new CrawlDatum();
  103       private CrawlDatum injected = new CrawlDatum();
  104       
  105       public void reduce(Text key, Iterator<CrawlDatum> values,
  106                          OutputCollector<Text, CrawlDatum> output, Reporter reporter)
  107         throws IOException {
  108         boolean oldSet = false;
  109         while (values.hasNext()) {
  110           CrawlDatum val = values.next();
  111           if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
  112             injected.set(val);
  113             injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
  114           } else {
  115             old.set(val);
  116             oldSet = true;
  117           }
  118         }
  119         CrawlDatum res = null;
  120         if (oldSet) res = old; // don't overwrite existing value
  121         else res = injected;
  122   
  123         output.collect(key, res);
  124       }
  125     }
  126   
  127     public Injector() {}
  128     
  129     public Injector(Configuration conf) {
  130       setConf(conf);
  131     }
  132     
  133     public void inject(Path crawlDb, Path urlDir) throws IOException {
  134   
  135       if (LOG.isInfoEnabled()) {
  136         LOG.info("Injector: starting");
  137         LOG.info("Injector: crawlDb: " + crawlDb);
  138         LOG.info("Injector: urlDir: " + urlDir);
  139       }
  140   
  141       Path tempDir =
  142         new Path(getConf().get("mapred.temp.dir", ".") +
  143                  "/inject-temp-"+
  144                  Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
  145   
  146       // map text input file to a <url,CrawlDatum> file
  147       if (LOG.isInfoEnabled()) {
  148         LOG.info("Injector: Converting injected urls to crawl db entries.");
  149       }
  150       JobConf sortJob = new NutchJob(getConf());
  151       sortJob.setJobName("inject " + urlDir);
  152       FileInputFormat.addInputPath(sortJob, urlDir);
  153       sortJob.setMapperClass(InjectMapper.class);
  154   
  155       FileOutputFormat.setOutputPath(sortJob, tempDir);
  156       sortJob.setOutputFormat(SequenceFileOutputFormat.class);
  157       sortJob.setOutputKeyClass(Text.class);
  158       sortJob.setOutputValueClass(CrawlDatum.class);
  159       sortJob.setLong("injector.current.time", System.currentTimeMillis());
  160       JobClient.runJob(sortJob);
  161   
  162       // merge with existing crawl db
  163       if (LOG.isInfoEnabled()) {
  164         LOG.info("Injector: Merging injected urls into crawl db.");
  165       }
  166       JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);
  167       FileInputFormat.addInputPath(mergeJob, tempDir);
  168       mergeJob.setReducerClass(InjectReducer.class);
  169       JobClient.runJob(mergeJob);
  170       CrawlDb.install(mergeJob, crawlDb);
  171   
  172       // clean up
  173       FileSystem fs = FileSystem.get(getConf());
  174       fs.delete(tempDir, true);
  175       if (LOG.isInfoEnabled()) { LOG.info("Injector: done"); }
  176   
  177     }
  178   
  179     public static void main(String[] args) throws Exception {
  180       int res = ToolRunner.run(NutchConfiguration.create(), new Injector(), args);
  181       System.exit(res);
  182     }
  183     
  184     public int run(String[] args) throws Exception {
  185       if (args.length < 2) {
  186         System.err.println("Usage: Injector <crawldb> <url_dir>");
  187         return -1;
  188       }
  189       try {
  190         inject(new Path(args[0]), new Path(args[1]));
  191         return 0;
  192       } catch (Exception e) {
  193         LOG.fatal("Injector: " + StringUtils.stringifyException(e));
  194         return -1;
  195       }
  196     }
  197   
  198   }

Save This Page
Home » nutch-1.0 » org.apache.nutch » crawl » [javadoc | source]