Save This Page
Home » nutch-1.0 » org.apache.nutch » protocol » http » api » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *     http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.nutch.protocol.http.api;
   18   
   19   // JDK imports
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
   26   
   27   // Commons Logging imports
   28   import org.apache.commons.logging.Log;
   29   import org.apache.commons.logging.LogFactory;
   30   
   31   // Nutch imports
   32   import org.apache.nutch.crawl.CrawlDatum;
   33   import org.apache.nutch.net.protocols.Response;
   34   import org.apache.nutch.protocol.Content;
   35   import org.apache.nutch.protocol.Protocol;
   36   import org.apache.nutch.protocol.ProtocolException;
   37   import org.apache.nutch.protocol.ProtocolOutput;
   38   import org.apache.nutch.protocol.ProtocolStatus;
   39   import org.apache.nutch.protocol.RobotRules;
   40   import org.apache.nutch.util.GZIPUtils;
   41   import org.apache.nutch.util.DeflateUtils;
   42   import org.apache.nutch.util.LogUtil;
   43   
   44   // Hadoop imports
   45   import org.apache.hadoop.conf.Configuration;
   46   import org.apache.hadoop.io.Text;
   47   
   48   /**
   49    * @author Jérôme Charron
   50    */
   51   public abstract class HttpBase implements Protocol {
   52     
   53     
   54     public static final int BUFFER_SIZE = 8 * 1024;
   55     
   56     private static final byte[] EMPTY_CONTENT = new byte[0];
   57   
   58     private RobotRulesParser robots = null;
   59    
   60     /** The proxy hostname. */ 
   61     protected String proxyHost = null;
   62   
   63     /** The proxy port. */
   64     protected int proxyPort = 8080; 
   65   
   66     /** Indicates if a proxy is used */
   67     protected boolean useProxy = false;
   68   
   69     /** The network timeout in millisecond */
   70     protected int timeout = 10000;
   71   
   72     /** The length limit for downloaded content, in bytes. */
   73     protected int maxContent = 64 * 1024; 
   74   
   75     /** The number of times a thread will delay when trying to fetch a page. */
   76     protected int maxDelays = 3;
   77   
   78     /**
   79      * The maximum number of threads that should be allowed
   80      * to access a host at one time.
   81      */
   82     protected int maxThreadsPerHost = 1; 
   83   
   84     /**
   85      * The number of seconds the fetcher will delay between
   86      * successive requests to the same server.
   87      */
   88     protected long serverDelay = 1000;
   89   
   90     /** The Nutch 'User-Agent' request header */
   91     protected String userAgent = getAgentString(
   92                           "NutchCVS", null, "Nutch",
   93                           "http://lucene.apache.org/nutch/bot.html",
   94                           "nutch-agent@lucene.apache.org");
   95   
   96       
   97     /**
   98      * Maps from host to a Long naming the time it should be unblocked.
   99      * The Long is zero while the host is in use, then set to now+wait when
  100      * a request finishes.  This way only one thread at a time accesses a
  101      * host.
  102      */
  103     private static HashMap BLOCKED_ADDR_TO_TIME = new HashMap();
  104     
  105     /**
  106      * Maps a host to the number of threads accessing that host.
  107      */
  108     private static HashMap THREADS_PER_HOST_COUNT = new HashMap();
  109     
  110     /**
  111      * Queue of blocked hosts.  This contains all of the non-zero entries
  112      * from BLOCKED_ADDR_TO_TIME, ordered by increasing time.
  113      */
  114     private static LinkedList BLOCKED_ADDR_QUEUE = new LinkedList();
  115     
  116     /** The default logger */
  117     private final static Log LOGGER = LogFactory.getLog(HttpBase.class);
  118   
  119     /** The specified logger */
  120     private Log logger = LOGGER;
  121    
  122     /** The nutch configuration */
  123     private Configuration conf = null;
  124     
  125     /** Do we block by IP addresses or by hostnames? */
  126     private boolean byIP = true;
  127    
  128     /** Do we use HTTP/1.1? */
  129     protected boolean useHttp11 = false;
  130     
  131     /** Skip page if Crawl-Delay longer than this value. */
  132     protected long maxCrawlDelay = -1L;
  133   
  134     /** Plugin should handle host blocking internally. */
  135     protected boolean checkBlocking = true;
  136     
  137     /** Plugin should handle robot rules checking internally. */
  138     protected boolean checkRobots = true;
  139   
  /** Creates an instance that logs through the class-wide default logger. */
  public HttpBase() {
    this(null);
  }

  /**
   * Creates an instance with its own robots.txt parser.
   *
   * @param logger logger to use for this instance; when <code>null</code>
   *               the default logger stays in effect
   */
  public HttpBase(Log logger) {
    robots = new RobotRulesParser();
    if (logger == null) {
      return;
    }
    this.logger = logger;
  }
  152     
  153      // Inherited Javadoc
  154       public void setConf(Configuration conf) {
  155           this.conf = conf;
  156           this.proxyHost = conf.get("http.proxy.host");
  157           this.proxyPort = conf.getInt("http.proxy.port", 8080);
  158           this.useProxy = (proxyHost != null && proxyHost.length() > 0);
  159           this.timeout = conf.getInt("http.timeout", 10000);
  160           this.maxContent = conf.getInt("http.content.limit", 64 * 1024);
  161           this.maxDelays = conf.getInt("http.max.delays", 3);
  162           this.maxThreadsPerHost = conf.getInt("fetcher.threads.per.host", 1);
  163           this.userAgent = getAgentString(conf.get("http.agent.name"), conf.get("http.agent.version"), conf
  164                   .get("http.agent.description"), conf.get("http.agent.url"), conf.get("http.agent.email"));
  165           this.serverDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
  166           this.maxCrawlDelay = (long)(conf.getInt("fetcher.max.crawl.delay", -1) * 1000);
  167           // backward-compatible default setting
  168           this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", true);
  169           this.useHttp11 = conf.getBoolean("http.useHttp11", false);
  170           this.robots.setConf(conf);
  171           this.checkBlocking = conf.getBoolean(Protocol.CHECK_BLOCKING, true);
  172           this.checkRobots = conf.getBoolean(Protocol.CHECK_ROBOTS, true);
  173           logConf();
  174       }
  175   
  176     // Inherited Javadoc
  177     public Configuration getConf() {
  178       return this.conf;
  179     }
  180      
  181     
  182     
  183     public ProtocolOutput getProtocolOutput(Text url, CrawlDatum datum) {
  184       
  185       String urlString = url.toString();
  186       try {
  187         URL u = new URL(urlString);
  188         
  189         if (checkRobots) {
  190           try {
  191             if (!robots.isAllowed(this, u)) {
  192               return new ProtocolOutput(null, new ProtocolStatus(ProtocolStatus.ROBOTS_DENIED, url));
  193             }
  194           } catch (Throwable e) {
  195             // XXX Maybe bogus: assume this is allowed.
  196             if (logger.isTraceEnabled()) {
  197               logger.trace("Exception checking robot rules for " + url + ": " + e);
  198             }
  199           }
  200         }
  201         
  202         long crawlDelay = robots.getCrawlDelay(this, u);
  203         long delay = crawlDelay > 0 ? crawlDelay : serverDelay;
  204         if (checkBlocking && maxCrawlDelay >= 0 && delay > maxCrawlDelay) {
  205           // skip this page, otherwise the thread would block for too long.
  206           LOGGER.info("Skipping: " + u + " exceeds fetcher.max.crawl.delay, max="
  207                   + (maxCrawlDelay / 1000) + ", Crawl-Delay=" + (delay / 1000));
  208           return new ProtocolOutput(null, ProtocolStatus.STATUS_WOULDBLOCK);
  209         }
  210         String host = null;
  211         if (checkBlocking) {
  212           try {
  213             host = blockAddr(u, delay);
  214           } catch (BlockedException be) {
  215             return new ProtocolOutput(null, ProtocolStatus.STATUS_BLOCKED);
  216           }
  217         }
  218         Response response;
  219         try {
  220           response = getResponse(u, datum, false); // make a request
  221         } finally {
  222           if (checkBlocking) unblockAddr(host, delay);
  223         }
  224         
  225         int code = response.getCode();
  226         byte[] content = response.getContent();
  227         Content c = new Content(u.toString(), u.toString(),
  228                                 (content == null ? EMPTY_CONTENT : content),
  229                                 response.getHeader("Content-Type"),
  230                                 response.getHeaders(), this.conf);
  231         
  232         if (code == 200) { // got a good response
  233           return new ProtocolOutput(c); // return it
  234           
  235         } else if (code == 410) { // page is gone
  236           return new ProtocolOutput(c, new ProtocolStatus(ProtocolStatus.GONE, "Http: " + code + " url=" + url));
  237           
  238         } else if (code >= 300 && code < 400) { // handle redirect
  239           String location = response.getHeader("Location");
  240           // some broken servers, such as MS IIS, use lowercase header name...
  241           if (location == null) location = response.getHeader("location");
  242           if (location == null) location = "";
  243           u = new URL(u, location);
  244           int protocolStatusCode;
  245           switch (code) {
  246             case 300:   // multiple choices, preferred value in Location
  247               protocolStatusCode = ProtocolStatus.MOVED;
  248               break;
  249             case 301:   // moved permanently
  250             case 305:   // use proxy (Location is URL of proxy)
  251               protocolStatusCode = ProtocolStatus.MOVED;
  252               break;
  253             case 302:   // found (temporarily moved)
  254             case 303:   // see other (redirect after POST)
  255             case 307:   // temporary redirect
  256               protocolStatusCode = ProtocolStatus.TEMP_MOVED;
  257               break;
  258             case 304:   // not modified
  259               protocolStatusCode = ProtocolStatus.NOTMODIFIED;
  260               break;
  261             default:
  262               protocolStatusCode = ProtocolStatus.MOVED;
  263           }
  264           // handle this in the higher layer.
  265           return new ProtocolOutput(c, new ProtocolStatus(protocolStatusCode, u));
  266         } else if (code == 400) { // bad request, mark as GONE
  267           if (logger.isTraceEnabled()) { logger.trace("400 Bad request: " + u); }
  268           return new ProtocolOutput(c, new ProtocolStatus(ProtocolStatus.GONE, u));
  269         } else if (code == 401) { // requires authorization, but no valid auth provided.
  270           if (logger.isTraceEnabled()) { logger.trace("401 Authentication Required"); }
  271           return new ProtocolOutput(c, new ProtocolStatus(ProtocolStatus.ACCESS_DENIED, "Authentication required: "
  272                   + urlString));
  273         } else if (code == 404) {
  274           return new ProtocolOutput(c, new ProtocolStatus(ProtocolStatus.NOTFOUND, u));
  275         } else if (code == 410) { // permanently GONE
  276           return new ProtocolOutput(c, new ProtocolStatus(ProtocolStatus.GONE, u));
  277         } else {
  278           return new ProtocolOutput(c, new ProtocolStatus(ProtocolStatus.EXCEPTION, "Http code=" + code + ", url="
  279                   + u));
  280         }
  281       } catch (Throwable e) {
  282         e.printStackTrace(LogUtil.getErrorStream(logger));
  283         return new ProtocolOutput(null, new ProtocolStatus(e));
  284       }
  285     }
  286     
  287     /* -------------------------- *
  288      * </implementation:Protocol> *
  289      * -------------------------- */
  290   
  291   
  292     public String getProxyHost() {
  293       return proxyHost;
  294     }
  295   
  296     public int getProxyPort() {
  297       return proxyPort;
  298     }
  299   
  300     public boolean useProxy() {
  301       return useProxy;
  302     }
  303   
  304     public int getTimeout() {
  305       return timeout;
  306     }
  307   
  308     public int getMaxContent() {
  309       return maxContent;
  310     }
  311   
  312     public int getMaxDelays() {
  313       return maxDelays;
  314     }
  315   
  316     public int getMaxThreadsPerHost() {
  317       return maxThreadsPerHost;
  318     }
  319   
  320     public long getServerDelay() {
  321       return serverDelay;
  322     }
  323   
  324     public String getUserAgent() {
  325       return userAgent;
  326     }
  327     
  328     public boolean getUseHttp11() {
  329       return useHttp11;
  330     }
  331     
  332     private String blockAddr(URL url, long crawlDelay) throws ProtocolException {
  333       
  334       String host;
  335       if (byIP) {
  336         try {
  337           InetAddress addr = InetAddress.getByName(url.getHost());
  338           host = addr.getHostAddress();
  339         } catch (UnknownHostException e) {
  340           // unable to resolve it, so don't fall back to host name
  341           throw new HttpException(e);
  342         }
  343       } else {
  344         host = url.getHost();
  345         if (host == null)
  346           throw new HttpException("Unknown host for url: " + url);
  347         host = host.toLowerCase();
  348       }
  349       
  350       int delays = 0;
  351       while (true) {
  352         cleanExpiredServerBlocks();                 // free held addresses
  353         
  354         Long time;
  355         synchronized (BLOCKED_ADDR_TO_TIME) {
  356           time = (Long) BLOCKED_ADDR_TO_TIME.get(host);
  357           if (time == null) {                       // address is free
  358             
  359             // get # of threads already accessing this addr
  360             Integer counter = (Integer)THREADS_PER_HOST_COUNT.get(host);
  361             int count = (counter == null) ? 0 : counter.intValue();
  362             
  363             count++;                              // increment & store
  364             THREADS_PER_HOST_COUNT.put(host, new Integer(count));
  365             
  366             if (count >= maxThreadsPerHost) {
  367               BLOCKED_ADDR_TO_TIME.put(host, new Long(0)); // block it
  368             }
  369             return host;
  370           }
  371         }
  372         
  373         if (delays == maxDelays)
  374           throw new BlockedException("Exceeded http.max.delays: retry later.");
  375         
  376         long done = time.longValue();
  377         long now = System.currentTimeMillis();
  378         long sleep = 0;
  379         if (done == 0) {                            // address is still in use
  380           sleep = crawlDelay;                      // wait at least delay
  381           
  382         } else if (now < done) {                    // address is on hold
  383           sleep = done - now;                       // wait until its free
  384         }
  385         
  386         try {
  387           Thread.sleep(sleep);
  388         } catch (InterruptedException e) {}
  389         delays++;
  390       }
  391     }
  392     
  393     private void unblockAddr(String host, long crawlDelay) {
  394       synchronized (BLOCKED_ADDR_TO_TIME) {
  395         int addrCount = ((Integer)THREADS_PER_HOST_COUNT.get(host)).intValue();
  396         if (addrCount == 1) {
  397           THREADS_PER_HOST_COUNT.remove(host);
  398           BLOCKED_ADDR_QUEUE.addFirst(host);
  399           BLOCKED_ADDR_TO_TIME.put
  400                   (host, new Long(System.currentTimeMillis() + crawlDelay));
  401         } else {
  402           THREADS_PER_HOST_COUNT.put(host, new Integer(addrCount - 1));
  403         }
  404       }
  405     }
  406     
  407     private static void cleanExpiredServerBlocks() {
  408       synchronized (BLOCKED_ADDR_TO_TIME) {
  409         for (int i = BLOCKED_ADDR_QUEUE.size() - 1; i >= 0; i--) {
  410           String host = (String) BLOCKED_ADDR_QUEUE.get(i);
  411           long time = ((Long) BLOCKED_ADDR_TO_TIME.get(host)).longValue();
  412           if (time <= System.currentTimeMillis()) {
  413             BLOCKED_ADDR_TO_TIME.remove(host);
  414             BLOCKED_ADDR_QUEUE.remove(i);
  415           }
  416         }
  417       }
  418     }
  419     
  420     private static String getAgentString(String agentName,
  421                                          String agentVersion,
  422                                          String agentDesc,
  423                                          String agentURL,
  424                                          String agentEmail) {
  425       
  426       if ( (agentName == null) || (agentName.trim().length() == 0) ) {
  427         // TODO : NUTCH-258
  428         if (LOGGER.isFatalEnabled()) {
  429           LOGGER.fatal("No User-Agent string set (http.agent.name)!");
  430         }
  431       }
  432       
  433       StringBuffer buf= new StringBuffer();
  434       
  435       buf.append(agentName);
  436       if (agentVersion != null) {
  437         buf.append("/");
  438         buf.append(agentVersion);
  439       }
  440       if ( ((agentDesc != null) && (agentDesc.length() != 0))
  441       || ((agentEmail != null) && (agentEmail.length() != 0))
  442       || ((agentURL != null) && (agentURL.length() != 0)) ) {
  443         buf.append(" (");
  444         
  445         if ((agentDesc != null) && (agentDesc.length() != 0)) {
  446           buf.append(agentDesc);
  447           if ( (agentURL != null) || (agentEmail != null) )
  448             buf.append("; ");
  449         }
  450         
  451         if ((agentURL != null) && (agentURL.length() != 0)) {
  452           buf.append(agentURL);
  453           if (agentEmail != null)
  454             buf.append("; ");
  455         }
  456         
  457         if ((agentEmail != null) && (agentEmail.length() != 0))
  458           buf.append(agentEmail);
  459         
  460         buf.append(")");
  461       }
  462       return buf.toString();
  463     }
  464   
  465     protected void logConf() {
  466       if (logger.isInfoEnabled()) {
  467         logger.info("http.proxy.host = " + proxyHost);
  468         logger.info("http.proxy.port = " + proxyPort);
  469         logger.info("http.timeout = " + timeout);
  470         logger.info("http.content.limit = " + maxContent);
  471         logger.info("http.agent = " + userAgent);
  472         logger.info(Protocol.CHECK_BLOCKING + " = " + checkBlocking);
  473         logger.info(Protocol.CHECK_ROBOTS + " = " + checkRobots);
  474         if (checkBlocking) {
  475           logger.info("fetcher.server.delay = " + serverDelay);
  476           logger.info("http.max.delays = " + maxDelays);
  477         }
  478       }
  479     }
  480     
  481     public byte[] processGzipEncoded(byte[] compressed, URL url) throws IOException {
  482   
  483       if (LOGGER.isTraceEnabled()) { LOGGER.trace("uncompressing...."); }
  484   
  485       byte[] content;
  486       if (getMaxContent() >= 0) {
  487           content = GZIPUtils.unzipBestEffort(compressed, getMaxContent());
  488       } else {
  489           content = GZIPUtils.unzipBestEffort(compressed);
  490       } 
  491   
  492       if (content == null)
  493         throw new IOException("unzipBestEffort returned null");
  494   
  495       if (LOGGER.isTraceEnabled()) {
  496         LOGGER.trace("fetched " + compressed.length
  497                    + " bytes of compressed content (expanded to "
  498                    + content.length + " bytes) from " + url);
  499       }
  500       return content;
  501     }
  502   
  503     public byte[] processDeflateEncoded(byte[] compressed, URL url) throws IOException {
  504   
  505       if (LOGGER.isTraceEnabled()) { LOGGER.trace("inflating...."); }
  506   
  507       byte[] content = DeflateUtils.inflateBestEffort(compressed, getMaxContent());
  508   
  509       if (content == null)
  510         throw new IOException("inflateBestEffort returned null");
  511   
  512       if (LOGGER.isTraceEnabled()) {
  513         LOGGER.trace("fetched " + compressed.length
  514                    + " bytes of compressed content (expanded to "
  515                    + content.length + " bytes) from " + url);
  516       }
  517       return content;
  518     }
  519   
  520     protected static void main(HttpBase http, String[] args) throws Exception {
  521       boolean verbose = false;
  522       String url = null;
  523       
  524       String usage = "Usage: Http [-verbose] [-timeout N] url";
  525       
  526       if (args.length == 0) {
  527         System.err.println(usage);
  528         System.exit(-1);
  529       }
  530       
  531       for (int i = 0; i < args.length; i++) { // parse command line
  532         if (args[i].equals("-timeout")) { // found -timeout option
  533           http.timeout = Integer.parseInt(args[++i]) * 1000;
  534         } else if (args[i].equals("-verbose")) { // found -verbose option
  535           verbose = true;
  536         } else if (i != args.length - 1) {
  537           System.err.println(usage);
  538           System.exit(-1);
  539         } else // root is required parameter
  540           url = args[i];
  541       }
  542       
  543   //    if (verbose) {
  544   //      LOGGER.setLevel(Level.FINE);
  545   //    }
  546       
  547       ProtocolOutput out = http.getProtocolOutput(new Text(url), new CrawlDatum());
  548       Content content = out.getContent();
  549       
  550       System.out.println("Status: " + out.getStatus());
  551       if (content != null) {
  552         System.out.println("Content Type: " + content.getContentType());
  553         System.out.println("Content Length: " +
  554                            content.getMetadata().get(Response.CONTENT_LENGTH));
  555         System.out.println("Content:");
  556         String text = new String(content.getContent());
  557         System.out.println(text);
  558       }
  559       
  560     }
  561     
  562     
  563     protected abstract Response getResponse(URL url,
  564                                             CrawlDatum datum,
  565                                             boolean followRedirects)
  566       throws ProtocolException, IOException;
  567   
  568     public RobotRules getRobotRules(Text url, CrawlDatum datum) {
  569       return robots.getRobotRulesSet(this, url);
  570     }
  571   
  572   }

Save This Page
Home » nutch-1.0 » org.apache.nutch » protocol » http » api » [javadoc | source]