Save This Page
Home » nutch-1.0 » org.apache.nutch » fetcher » [javadoc | source]
    1   /*
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *     http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.nutch.fetcher;
   18   
   19   import java.io.IOException;
   20   import java.net.InetAddress;
   21   import java.net.MalformedURLException;
   22   import java.net.URL;
   23   import java.net.UnknownHostException;
   24   import java.util;
   25   import java.util.Map.Entry;
   26   import java.util.concurrent.atomic.AtomicInteger;
   27   import java.util.concurrent.atomic.AtomicLong;
   28   
   29   // Commons Logging imports
   30   import org.apache.commons.logging.Log;
   31   import org.apache.commons.logging.LogFactory;
   32   
   33   import org.apache.hadoop.io;
   34   import org.apache.hadoop.fs;
   35   import org.apache.hadoop.conf;
   36   import org.apache.hadoop.mapred;
   37   import org.apache.hadoop.util.StringUtils;
   38   
   39   import org.apache.nutch.crawl.CrawlDatum;
   40   import org.apache.nutch.crawl.NutchWritable;
   41   import org.apache.nutch.crawl.SignatureFactory;
   42   import org.apache.nutch.metadata.Metadata;
   43   import org.apache.nutch.metadata.Nutch;
   44   import org.apache.nutch.net;
   45   import org.apache.nutch.protocol;
   46   import org.apache.nutch.parse;
   47   import org.apache.nutch.scoring.ScoringFilters;
   48   import org.apache.nutch.util;
   49   
   50   
   51   /** 
   52    * A queue-based fetcher.
   53    * 
   54    * <p>This fetcher uses a well-known model of one producer (a QueueFeeder)
   55    * and many consumers (FetcherThread-s).
   56    * 
   57    * <p>QueueFeeder reads input fetchlists and
   58    * populates a set of FetchItemQueue-s, which hold FetchItem-s that
   59    * describe the items to be fetched. There are as many queues as there are unique
   60    * hosts, but at any given time the total number of fetch items in all queues
   61    * is less than a fixed number (currently set to a multiple of the number of
   62    * threads).
   63    * 
   64    * <p>As items are consumed from the queues, the QueueFeeder continues to add new
   65    * input items, so that their total count stays fixed (FetcherThread-s may also
   66    * add new items to the queues e.g. as a result of redirection) - until all
   67    * input items are exhausted, at which point the number of items in the queues
   68    * begins to decrease. When this number reaches 0 fetcher will finish.
   69    * 
   70    * <p>This fetcher implementation handles per-host blocking itself, instead
   71    * of delegating this work to protocol-specific plugins.
   72    * Each per-host queue handles its own "politeness" settings, such as the
   73    * maximum number of concurrent requests and crawl delay between consecutive
   74    * requests - and also a list of requests in progress, and the time the last
   75    * request was finished. As FetcherThread-s ask for new items to be fetched,
   76    * queues may return eligible items or null if for "politeness" reasons this
   77    * host's queue is not yet ready.
   78    * 
   79    * <p>If there are still unfetched items in the queues, but none of the items
   80    * are ready, FetcherThread-s will spin-wait until either some items become
   81    * available, or a timeout is reached (at which point the Fetcher will abort,
   82    * assuming the task is hung).
   83    * 
   84    * @author Andrzej Bialecki
   85    */
   86   public class Fetcher extends Configured implements
   87       MapRunnable<Text, CrawlDatum, Text, NutchWritable> { 
   88   
   89     public static final int PERM_REFRESH_TIME = 5;
   90   
   91     public static final String CONTENT_REDIR = "content";
   92   
   93     public static final String PROTOCOL_REDIR = "protocol";
   94   
   95     public static final Log LOG = LogFactory.getLog(Fetcher.class);
   96     
   97     public static class InputFormat extends SequenceFileInputFormat<Text, CrawlDatum> {
   98       /** Don't split inputs, to keep things polite. */
   99       public InputSplit[] getSplits(JobConf job, int nSplits)
  100         throws IOException {
  101         FileStatus[] files = listStatus(job);
  102         FileSplit[] splits = new FileSplit[files.length];
  103         for (int i = 0; i < files.length; i++) {
  104           FileStatus cur = files[i];
  105           splits[i] = new FileSplit(cur.getPath(), 0,
  106               cur.getLen(), (String[])null);
  107         }
  108         return splits;
  109       }
  110     }
  111   
  112     private OutputCollector<Text, NutchWritable> output;
  113     private Reporter reporter;
  114     
  115     private String segmentName;
  116     private AtomicInteger activeThreads = new AtomicInteger(0);
  117     private AtomicInteger spinWaiting = new AtomicInteger(0);
  118   
  119     private long start = System.currentTimeMillis(); // start time of fetcher run
  120     private AtomicLong lastRequestStart = new AtomicLong(start);
  121   
  122     private AtomicLong bytes = new AtomicLong(0);        // total bytes fetched
  123     private AtomicInteger pages = new AtomicInteger(0);  // total pages fetched
  124     private AtomicInteger errors = new AtomicInteger(0); // total pages errored
  125   
  126     private boolean storingContent;
  127     private boolean parsing;
  128     FetchItemQueues fetchQueues;
  129     QueueFeeder feeder;
  130     
  131     /**
  132      * This class described the item to be fetched.
  133      */
  134     private static class FetchItem {    
  135       String queueID;
  136       Text url;
  137       URL u;
  138       CrawlDatum datum;
  139       
  140       public FetchItem(Text url, URL u, CrawlDatum datum, String queueID) {
  141         this.url = url;
  142         this.u = u;
  143         this.datum = datum;
  144         this.queueID = queueID;
  145       }
  146       
  147       /** Create an item. Queue id will be created based on <code>byIP</code>
  148        * argument, either as a protocol + hostname pair, or protocol + IP
  149        * address pair.
  150        */
  151       public static FetchItem create(Text url, CrawlDatum datum, boolean byIP) {
  152         String queueID;
  153         URL u = null;
  154         try {
  155           u = new URL(url.toString());
  156         } catch (Exception e) {
  157           LOG.warn("Cannot parse url: " + url, e);
  158           return null;
  159         }
  160         String proto = u.getProtocol().toLowerCase();
  161         String host;
  162         if (byIP) {
  163           try {
  164             InetAddress addr = InetAddress.getByName(u.getHost());
  165             host = addr.getHostAddress();
  166           } catch (UnknownHostException e) {
  167             // unable to resolve it, so don't fall back to host name
  168             LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
  169             return null;
  170           }
  171         } else {
  172           host = u.getHost();
  173           if (host == null) {
  174             LOG.warn("Unknown host for url: " + url + ", skipping.");
  175             return null;
  176           }
  177           host = host.toLowerCase();
  178         }
  179         queueID = proto + "://" + host;
  180         return new FetchItem(url, u, datum, queueID);
  181       }
  182   
  183       public CrawlDatum getDatum() {
  184         return datum;
  185       }
  186   
  187       public String getQueueID() {
  188         return queueID;
  189       }
  190   
  191       public Text getUrl() {
  192         return url;
  193       }
  194       
  195       public URL getURL2() {
  196         return u;
  197       }
  198     }
  199     
  200     /**
  201      * This class handles FetchItems which come from the same host ID (be it
  202      * a proto/hostname or proto/IP pair). It also keeps track of requests in
  203      * progress and elapsed time between requests.
  204      */
  205     private static class FetchItemQueue {
  206       List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>());
  207       Set<FetchItem>  inProgress = Collections.synchronizedSet(new HashSet<FetchItem>());
  208       AtomicLong nextFetchTime = new AtomicLong();
  209       long crawlDelay;
  210       long minCrawlDelay;
  211       int maxThreads;
  212       Configuration conf;
  213       
  214       public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, long minCrawlDelay) {
  215         this.conf = conf;
  216         this.maxThreads = maxThreads;
  217         this.crawlDelay = crawlDelay;
  218         this.minCrawlDelay = minCrawlDelay;
  219         // ready to start
  220         setEndTime(System.currentTimeMillis() - crawlDelay);
  221       }
  222       
  223       public int getQueueSize() {
  224         return queue.size();
  225       }
  226       
  227       public int getInProgressSize() {
  228         return inProgress.size();
  229       }
  230       
  231       public void finishFetchItem(FetchItem it, boolean asap) {
  232         if (it != null) {
  233           inProgress.remove(it);
  234           setEndTime(System.currentTimeMillis(), asap);
  235         }
  236       }
  237       
  238       public void addFetchItem(FetchItem it) {
  239         if (it == null) return;
  240         queue.add(it);
  241       }
  242       
  243       public void addInProgressFetchItem(FetchItem it) {
  244         if (it == null) return;
  245         inProgress.add(it);
  246       }
  247       
  248       public FetchItem getFetchItem() {
  249         if (inProgress.size() >= maxThreads) return null;
  250         long now = System.currentTimeMillis();
  251         if (nextFetchTime.get() > now) return null;
  252         FetchItem it = null;
  253         if (queue.size() == 0) return null;
  254         try {
  255           it = queue.remove(0);
  256           inProgress.add(it);
  257         } catch (Exception e) {
  258           LOG.error("Cannot remove FetchItem from queue or cannot add it to inProgress queue", e);
  259         }
  260         return it;
  261       }
  262       
  263       public synchronized void dump() {
  264         LOG.info("  maxThreads    = " + maxThreads);
  265         LOG.info("  inProgress    = " + inProgress.size());
  266         LOG.info("  crawlDelay    = " + crawlDelay);
  267         LOG.info("  minCrawlDelay = " + minCrawlDelay);
  268         LOG.info("  nextFetchTime = " + nextFetchTime.get());
  269         LOG.info("  now           = " + System.currentTimeMillis());
  270         for (int i = 0; i < queue.size(); i++) {
  271           FetchItem it = queue.get(i);
  272           LOG.info("  " + i + ". " + it.url);
  273         }
  274       }
  275       
  276       private void setEndTime(long endTime) {
  277         setEndTime(endTime, false);
  278       }
  279       
  280       private void setEndTime(long endTime, boolean asap) {
  281         if (!asap)
  282           nextFetchTime.set(endTime + (maxThreads > 1 ? minCrawlDelay : crawlDelay));
  283         else
  284           nextFetchTime.set(endTime);
  285       }
  286     }
  287     
  288     /**
  289      * Convenience class - a collection of queues that keeps track of the total
  290      * number of items, and provides items eligible for fetching from any queue.
  291      */
  292     private static class FetchItemQueues {
  293       public static final String DEFAULT_ID = "default";
  294       Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
  295       AtomicInteger totalSize = new AtomicInteger(0);
  296       int maxThreads;
  297       boolean byIP;
  298       long crawlDelay;
  299       long minCrawlDelay;
  300       Configuration conf;    
  301       
  302       public FetchItemQueues(Configuration conf) {
  303         this.conf = conf;
  304         this.maxThreads = conf.getInt("fetcher.threads.per.host", 1);
  305         // backward-compatible default setting
  306         this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", false);
  307         this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
  308         this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000);
  309       }
  310       
  311       public int getTotalSize() {
  312         return totalSize.get();
  313       }
  314       
  315       public int getQueueCount() {
  316         return queues.size();
  317       }
  318       
  319       public void addFetchItem(Text url, CrawlDatum datum) {
  320         FetchItem it = FetchItem.create(url, datum, byIP);
  321         if (it != null) addFetchItem(it);
  322       }
  323       
  324       public void addFetchItem(FetchItem it) {
  325         FetchItemQueue fiq = getFetchItemQueue(it.queueID);
  326         fiq.addFetchItem(it);
  327         totalSize.incrementAndGet();
  328       }
  329       
  330       public void finishFetchItem(FetchItem it) {
  331         finishFetchItem(it, false);
  332       }
  333       
  334       public void finishFetchItem(FetchItem it, boolean asap) {
  335         FetchItemQueue fiq = queues.get(it.queueID);
  336         if (fiq == null) {
  337           LOG.warn("Attempting to finish item from unknown queue: " + it);
  338           return;
  339         }
  340         fiq.finishFetchItem(it, asap);
  341       }
  342       
  343       public synchronized FetchItemQueue getFetchItemQueue(String id) {
  344         FetchItemQueue fiq = queues.get(id);
  345         if (fiq == null) {
  346           // initialize queue
  347           fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
  348           queues.put(id, fiq);
  349         }
  350         return fiq;
  351       }
  352       
  353       public synchronized FetchItem getFetchItem() {
  354         Iterator<Map.Entry<String, FetchItemQueue>> it =
  355           queues.entrySet().iterator();
  356         while (it.hasNext()) {
  357           FetchItemQueue fiq = it.next().getValue();
  358           // reap empty queues
  359           if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
  360             it.remove();
  361             continue;
  362           }
  363           FetchItem fit = fiq.getFetchItem();
  364           if (fit != null) {
  365             totalSize.decrementAndGet();
  366             return fit;
  367           }
  368         }
  369         return null;
  370       }
  371       
  372       public synchronized void dump() {
  373         for (String id : queues.keySet()) {
  374           FetchItemQueue fiq = queues.get(id);
  375           if (fiq.getQueueSize() == 0) continue;
  376           LOG.info("* queue: " + id);
  377           fiq.dump();
  378         }
  379       }
  380     }
  381     
  382     /**
  383      * This class feeds the queues with input items, and re-fills them as
  384      * items are consumed by FetcherThread-s.
  385      */
  386     private static class QueueFeeder extends Thread {
  387       private RecordReader<Text, CrawlDatum> reader;
  388       private FetchItemQueues queues;
  389       private int size;
  390       
  391       public QueueFeeder(RecordReader<Text, CrawlDatum> reader,
  392           FetchItemQueues queues, int size) {
  393         this.reader = reader;
  394         this.queues = queues;
  395         this.size = size;
  396         this.setDaemon(true);
  397         this.setName("QueueFeeder");
  398       }
  399       
  400       public void run() {
  401         boolean hasMore = true;
  402         int cnt = 0;
  403         
  404         while (hasMore) {
  405           int feed = size - queues.getTotalSize();
  406           if (feed <= 0) {
  407             // queues are full - spin-wait until they have some free space
  408             try {
  409               Thread.sleep(1000);
  410             } catch (Exception e) {};
  411             continue;
  412           } else {
  413             LOG.debug("-feeding " + feed + " input urls ...");
  414             while (feed > 0 && hasMore) {
  415               try {
  416                 Text url = new Text();
  417                 CrawlDatum datum = new CrawlDatum();
  418                 hasMore = reader.next(url, datum);
  419                 if (hasMore) {
  420                   queues.addFetchItem(url, datum);
  421                   cnt++;
  422                   feed--;
  423                 }
  424               } catch (IOException e) {
  425                 LOG.fatal("QueueFeeder error reading input, record " + cnt, e);
  426                 return;
  427               }
  428             }
  429           }
  430         }
  431         LOG.info("QueueFeeder finished: total " + cnt + " records.");
  432       }
  433     }
  434     
  435     /**
  436      * This class picks items from queues and fetches the pages.
  437      */
  438     private class FetcherThread extends Thread {
  439       private Configuration conf;
  440       private URLFilters urlFilters;
  441       private ScoringFilters scfilters;
  442       private ParseUtil parseUtil;
  443       private URLNormalizers normalizers;
  444       private ProtocolFactory protocolFactory;
  445       private long maxCrawlDelay;
  446       private boolean byIP;
  447       private int maxRedirect;
  448       private String reprUrl;
  449       private boolean redirecting;
  450       private int redirectCount;
  451       private boolean ignoreExternalLinks;
  452   
  453       public FetcherThread(Configuration conf) {
  454         this.setDaemon(true);                       // don't hang JVM on exit
  455         this.setName("FetcherThread");              // use an informative name
  456         this.conf = conf;
  457         this.urlFilters = new URLFilters(conf);
  458         this.scfilters = new ScoringFilters(conf);
  459         this.parseUtil = new ParseUtil(conf);
  460         this.protocolFactory = new ProtocolFactory(conf);
  461         this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
  462         this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
  463         // backward-compatible default setting
  464         this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", true);
  465         this.maxRedirect = conf.getInt("http.redirect.max", 3);
  466         this.ignoreExternalLinks = 
  467           conf.getBoolean("db.ignore.external.links", false);
  468       }
  469   
    /**
     * Consumer loop. Repeatedly takes an eligible item from the shared fetch
     * queues, fetches it, and follows redirects in this thread while
     * redirectCount is below maxRedirect. Each result is emitted through
     * output(...), and the item's queue slot is released.
     *
     * <p>While no item is eligible, the thread spin-waits as long as the
     * feeder is alive or items remain queued; otherwise it exits.
     */
    public void run() {
      activeThreads.incrementAndGet(); // count threads
      
      // Item currently being processed; after a followed redirect it refers
      // to the redirect target.
      FetchItem fit = null;
      try {
        
        while (true) {
          fit = fetchQueues.getFetchItem();
          if (fit == null) {
            // Nothing eligible right now (empty or politeness-blocked queues).
            if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
              LOG.debug(getName() + " spin-waiting ...");
              // spin-wait.
              spinWaiting.incrementAndGet();
              try {
                Thread.sleep(500);
              } catch (Exception e) {}
                spinWaiting.decrementAndGet();
              continue;
            } else {
              // all done, finish this thread
              return;
            }
          }
          lastRequestStart.set(System.currentTimeMillis());
          // Seed the representative URL from the datum's metadata, falling
          // back to the URL itself; handleRedirect may replace it.
          Text reprUrlWritable =
            (Text) fit.datum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
          if (reprUrlWritable == null) {
            reprUrl = fit.url.toString();
          } else {
            reprUrl = reprUrlWritable.toString();
          }
          try {
            if (LOG.isInfoEnabled()) { LOG.info("fetching " + fit.url); }

            // fetch the page
            redirecting = false;
            redirectCount = 0;
            // Each iteration fetches `fit`; a followed redirect replaces `fit`
            // with the target and sets `redirecting`, which repeats the loop.
            do {
              if (LOG.isDebugEnabled()) {
                LOG.debug("redirectCount=" + redirectCount);
              }
              redirecting = false;
              Protocol protocol = this.protocolFactory.getProtocol(fit.url.toString());
              RobotRules rules = protocol.getRobotRules(fit.url, fit.datum);
              if (!rules.isAllowed(fit.u)) {
                // Denied by robots.txt: release the slot without a politeness
                // delay (asap), record the URL as gone and skip it. `continue`
                // jumps to the loop condition; redirecting is false here, so
                // the do-while ends.
                // unblock
                fetchQueues.finishFetchItem(fit, true);
                if (LOG.isDebugEnabled()) {
                  LOG.debug("Denied by robots.txt: " + fit.url);
                }
                output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
                continue;
              }
              if (rules.getCrawlDelay() > 0) {
                if (rules.getCrawlDelay() > maxCrawlDelay) {
                  // Crawl-Delay longer than allowed: give up on this URL,
                  // recorded as robots-denied / gone.
                  // unblock
                  fetchQueues.finishFetchItem(fit, true);
                  LOG.debug("Crawl-Delay for " + fit.url + " too long (" + rules.getCrawlDelay() + "), skipping");
                  output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
                  continue;
                } else {
                  // Adopt the robots.txt Crawl-Delay for this URL's queue.
                  FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
                  fiq.crawlDelay = rules.getCrawlDelay();
                }
              }
              ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.datum);
              ProtocolStatus status = output.getStatus();
              Content content = output.getContent();
              ParseStatus pstatus = null;
              // unblock queue
              // (non-asap finish: the host's next item waits out the politeness delay)
              fetchQueues.finishFetchItem(fit);

              String urlString = fit.url.toString();

              switch(status.getCode()) {
                
              case ProtocolStatus.WOULDBLOCK:
                // retry ?
                // The item is put back into its queue for a later attempt.
                fetchQueues.addFetchItem(fit);
                break;

              case ProtocolStatus.SUCCESS:        // got a page
                pstatus = output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS);
                // NOTE(review): assumes SUCCESS always carries non-null content;
                // otherwise the NPE is caught below and the URL is output a
                // second time, as STATUS_FETCH_RETRY.
                updateStatus(content.getContent().length);
                // A parse status of SUCCESS_REDIRECT signals a content-level
                // redirect: the message is the target and args[1] the refresh
                // time; below PERM_REFRESH_TIME it is handled as temporary.
                if (pstatus != null && pstatus.isSuccess() &&
                        pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
                  String newUrl = pstatus.getMessage();
                  int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
                  Text redirUrl =
                    handleRedirect(fit.url, fit.datum,
                                   urlString, newUrl,
                                   refreshTime < Fetcher.PERM_REFRESH_TIME,
                                   Fetcher.CONTENT_REDIR);
                  if (redirUrl != null) {
                    // Follow now: build a fresh datum carrying the repr URL and
                    // register the target as in progress in its queue, since
                    // this thread fetches it directly on the next iteration.
                    CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
                        fit.datum.getFetchInterval(), fit.datum.getScore());
                    if (reprUrl != null) {
                      newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
                          new Text(reprUrl));
                    }
                    fit = FetchItem.create(redirUrl, newDatum, byIP);
                    if (fit != null) {
                      FetchItemQueue fiq =
                        fetchQueues.getFetchItemQueue(fit.queueID);
                      fiq.addInProgressFetchItem(fit);
                    } else {
                      // stop redirecting
                      redirecting = false;
                    }
                  }
                }
                break;

              case ProtocolStatus.MOVED:         // redirect
              case ProtocolStatus.TEMP_MOVED:
                // Protocol-level redirect: record the redirect status for the
                // original URL, then possibly follow it (same steps as above).
                int code;
                boolean temp;
                if (status.getCode() == ProtocolStatus.MOVED) {
                  code = CrawlDatum.STATUS_FETCH_REDIR_PERM;
                  temp = false;
                } else {
                  code = CrawlDatum.STATUS_FETCH_REDIR_TEMP;
                  temp = true;
                }
                output(fit.url, fit.datum, content, status, code);
                String newUrl = status.getMessage();
                Text redirUrl =
                  handleRedirect(fit.url, fit.datum,
                                 urlString, newUrl, temp,
                                 Fetcher.PROTOCOL_REDIR);
                if (redirUrl != null) {
                  CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
                      fit.datum.getFetchInterval(), fit.datum.getScore());
                  if (reprUrl != null) {
                    newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
                        new Text(reprUrl));
                  }
                  fit = FetchItem.create(redirUrl, newDatum, byIP);
                  if (fit != null) {
                    FetchItemQueue fiq =
                      fetchQueues.getFetchItemQueue(fit.queueID);
                    fiq.addInProgressFetchItem(fit);
                  } else {
                    // stop redirecting
                    redirecting = false;
                  }
                } else {
                  // stop redirecting
                  redirecting = false;
                }
                break;

              case ProtocolStatus.EXCEPTION:
                logError(fit.url, status.getMessage());
                /* FALLTHROUGH */
              case ProtocolStatus.RETRY:          // retry
              case ProtocolStatus.BLOCKED:
                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
                break;
                
              case ProtocolStatus.GONE:           // gone
              case ProtocolStatus.NOTFOUND:
              case ProtocolStatus.ACCESS_DENIED:
              case ProtocolStatus.ROBOTS_DENIED:
                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_GONE);
                break;

              case ProtocolStatus.NOTMODIFIED:
                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_NOTMODIFIED);
                break;

              default:
                if (LOG.isWarnEnabled()) {
                  LOG.warn("Unknown ProtocolStatus: " + status.getCode());
                }
                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
              }

              // Redirect limit reached: the pending target (already registered
              // as in progress) is not fetched; release it and record it as gone.
              if (redirecting && redirectCount >= maxRedirect) {
                fetchQueues.finishFetchItem(fit);
                if (LOG.isInfoEnabled()) {
                  LOG.info(" - redirect count exceeded " + fit.url);
                }
                output(fit.url, fit.datum, null, ProtocolStatus.STATUS_REDIR_EXCEEDED, CrawlDatum.STATUS_FETCH_GONE);
              }

            } while (redirecting && (redirectCount < maxRedirect));
            
          } catch (Throwable t) {                 // unexpected exception
            // Any failure while processing `fit`: release its slot and record
            // it for retry.
            // unblock
            fetchQueues.finishFetchItem(fit);
            logError(fit.url, t.toString());
            output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED, CrawlDatum.STATUS_FETCH_RETRY);
          }
        }

      } catch (Throwable e) {
        // Failure outside the per-item handler; this thread ends.
        if (LOG.isFatalEnabled()) {
          e.printStackTrace(LogUtil.getFatalStream(LOG));
          LOG.fatal("fetcher caught:"+e.toString());
        }
      } finally {
        // Release the last item held, if any. This may repeat an earlier
        // finish, which only re-applies the politeness delay to its queue.
        if (fit != null) fetchQueues.finishFetchItem(fit);
        activeThreads.decrementAndGet(); // count threads
        LOG.info("-finishing thread " + getName() + ", activeThreads=" + activeThreads);
      }
    }
  677   
  678       private Text handleRedirect(Text url, CrawlDatum datum,
  679                                   String urlString, String newUrl,
  680                                   boolean temp, String redirType)
  681       throws MalformedURLException, URLFilterException {
  682         newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
  683         newUrl = urlFilters.filter(newUrl);
  684         
  685         if (ignoreExternalLinks) {
  686           try {
  687             String origHost = new URL(urlString).getHost().toLowerCase();
  688             String newHost = new URL(newUrl).getHost().toLowerCase();
  689             if (!origHost.equals(newHost)) {
  690               if (LOG.isDebugEnabled()) {
  691                 LOG.debug(" - ignoring redirect " + redirType + " from " +
  692                             urlString + " to " + newUrl +
  693                             " because external links are ignored");
  694               }
  695               return null;
  696             }
  697           } catch (MalformedURLException e) { }
  698         }
  699         
  700         if (newUrl != null && !newUrl.equals(urlString)) {
  701           reprUrl = URLUtil.chooseRepr(reprUrl, newUrl, temp);
  702           url = new Text(newUrl);
  703           if (maxRedirect > 0) {
  704             redirecting = true;
  705             redirectCount++;
  706             if (LOG.isDebugEnabled()) {
  707               LOG.debug(" - " + redirType + " redirect to " +
  708                   url + " (fetching now)");
  709             }
  710             return url;
  711           } else {
  712             CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_LINKED,
  713                 datum.getFetchInterval());
  714             if (reprUrl != null) {
  715               newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
  716                   new Text(reprUrl));
  717             }
  718             output(url, newDatum, null, null, CrawlDatum.STATUS_LINKED);
  719             if (LOG.isDebugEnabled()) {
  720               LOG.debug(" - " + redirType + " redirect to " +
  721                   url + " (fetching later)");
  722             }
  723             return null;
  724           }
  725         } else {
  726           if (LOG.isDebugEnabled()) {
  727             LOG.debug(" - " + redirType + " redirect skipped: " +
  728                 (newUrl != null ? "to same url" : "filtered"));
  729           }
  730           return null;
  731         }
  732       }
  733   
  734       private void logError(Text url, String message) {
  735         if (LOG.isInfoEnabled()) {
  736           LOG.info("fetch of " + url + " failed with: " + message);
  737         }
  738         errors.incrementAndGet();
  739       }
  740   
  741       private ParseStatus output(Text key, CrawlDatum datum,
  742                           Content content, ProtocolStatus pstatus, int status) {
  743   
  744         datum.setStatus(status);
  745         datum.setFetchTime(System.currentTimeMillis());
  746         if (pstatus != null) datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
  747   
  748         ParseResult parseResult = null;
  749         if (content != null) {
  750           Metadata metadata = content.getMetadata();
  751           // add segment to metadata
  752           metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
  753           // add score to content metadata so that ParseSegment can pick it up.
  754           try {
  755             scfilters.passScoreBeforeParsing(key, datum, content);
  756           } catch (Exception e) {
  757             if (LOG.isWarnEnabled()) {
  758               e.printStackTrace(LogUtil.getWarnStream(LOG));
  759               LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
  760             }
  761           }
  762           /* Note: Fetcher will only follow meta-redirects coming from the
  763            * original URL. */ 
  764           if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
  765             try {
  766               parseResult = this.parseUtil.parse(content);
  767             } catch (Exception e) {
  768               LOG.warn("Error parsing: " + key + ": " + StringUtils.stringifyException(e));
  769             }
  770   
  771             if (parseResult == null) {
  772               byte[] signature = 
  773                 SignatureFactory.getSignature(getConf()).calculate(content, 
  774                     new ParseStatus().getEmptyParse(conf));
  775               datum.setSignature(signature);
  776             }
  777           }
  778           
  779           /* Store status code in content So we can read this value during 
  780            * parsing (as a separate job) and decide to parse or not.
  781            */
  782           content.getMetadata().add(Nutch.FETCH_STATUS_KEY, Integer.toString(status));
  783         }
  784   
  785         try {
  786           output.collect(key, new NutchWritable(datum));
  787           if (content != null && storingContent)
  788             output.collect(key, new NutchWritable(content));
  789           if (parseResult != null) {
  790             for (Entry<Text, Parse> entry : parseResult) {
  791               Text url = entry.getKey();
  792               Parse parse = entry.getValue();
  793               ParseStatus parseStatus = parse.getData().getStatus();
  794               
  795               if (!parseStatus.isSuccess()) {
  796                 LOG.warn("Error parsing: " + key + ": " + parseStatus);
  797                 parse = parseStatus.getEmptyParse(getConf());
  798               }
  799   
  800               // Calculate page signature. For non-parsing fetchers this will
  801               // be done in ParseSegment
  802               byte[] signature = 
  803                 SignatureFactory.getSignature(getConf()).calculate(content, parse);
  804               // Ensure segment name and score are in parseData metadata
  805               parse.getData().getContentMeta().set(Nutch.SEGMENT_NAME_KEY, 
  806                   segmentName);
  807               parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY, 
  808                   StringUtil.toHexString(signature));
  809               // Pass fetch time to content meta
  810               parse.getData().getContentMeta().set(Nutch.FETCH_TIME_KEY,
  811                   Long.toString(datum.getFetchTime()));
  812               if (url.equals(key))
  813                 datum.setSignature(signature);
  814               try {
  815                 scfilters.passScoreAfterParsing(url, content, parse);
  816               } catch (Exception e) {
  817                 if (LOG.isWarnEnabled()) {
  818                   e.printStackTrace(LogUtil.getWarnStream(LOG));
  819                   LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
  820                 }
  821               }
  822               output.collect(url, new NutchWritable(
  823                       new ParseImpl(new ParseText(parse.getText()), 
  824                                     parse.getData(), parse.isCanonical())));
  825             }
  826           }
  827         } catch (IOException e) {
  828           if (LOG.isFatalEnabled()) {
  829             e.printStackTrace(LogUtil.getFatalStream(LOG));
  830             LOG.fatal("fetcher caught:"+e.toString());
  831           }
  832         }
  833   
  834         // return parse status if it exits
  835         if (parseResult != null && !parseResult.isEmpty()) {
  836           Parse p = parseResult.get(content.getUrl());
  837           if (p != null) {
  838             return p.getData().getStatus();
  839           }
  840         }
  841         return null;
  842       }
  843       
  844     }
  845   
  /**
   * Creates a Fetcher with no configuration attached; configure(JobConf)
   * later installs one via setConf.
   */
  public Fetcher() {
    this(null);
  }

  /**
   * Creates a Fetcher bound to the given configuration.
   *
   * @param conf configuration to use
   */
  public Fetcher(Configuration conf) {
    super(conf);
  }
  849   
  850     private void updateStatus(int bytesInPage) throws IOException {
  851       pages.incrementAndGet();
  852       bytes.addAndGet(bytesInPage);
  853     }
  854   
  855     
  856     private void reportStatus() throws IOException {
  857       String status;
  858       long elapsed = (System.currentTimeMillis() - start)/1000;
  859       status = activeThreads + " threads, " +
  860         pages+" pages, "+errors+" errors, "
  861         + Math.round(((float)pages.get()*10)/elapsed)/10.0+" pages/s, "
  862         + Math.round(((((float)bytes.get())*8)/1024)/elapsed)+" kb/s, ";
  863       reporter.setStatus(status);
  864     }
  865   
  866     public void configure(JobConf job) {
  867       setConf(job);
  868   
  869       this.segmentName = job.get(Nutch.SEGMENT_NAME_KEY);
  870       this.storingContent = isStoringContent(job);
  871       this.parsing = isParsing(job);
  872   
  873   //    if (job.getBoolean("fetcher.verbose", false)) {
  874   //      LOG.setLevel(Level.FINE);
  875   //    }
  876     }
  877   
  878     public void close() {}
  879   
  880     public static boolean isParsing(Configuration conf) {
  881       return conf.getBoolean("fetcher.parse", true);
  882     }
  883   
  884     public static boolean isStoringContent(Configuration conf) {
  885       return conf.getBoolean("fetcher.store.content", true);
  886     }
  887   
  888     public void run(RecordReader<Text, CrawlDatum> input,
  889         OutputCollector<Text, NutchWritable> output,
  890                     Reporter reporter) throws IOException {
  891   
  892       this.output = output;
  893       this.reporter = reporter;
  894       this.fetchQueues = new FetchItemQueues(getConf());
  895   
  896       int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
  897       if (LOG.isInfoEnabled()) { LOG.info("Fetcher: threads: " + threadCount); }
  898   
  899       feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);
  900       //feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
  901       feeder.start();
  902   
  903       // set non-blocking & no-robots mode for HTTP protocol plugins.
  904       getConf().setBoolean(Protocol.CHECK_BLOCKING, false);
  905       getConf().setBoolean(Protocol.CHECK_ROBOTS, false);
  906       
  907       for (int i = 0; i < threadCount; i++) {       // spawn threads
  908         new FetcherThread(getConf()).start();
  909       }
  910   
  911       // select a timeout that avoids a task timeout
  912       long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;
  913   
  914       do {                                          // wait for threads to exit
  915         try {
  916           Thread.sleep(1000);
  917         } catch (InterruptedException e) {}
  918   
  919         reportStatus();
  920         LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
  921             + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
  922   
  923         if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
  924           fetchQueues.dump();
  925         }
  926         // some requests seem to hang, despite all intentions
  927         if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
  928           if (LOG.isWarnEnabled()) {
  929             LOG.warn("Aborting with "+activeThreads+" hung threads.");
  930           }
  931           return;
  932         }
  933   
  934       } while (activeThreads.get() > 0);
  935       LOG.info("-activeThreads=" + activeThreads);
  936       
  937     }
  938   
  939     public void fetch(Path segment, int threads, boolean parsing)
  940       throws IOException {
  941   
  942       checkConfiguration();
  943   
  944       if (LOG.isInfoEnabled()) {
  945         LOG.info("Fetcher: starting");
  946         LOG.info("Fetcher: segment: " + segment);
  947       }
  948   
  949       JobConf job = new NutchJob(getConf());
  950       job.setJobName("fetch " + segment);
  951   
  952       job.setInt("fetcher.threads.fetch", threads);
  953       job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
  954       job.setBoolean("fetcher.parse", parsing);
  955   
  956       // for politeness, don't permit parallel execution of a single task
  957       job.setSpeculativeExecution(false);
  958   
  959       FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
  960       job.setInputFormat(InputFormat.class);
  961   
  962       job.setMapRunnerClass(Fetcher.class);
  963   
  964       FileOutputFormat.setOutputPath(job, segment);
  965       job.setOutputFormat(FetcherOutputFormat.class);
  966       job.setOutputKeyClass(Text.class);
  967       job.setOutputValueClass(NutchWritable.class);
  968   
  969       JobClient.runJob(job);
  970       if (LOG.isInfoEnabled()) { LOG.info("Fetcher: done"); }
  971     }
  972   
  973   
  974     /** Run the fetcher. */
  975     public static void main(String[] args) throws Exception {
  976   
  977       String usage = "Usage: Fetcher <segment> [-threads n] [-noParsing]";
  978   
  979       if (args.length < 1) {
  980         System.err.println(usage);
  981         System.exit(-1);
  982       }
  983         
  984       Path segment = new Path(args[0]);
  985   
  986       Configuration conf = NutchConfiguration.create();
  987   
  988       int threads = conf.getInt("fetcher.threads.fetch", 10);
  989       boolean parsing = true;
  990   
  991       for (int i = 1; i < args.length; i++) {       // parse command line
  992         if (args[i].equals("-threads")) {           // found -threads option
  993           threads =  Integer.parseInt(args[++i]);
  994         } else if (args[i].equals("-noParsing")) parsing = false;
  995       }
  996   
  997       conf.setInt("fetcher.threads.fetch", threads);
  998       if (!parsing) {
  999         conf.setBoolean("fetcher.parse", parsing);
 1000       }
 1001       Fetcher fetcher = new Fetcher(conf);          // make a Fetcher
 1002       
 1003       fetcher.fetch(segment, threads, parsing);              // run the Fetcher
 1004   
 1005     }
 1006   
 1007     private void checkConfiguration() {
 1008   
 1009       // ensure that a value has been set for the agent name and that that
 1010       // agent name is the first value in the agents we advertise for robot
 1011       // rules parsing
 1012       String agentName = getConf().get("http.agent.name");
 1013       if (agentName == null || agentName.trim().length() == 0) {
 1014         String message = "Fetcher: No agents listed in 'http.agent.name'"
 1015             + " property.";
 1016         if (LOG.isFatalEnabled()) {
 1017           LOG.fatal(message);
 1018         }
 1019         throw new IllegalArgumentException(message);
 1020       } else {
 1021   
 1022         // get all of the agents that we advertise
 1023         String agentNames = getConf().get("http.robots.agents");
 1024         StringTokenizer tok = new StringTokenizer(agentNames, ",");
 1025         ArrayList<String> agents = new ArrayList<String>();
 1026         while (tok.hasMoreTokens()) {
 1027           agents.add(tok.nextToken().trim());
 1028         }
 1029   
 1030         // if the first one is not equal to our agent name, log fatal and throw
 1031         // an exception
 1032         if (!(agents.get(0)).equalsIgnoreCase(agentName)) {
 1033           String message = "Fetcher: Your 'http.agent.name' value should be "
 1034               + "listed first in 'http.robots.agents' property.";
 1035           if (LOG.isWarnEnabled()) {
 1036             LOG.warn(message);
 1037           }
 1038         }
 1039       }
 1040     }
 1041   
 1042   }

Save This Page
Home » nutch-1.0 » org.apache.nutch » fetcher » [javadoc | source]