/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nutch.crawl;

import java.io.*;
import java.util.*;

// Commons Logging imports
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

import org.apache.nutch.util.HadoopFSUtil;
import org.apache.nutch.util.LockUtil;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;

/**
 * This class takes the output of the fetcher and updates the
 * crawldb accordingly.
 */
public class CrawlDb extends Configured implements Tool {
  public static final Log LOG = LogFactory.getLog(CrawlDb.class);

  public static final String CRAWLDB_ADDITIONS_ALLOWED = "db.update.additions.allowed";

  public static final String CURRENT_NAME = "current";

  public static final String LOCK_NAME = ".locked";

  public CrawlDb() {}

  public CrawlDb(Configuration conf) {
    setConf(conf);
  }

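  /**
   * Updates the CrawlDb from the given segments, reading the
   * "db.update.additions.allowed" property to decide whether newly
   * discovered URLs may be added, and never forcing past an existing lock.
   * A minimal usage sketch (the crawldb and segment paths below are
   * hypothetical examples):
   * <pre>
   *   CrawlDb crawlDb = new CrawlDb(NutchConfiguration.create());
   *   crawlDb.update(new Path("crawl/crawldb"),
   *                  new Path[] { new Path("crawl/segments/20090101000000") },
   *                  false, false);
   * </pre>
   */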
  public void update(Path crawlDb, Path[] segments, boolean normalize, boolean filter) throws IOException {
    boolean additionsAllowed = getConf().getBoolean(CRAWLDB_ADDITIONS_ALLOWED, true);
    update(crawlDb, segments, normalize, filter, additionsAllowed, false);
  }

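  /**
   * Updates the CrawlDb from the given segments: creates the CrawlDb lock
   * (proceeding even if the CrawlDb appears to be locked when force is true),
   * adds the fetch and parse output of each segment as job input (skipping
   * segments that lack either), runs the merge job, and installs the result.
   * On job failure the lock and any partial output are removed.
   */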
  public void update(Path crawlDb, Path[] segments, boolean normalize, boolean filter, boolean additionsAllowed, boolean force) throws IOException {
    FileSystem fs = FileSystem.get(getConf());
    Path lock = new Path(crawlDb, LOCK_NAME);
    LockUtil.createLockFile(fs, lock, force);
    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb update: starting");
      LOG.info("CrawlDb update: db: " + crawlDb);
      LOG.info("CrawlDb update: segments: " + Arrays.asList(segments));
      LOG.info("CrawlDb update: additions allowed: " + additionsAllowed);
      LOG.info("CrawlDb update: URL normalizing: " + normalize);
      LOG.info("CrawlDb update: URL filtering: " + filter);
    }

    JobConf job = CrawlDb.createJob(getConf(), crawlDb);
    job.setBoolean(CRAWLDB_ADDITIONS_ALLOWED, additionsAllowed);
    job.setBoolean(CrawlDbFilter.URL_FILTERING, filter);
    job.setBoolean(CrawlDbFilter.URL_NORMALIZING, normalize);
    for (int i = 0; i < segments.length; i++) {
      Path fetch = new Path(segments[i], CrawlDatum.FETCH_DIR_NAME);
      Path parse = new Path(segments[i], CrawlDatum.PARSE_DIR_NAME);
      if (fs.exists(fetch) && fs.exists(parse)) {
        FileInputFormat.addInputPath(job, fetch);
        FileInputFormat.addInputPath(job, parse);
      } else {
        LOG.info(" - skipping invalid segment " + segments[i]);
      }
    }

    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb update: Merging segment data into db.");
    }
    try {
      JobClient.runJob(job);
    } catch (IOException e) {
      LockUtil.removeLockFile(fs, lock);
      Path outPath = FileOutputFormat.getOutputPath(job);
      if (fs.exists(outPath)) fs.delete(outPath, true);
      throw e;
    }

    CrawlDb.install(job, crawlDb);
    if (LOG.isInfoEnabled()) { LOG.info("CrawlDb update: done"); }
  }

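  /**
   * Creates the merge job: the existing CrawlDb (if any) is added as input,
   * segment data is added by the caller, CrawlDbFilter is the mapper and
   * CrawlDbReducer the reducer, and the new CrawlDb is written as a MapFile
   * to a randomly named temporary directory under the CrawlDb path.
   */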
  public static JobConf createJob(Configuration config, Path crawlDb)
    throws IOException {
    Path newCrawlDb =
      new Path(crawlDb,
               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    JobConf job = new NutchJob(config);
    job.setJobName("crawldb " + crawlDb);

    Path current = new Path(crawlDb, CURRENT_NAME);
    if (FileSystem.get(job).exists(current)) {
      FileInputFormat.addInputPath(job, current);
    }
    job.setInputFormat(SequenceFileInputFormat.class);

    job.setMapperClass(CrawlDbFilter.class);
    job.setReducerClass(CrawlDbReducer.class);

    FileOutputFormat.setOutputPath(job, newCrawlDb);
    job.setOutputFormat(MapFileOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(CrawlDatum.class);

    return job;
  }

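  /**
   * Installs the newly generated CrawlDb: the previous "current" data is
   * moved aside to "old", the job output is renamed to "current", the
   * temporary "old" copy is deleted, and the CrawlDb lock file is removed.
   */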
  public static void install(JobConf job, Path crawlDb) throws IOException {
    Path newCrawlDb = FileOutputFormat.getOutputPath(job);
    FileSystem fs = new JobClient(job).getFs();
    Path old = new Path(crawlDb, "old");
    Path current = new Path(crawlDb, CURRENT_NAME);
    if (fs.exists(current)) {
      if (fs.exists(old)) fs.delete(old, true);
      fs.rename(current, old);
    }
    fs.mkdirs(crawlDb);
    fs.rename(newCrawlDb, current);
    if (fs.exists(old)) fs.delete(old, true);
    Path lock = new Path(crawlDb, LOCK_NAME);
    LockUtil.removeLockFile(fs, lock);
  }

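  /** Command-line entry point; runs the tool via ToolRunner and exits with its result code. */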
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(NutchConfiguration.create(), new CrawlDb(), args);
    System.exit(res);
  }

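  /**
   * Parses the command-line options (-normalize, -filter, -force,
   * -noAdditions, -dir, or explicit segment paths) and runs the update.
   * Returns 0 on success and -1 on bad usage or failure.
   */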
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: CrawlDb <crawldb> (-dir <segments> | <seg1> <seg2> ...) [-force] [-normalize] [-filter] [-noAdditions]");
      System.err.println("\tcrawldb\tCrawlDb to update");
      System.err.println("\t-dir segments\tparent directory containing all segments to update from");
      System.err.println("\tseg1 seg2 ...\tlist of segment names to update from");
      System.err.println("\t-force\tforce update even if CrawlDb appears to be locked (CAUTION advised)");
      System.err.println("\t-normalize\tuse URLNormalizer on urls in CrawlDb and segment (usually not needed)");
      System.err.println("\t-filter\tuse URLFilters on urls in CrawlDb and segment");
      System.err.println("\t-noAdditions\tonly update already existing URLs, don't add any newly discovered URLs");
      return -1;
    }
    boolean normalize = false;
    boolean filter = false;
    boolean force = false;
    final FileSystem fs = FileSystem.get(getConf());
    boolean additionsAllowed = getConf().getBoolean(CRAWLDB_ADDITIONS_ALLOWED, true);
    HashSet<Path> dirs = new HashSet<Path>();
    for (int i = 1; i < args.length; i++) {
      if (args[i].equals("-normalize")) {
        normalize = true;
      } else if (args[i].equals("-filter")) {
        filter = true;
      } else if (args[i].equals("-force")) {
        force = true;
      } else if (args[i].equals("-noAdditions")) {
        additionsAllowed = false;
      } else if (args[i].equals("-dir")) {
        FileStatus[] paths = fs.listStatus(new Path(args[++i]), HadoopFSUtil.getPassDirectoriesFilter(fs));
        dirs.addAll(Arrays.asList(HadoopFSUtil.getPaths(paths)));
      } else {
        dirs.add(new Path(args[i]));
      }
    }
    try {
      update(new Path(args[0]), dirs.toArray(new Path[dirs.size()]), normalize, filter, additionsAllowed, force);
      return 0;
    } catch (Exception e) {
      LOG.fatal("CrawlDb update: " + StringUtils.stringifyException(e));
      return -1;
    }
  }
}
