Save This Page
Home » nutch-1.0 » org.apache.nutch » crawl » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *     http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   
   18   package org.apache.nutch.crawl;
   19   
   20   import java.util.ArrayList;
   21   import java.util.Iterator;
   22   import java.io.IOException;
   23   
   24   // Commons Logging imports
   25   import org.apache.commons.logging.Log;
   26   import org.apache.commons.logging.LogFactory;
   27   
   28   import org.apache.hadoop.io;
   29   import org.apache.hadoop.mapred;
   30   import org.apache.nutch.metadata.Nutch;
   31   import org.apache.nutch.scoring.ScoringFilterException;
   32   import org.apache.nutch.scoring.ScoringFilters;
   33   
   34   /** Merge new page entries with existing entries. */
   35   public class CrawlDbReducer implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
   36     public static final Log LOG = LogFactory.getLog(CrawlDbReducer.class);
   37     
   38     private int retryMax;
   39     private CrawlDatum result = new CrawlDatum();
   40     private ArrayList<CrawlDatum> linked = new ArrayList<CrawlDatum>();
   41     private ScoringFilters scfilters = null;
   42     private boolean additionsAllowed;
   43     private int maxInterval;
   44     private FetchSchedule schedule;
   45     private CrawlDatum fetch = new CrawlDatum();
   46     private CrawlDatum old = new CrawlDatum();
   47   
   48     public void configure(JobConf job) {
   49       retryMax = job.getInt("db.fetch.retry.max", 3);
   50       scfilters = new ScoringFilters(job);
   51       additionsAllowed = job.getBoolean(CrawlDb.CRAWLDB_ADDITIONS_ALLOWED, true);
   52       int oldMaxInterval = job.getInt("db.max.fetch.interval", 0);
   53       maxInterval = job.getInt("db.fetch.interval.max", 0 );
   54       if (oldMaxInterval > 0 && maxInterval == 0) maxInterval = oldMaxInterval * FetchSchedule.SECONDS_PER_DAY;
   55       schedule = FetchScheduleFactory.getFetchSchedule(job);
   56     }
   57   
   58     public void close() {}
   59   
   60     public void reduce(Text key, Iterator<CrawlDatum> values,
   61                        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
   62       throws IOException {
   63   
   64       boolean fetchSet = false;
   65       boolean oldSet = false;
   66       byte[] signature = null;
   67       linked.clear();
   68   
   69       while (values.hasNext()) {
   70         CrawlDatum datum = (CrawlDatum)values.next();
   71         if (CrawlDatum.hasDbStatus(datum)) {
   72           if (!oldSet) {
   73             old.set(datum);
   74             oldSet = true;
   75           } else {
   76             // always take the latest version
   77             if (old.getFetchTime() < datum.getFetchTime()) old.set(datum);
   78           }
   79           continue;
   80         }
   81   
   82         if (CrawlDatum.hasFetchStatus(datum)) {
   83           if (!fetchSet) {
   84             fetch.set(datum);
   85             fetchSet = true;
   86           } else {
   87             // always take the latest version
   88             if (fetch.getFetchTime() < datum.getFetchTime()) fetch.set(datum);
   89           }
   90           continue;
   91         }
   92   
   93         switch (datum.getStatus()) {                // collect other info
   94         case CrawlDatum.STATUS_LINKED:
   95           CrawlDatum link = new CrawlDatum();
   96           link.set(datum);
   97           linked.add(link);
   98           break;
   99         case CrawlDatum.STATUS_SIGNATURE:
  100           signature = datum.getSignature();
  101           break;
  102         default:
  103           LOG.warn("Unknown status, key: " + key + ", datum: " + datum);
  104         }
  105       }
  106   
  107       // if it doesn't already exist, skip it
  108       if (!oldSet && !additionsAllowed) return;
  109       
  110       // if there is no fetched datum, perhaps there is a link
  111       if (!fetchSet && linked.size() > 0) {
  112         fetch = linked.get(0);
  113         fetchSet = true;
  114       }
  115       
  116       // still no new data - record only unchanged old data, if exists, and return
  117       if (!fetchSet) {
  118         if (oldSet) // at this point at least "old" should be present
  119           output.collect(key, old);
  120         else
  121           LOG.warn("Missing fetch and old value, signature=" + signature);
  122         return;
  123       }
  124       
  125       if (signature == null) signature = fetch.getSignature();
  126       long prevModifiedTime = oldSet ? old.getModifiedTime() : 0L;
  127       long prevFetchTime = oldSet ? old.getFetchTime() : 0L;
  128   
  129       // initialize with the latest version, be it fetch or link
  130       result.set(fetch);
  131       if (oldSet) {
  132         // copy metadata from old, if exists
  133         if (old.getMetaData().size() > 0) {
  134           result.putAllMetaData(old);
  135           // overlay with new, if any
  136           if (fetch.getMetaData().size() > 0)
  137             result.putAllMetaData(fetch);
  138         }
  139         // set the most recent valid value of modifiedTime
  140         if (old.getModifiedTime() > 0 && fetch.getModifiedTime() == 0) {
  141           result.setModifiedTime(old.getModifiedTime());
  142         }
  143       }
  144       
  145       switch (fetch.getStatus()) {                // determine new status
  146   
  147       case CrawlDatum.STATUS_LINKED:                // it was link
  148         if (oldSet) {                          // if old exists
  149           result.set(old);                          // use it
  150         } else {
  151           result = schedule.initializeSchedule((Text)key, result);
  152           result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
  153           try {
  154             scfilters.initialScore((Text)key, result);
  155           } catch (ScoringFilterException e) {
  156             if (LOG.isWarnEnabled()) {
  157               LOG.warn("Cannot filter init score for url " + key +
  158                        ", using default: " + e.getMessage());
  159             }
  160             result.setScore(0.0f);
  161           }
  162         }
  163         break;
  164         
  165       case CrawlDatum.STATUS_FETCH_SUCCESS:         // succesful fetch
  166       case CrawlDatum.STATUS_FETCH_REDIR_TEMP:      // successful fetch, redirected
  167       case CrawlDatum.STATUS_FETCH_REDIR_PERM:
  168       case CrawlDatum.STATUS_FETCH_NOTMODIFIED:     // successful fetch, notmodified
  169         // determine the modification status
  170         int modified = FetchSchedule.STATUS_UNKNOWN;
  171         if (fetch.getStatus() == CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
  172           modified = FetchSchedule.STATUS_NOTMODIFIED;
  173         } else {
  174           if (oldSet && old.getSignature() != null && signature != null) {
  175             if (SignatureComparator._compare(old.getSignature(), signature) != 0) {
  176               modified = FetchSchedule.STATUS_MODIFIED;
  177             } else {
  178               modified = FetchSchedule.STATUS_NOTMODIFIED;
  179             }
  180           }
  181         }
  182         // set the schedule
  183         result = schedule.setFetchSchedule((Text)key, result, prevFetchTime,
  184             prevModifiedTime, fetch.getFetchTime(), fetch.getModifiedTime(), modified);
  185         // set the result status and signature
  186         if (modified == FetchSchedule.STATUS_NOTMODIFIED) {
  187           result.setStatus(CrawlDatum.STATUS_DB_NOTMODIFIED);
  188           if (oldSet) result.setSignature(old.getSignature());
  189         } else {
  190           switch (fetch.getStatus()) {
  191           case CrawlDatum.STATUS_FETCH_SUCCESS:
  192             result.setStatus(CrawlDatum.STATUS_DB_FETCHED);
  193             break;
  194           case CrawlDatum.STATUS_FETCH_REDIR_PERM:
  195             result.setStatus(CrawlDatum.STATUS_DB_REDIR_PERM);
  196             break;
  197           case CrawlDatum.STATUS_FETCH_REDIR_TEMP:
  198             result.setStatus(CrawlDatum.STATUS_DB_REDIR_TEMP);
  199             break;
  200           default:
  201             LOG.warn("Unexpected status: " + fetch.getStatus() + " resetting to old status.");
  202             if (oldSet) result.setStatus(old.getStatus());
  203             else result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
  204           }
  205           result.setSignature(signature);
  206         }
  207         // if fetchInterval is larger than the system-wide maximum, trigger
  208         // an unconditional recrawl. This prevents the page to be stuck at
  209         // NOTMODIFIED state, when the old fetched copy was already removed with
  210         // old segments.
  211         if (maxInterval < result.getFetchInterval())
  212           result = schedule.forceRefetch((Text)key, result, false);
  213         break;
  214       case CrawlDatum.STATUS_SIGNATURE:
  215         if (LOG.isWarnEnabled()) {
  216           LOG.warn("Lone CrawlDatum.STATUS_SIGNATURE: " + key);
  217         }   
  218         return;
  219       case CrawlDatum.STATUS_FETCH_RETRY:           // temporary failure
  220         if (oldSet) {
  221           result.setSignature(old.getSignature());  // use old signature
  222         }
  223         result = schedule.setPageRetrySchedule((Text)key, result, prevFetchTime,
  224             prevModifiedTime, fetch.getFetchTime());
  225         if (result.getRetriesSinceFetch() < retryMax) {
  226           result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
  227         } else {
  228           result.setStatus(CrawlDatum.STATUS_DB_GONE);
  229         }
  230         break;
  231   
  232       case CrawlDatum.STATUS_FETCH_GONE:            // permanent failure
  233         if (oldSet)
  234           result.setSignature(old.getSignature());  // use old signature
  235         result.setStatus(CrawlDatum.STATUS_DB_GONE);
  236         result = schedule.setPageGoneSchedule((Text)key, result, prevFetchTime,
  237             prevModifiedTime, fetch.getFetchTime());
  238         break;
  239   
  240       default:
  241         throw new RuntimeException("Unknown status: " + fetch.getStatus() + " " + key);
  242       }
  243   
  244       try {
  245         scfilters.updateDbScore((Text)key, oldSet ? old : null, result, linked);
  246       } catch (Exception e) {
  247         if (LOG.isWarnEnabled()) {
  248           LOG.warn("Couldn't update score, key=" + key + ": " + e);
  249         }
  250       }
  251       // remove generation time, if any
  252       result.getMetaData().remove(Nutch.WRITABLE_GENERATE_TIME_KEY);
  253       output.collect(key, result);
  254     }
  255   
  256   }

Save This Page
Home » nutch-1.0 » org.apache.nutch » crawl » [javadoc | source]