Save This Page
Home » nutch-1.0 » org.apache.nutch » crawl » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *     http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   
   18   package org.apache.nutch.crawl;
   19   
   20   import java.io;
   21   import java.util;
   22   import java.net;
   23   
   24   // Commons Logging imports
   25   import org.apache.commons.logging.Log;
   26   import org.apache.commons.logging.LogFactory;
   27   
   28   import org.apache.hadoop.io;
   29   import org.apache.hadoop.fs;
   30   import org.apache.hadoop.fs.FileSystem;
   31   import org.apache.hadoop.conf;
   32   import org.apache.hadoop.mapred;
   33   import org.apache.hadoop.util;
   34   
   35   import org.apache.nutch.net.URLFilters;
   36   import org.apache.nutch.net.URLNormalizers;
   37   import org.apache.nutch.parse;
   38   import org.apache.nutch.util.HadoopFSUtil;
   39   import org.apache.nutch.util.LockUtil;
   40   import org.apache.nutch.util.NutchConfiguration;
   41   import org.apache.nutch.util.NutchJob;
   42   
   43   /** Maintains an inverted link map, listing incoming links for each url. */
   44   public class LinkDb extends Configured implements Tool, Mapper<Text, ParseData, Text, Inlinks> {
   45   
   46     public static final Log LOG = LogFactory.getLog(LinkDb.class);
   47   
   48     public static final String CURRENT_NAME = "current";
   49     public static final String LOCK_NAME = ".locked";
   50   
   51     private int maxAnchorLength;
   52     private boolean ignoreInternalLinks;
   53     private URLFilters urlFilters;
   54     private URLNormalizers urlNormalizers;
   55     
   56     public LinkDb() {}
   57     
   58     public LinkDb(Configuration conf) {
   59       setConf(conf);
   60     }
   61     
   62     public void configure(JobConf job) {
   63       maxAnchorLength = job.getInt("db.max.anchor.length", 100);
   64       ignoreInternalLinks = job.getBoolean("db.ignore.internal.links", true);
   65       if (job.getBoolean(LinkDbFilter.URL_FILTERING, false)) {
   66         urlFilters = new URLFilters(job);
   67       }
   68       if (job.getBoolean(LinkDbFilter.URL_NORMALIZING, false)) {
   69         urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_LINKDB);
   70       }
   71     }
   72   
   73     public void close() {}
   74   
   75     public void map(Text key, ParseData parseData,
   76                     OutputCollector<Text, Inlinks> output, Reporter reporter)
   77       throws IOException {
   78       String fromUrl = key.toString();
   79       String fromHost = getHost(fromUrl);
   80       if (urlNormalizers != null) {
   81         try {
   82           fromUrl = urlNormalizers.normalize(fromUrl, URLNormalizers.SCOPE_LINKDB); // normalize the url
   83         } catch (Exception e) {
   84           LOG.warn("Skipping " + fromUrl + ":" + e);
   85           fromUrl = null;
   86         }
   87       }
   88       if (fromUrl != null && urlFilters != null) {
   89         try {
   90           fromUrl = urlFilters.filter(fromUrl); // filter the url
   91         } catch (Exception e) {
   92           LOG.warn("Skipping " + fromUrl + ":" + e);
   93           fromUrl = null;
   94         }
   95       }
   96       if (fromUrl == null) return; // discard all outlinks
   97       Outlink[] outlinks = parseData.getOutlinks();
   98       Inlinks inlinks = new Inlinks();
   99       for (int i = 0; i < outlinks.length; i++) {
  100         Outlink outlink = outlinks[i];
  101         String toUrl = outlink.getToUrl();
  102   
  103         if (ignoreInternalLinks) {
  104           String toHost = getHost(toUrl);
  105           if (toHost == null || toHost.equals(fromHost)) { // internal link
  106             continue;                               // skip it
  107           }
  108         }
  109         if (urlNormalizers != null) {
  110           try {
  111             toUrl = urlNormalizers.normalize(toUrl, URLNormalizers.SCOPE_LINKDB); // normalize the url
  112           } catch (Exception e) {
  113             LOG.warn("Skipping " + toUrl + ":" + e);
  114             toUrl = null;
  115           }
  116         }
  117         if (toUrl != null && urlFilters != null) {
  118           try {
  119             toUrl = urlFilters.filter(toUrl); // filter the url
  120           } catch (Exception e) {
  121             LOG.warn("Skipping " + toUrl + ":" + e);
  122             toUrl = null;
  123           }
  124         }
  125         if (toUrl == null) continue;
  126         inlinks.clear();
  127         String anchor = outlink.getAnchor();        // truncate long anchors
  128         if (anchor.length() > maxAnchorLength) {
  129           anchor = anchor.substring(0, maxAnchorLength);
  130         }
  131         inlinks.add(new Inlink(fromUrl, anchor));   // collect inverted link
  132         output.collect(new Text(toUrl), inlinks);
  133       }
  134     }
  135   
  136     private String getHost(String url) {
  137       try {
  138         return new URL(url).getHost().toLowerCase();
  139       } catch (MalformedURLException e) {
  140         return null;
  141       }
  142     }
  143   
  144     public void invert(Path linkDb, final Path segmentsDir, boolean normalize, boolean filter, boolean force) throws IOException {
  145       final FileSystem fs = FileSystem.get(getConf());
  146       FileStatus[] files = fs.listStatus(segmentsDir, HadoopFSUtil.getPassDirectoriesFilter(fs));
  147       invert(linkDb, HadoopFSUtil.getPaths(files), normalize, filter, force);
  148     }
  149   
  150     public void invert(Path linkDb, Path[] segments, boolean normalize, boolean filter, boolean force) throws IOException {
  151   
  152       Path lock = new Path(linkDb, LOCK_NAME);
  153       FileSystem fs = FileSystem.get(getConf());
  154       LockUtil.createLockFile(fs, lock, force);
  155       Path currentLinkDb = new Path(linkDb, CURRENT_NAME);
  156       if (LOG.isInfoEnabled()) {
  157         LOG.info("LinkDb: starting");
  158         LOG.info("LinkDb: linkdb: " + linkDb);
  159         LOG.info("LinkDb: URL normalize: " + normalize);
  160         LOG.info("LinkDb: URL filter: " + filter);
  161       }
  162       JobConf job = LinkDb.createJob(getConf(), linkDb, normalize, filter);
  163       for (int i = 0; i < segments.length; i++) {
  164         if (LOG.isInfoEnabled()) {
  165           LOG.info("LinkDb: adding segment: " + segments[i]);
  166         }
  167         FileInputFormat.addInputPath(job, new Path(segments[i], ParseData.DIR_NAME));
  168       }
  169       try {
  170         JobClient.runJob(job);
  171       } catch (IOException e) {
  172         LockUtil.removeLockFile(fs, lock);
  173         throw e;
  174       }
  175       if (fs.exists(currentLinkDb)) {
  176         if (LOG.isInfoEnabled()) {
  177           LOG.info("LinkDb: merging with existing linkdb: " + linkDb);
  178         }
  179         // try to merge
  180         Path newLinkDb = FileOutputFormat.getOutputPath(job);
  181         job = LinkDbMerger.createMergeJob(getConf(), linkDb, normalize, filter);
  182         FileInputFormat.addInputPath(job, currentLinkDb);
  183         FileInputFormat.addInputPath(job, newLinkDb);
  184         try {
  185           JobClient.runJob(job);
  186         } catch (IOException e) {
  187           LockUtil.removeLockFile(fs, lock);
  188           fs.delete(newLinkDb, true);
  189           throw e;
  190         }
  191         fs.delete(newLinkDb, true);
  192       }
  193       LinkDb.install(job, linkDb);
  194       if (LOG.isInfoEnabled()) { LOG.info("LinkDb: done"); }
  195     }
  196   
  197     private static JobConf createJob(Configuration config, Path linkDb, boolean normalize, boolean filter) {
  198       Path newLinkDb =
  199         new Path("linkdb-" +
  200                  Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
  201   
  202       JobConf job = new NutchJob(config);
  203       job.setJobName("linkdb " + linkDb);
  204   
  205       job.setInputFormat(SequenceFileInputFormat.class);
  206   
  207       job.setMapperClass(LinkDb.class);
  208       job.setCombinerClass(LinkDbMerger.class);
  209       // if we don't run the mergeJob, perform normalization/filtering now
  210       if (normalize || filter) {
  211         try {
  212           FileSystem fs = FileSystem.get(config);
  213           if (!fs.exists(linkDb)) {
  214             job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
  215             job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
  216           }
  217         } catch (Exception e) {
  218           LOG.warn("LinkDb createJob: " + e);
  219         }
  220       }
  221       job.setReducerClass(LinkDbMerger.class);
  222   
  223       FileOutputFormat.setOutputPath(job, newLinkDb);
  224       job.setOutputFormat(MapFileOutputFormat.class);
  225       job.setBoolean("mapred.output.compress", true);
  226       job.setOutputKeyClass(Text.class);
  227       job.setOutputValueClass(Inlinks.class);
  228   
  229       return job;
  230     }
  231   
  232     public static void install(JobConf job, Path linkDb) throws IOException {
  233       Path newLinkDb = FileOutputFormat.getOutputPath(job);
  234       FileSystem fs = new JobClient(job).getFs();
  235       Path old = new Path(linkDb, "old");
  236       Path current = new Path(linkDb, CURRENT_NAME);
  237       if (fs.exists(current)) {
  238         if (fs.exists(old)) fs.delete(old, true);
  239         fs.rename(current, old);
  240       }
  241       fs.mkdirs(linkDb);
  242       fs.rename(newLinkDb, current);
  243       if (fs.exists(old)) fs.delete(old, true);
  244       LockUtil.removeLockFile(fs, new Path(linkDb, LOCK_NAME));
  245     }
  246   
  247     public static void main(String[] args) throws Exception {
  248       int res = ToolRunner.run(NutchConfiguration.create(), new LinkDb(), args);
  249       System.exit(res);
  250     }
  251     
  252     public int run(String[] args) throws Exception {
  253       if (args.length < 2) {
  254         System.err.println("Usage: LinkDb <linkdb> (-dir <segmentsDir> | <seg1> <seg2> ...) [-force] [-noNormalize] [-noFilter]");
  255         System.err.println("\tlinkdb\toutput LinkDb to create or update");
  256         System.err.println("\t-dir segmentsDir\tparent directory of several segments, OR");
  257         System.err.println("\tseg1 seg2 ...\t list of segment directories");
  258         System.err.println("\t-force\tforce update even if LinkDb appears to be locked (CAUTION advised)");
  259         System.err.println("\t-noNormalize\tdon't normalize link URLs");
  260         System.err.println("\t-noFilter\tdon't apply URLFilters to link URLs");
  261         return -1;
  262       }
  263       Path segDir = null;
  264       final FileSystem fs = FileSystem.get(getConf());
  265       Path db = new Path(args[0]);
  266       ArrayList<Path> segs = new ArrayList<Path>();
  267       boolean filter = true;
  268       boolean normalize = true;
  269       boolean force = false;
  270       for (int i = 1; i < args.length; i++) {
  271         if (args[i].equals("-dir")) {
  272           segDir = new Path(args[++i]);
  273           FileStatus[] files = fs.listStatus(segDir, HadoopFSUtil.getPassDirectoriesFilter(fs));
  274           if (files != null) segs.addAll(Arrays.asList(HadoopFSUtil.getPaths(files)));
  275           break;
  276         } else if (args[i].equalsIgnoreCase("-noNormalize")) {
  277           normalize = false;
  278         } else if (args[i].equalsIgnoreCase("-noFilter")) {
  279           filter = false;
  280         } else if (args[i].equalsIgnoreCase("-force")) {
  281           force = true;
  282         } else segs.add(new Path(args[i]));
  283       }
  284       try {
  285         invert(db, segs.toArray(new Path[segs.size()]), normalize, filter, force);
  286         return 0;
  287       } catch (Exception e) {
  288         LOG.fatal("LinkDb: " + StringUtils.stringifyException(e));
  289         return -1;
  290       }
  291     }
  292   
  293   }

Save This Page
Home » nutch-1.0 » org.apache.nutch » crawl » [javadoc | source]