1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.nutch.protocol.http.api;
18
19 // JDK imports
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
26
27 // Commons Logging imports
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30
31 // Nutch imports
32 import org.apache.nutch.crawl.CrawlDatum;
33 import org.apache.nutch.net.protocols.Response;
34 import org.apache.nutch.protocol.Content;
35 import org.apache.nutch.protocol.Protocol;
36 import org.apache.nutch.protocol.ProtocolException;
37 import org.apache.nutch.protocol.ProtocolOutput;
38 import org.apache.nutch.protocol.ProtocolStatus;
39 import org.apache.nutch.protocol.RobotRules;
40 import org.apache.nutch.util.GZIPUtils;
41 import org.apache.nutch.util.DeflateUtils;
42 import org.apache.nutch.util.LogUtil;
43
44 // Hadoop imports
45 import org.apache.hadoop.conf.Configuration;
46 import org.apache.hadoop.io.Text;
47
48 /**
49 * @author Jérôme Charron
50 */
51 public abstract class HttpBase implements Protocol {
52
53
54 public static final int BUFFER_SIZE = 8 * 1024;
55
56 private static final byte[] EMPTY_CONTENT = new byte[0];
57
58 private RobotRulesParser robots = null;
59
60 /** The proxy hostname. */
61 protected String proxyHost = null;
62
63 /** The proxy port. */
64 protected int proxyPort = 8080;
65
66 /** Indicates if a proxy is used */
67 protected boolean useProxy = false;
68
/** The network timeout in milliseconds */
70 protected int timeout = 10000;
71
72 /** The length limit for downloaded content, in bytes. */
73 protected int maxContent = 64 * 1024;
74
75 /** The number of times a thread will delay when trying to fetch a page. */
76 protected int maxDelays = 3;
77
78 /**
79 * The maximum number of threads that should be allowed
80 * to access a host at one time.
81 */
82 protected int maxThreadsPerHost = 1;
83
/**
 * The number of milliseconds the fetcher will delay between
 * successive requests to the same server.
 */
88 protected long serverDelay = 1000;
89
90 /** The Nutch 'User-Agent' request header */
91 protected String userAgent = getAgentString(
92 "NutchCVS", null, "Nutch",
93 "http://lucene.apache.org/nutch/bot.html",
94 "nutch-agent@lucene.apache.org");
95
96
/**
 * Maps from host to a Long naming the time it should be unblocked.
 * The Long is zero while the host is in use by the maximum allowed number
 * of threads, then set to now+wait when the last request finishes. This way
 * at most maxThreadsPerHost threads access a host at a time.
 */
103 private static HashMap BLOCKED_ADDR_TO_TIME = new HashMap();
104
105 /**
106 * Maps a host to the number of threads accessing that host.
107 */
108 private static HashMap THREADS_PER_HOST_COUNT = new HashMap();
109
/**
 * Queue of blocked hosts. This contains all of the non-zero entries
 * from BLOCKED_ADDR_TO_TIME. Hosts are added at the head when they are
 * released, so the most recently blocked host comes first.
 */
114 private static LinkedList BLOCKED_ADDR_QUEUE = new LinkedList();
115
116 /** The default logger */
117 private final static Log LOGGER = LogFactory.getLog(HttpBase.class);
118
119 /** The specified logger */
120 private Log logger = LOGGER;
121
122 /** The nutch configuration */
123 private Configuration conf = null;
124
125 /** Do we block by IP addresses or by hostnames? */
126 private boolean byIP = true;
127
128 /** Do we use HTTP/1.1? */
129 protected boolean useHttp11 = false;
130
131 /** Skip page if Crawl-Delay longer than this value. */
132 protected long maxCrawlDelay = -1L;
133
134 /** Plugin should handle host blocking internally. */
135 protected boolean checkBlocking = true;
136
137 /** Plugin should handle robot rules checking internally. */
138 protected boolean checkRobots = true;
139
/**
 * Builds an instance that logs through the class-wide default logger.
 */
public HttpBase() {
  this((Log) null);
}
144
/**
 * Builds an instance with its own robots.txt parser.
 *
 * @param logger the logger to use for this instance; when {@code null} the
 *               class-wide default logger stays in effect
 */
public HttpBase(Log logger) {
  this.robots = new RobotRulesParser();
  if (logger == null) {
    return;
  }
  this.logger = logger;
}
152
153 // Inherited Javadoc
154 public void setConf(Configuration conf) {
155 this.conf = conf;
156 this.proxyHost = conf.get("http.proxy.host");
157 this.proxyPort = conf.getInt("http.proxy.port", 8080);
158 this.useProxy = (proxyHost != null && proxyHost.length() > 0);
159 this.timeout = conf.getInt("http.timeout", 10000);
160 this.maxContent = conf.getInt("http.content.limit", 64 * 1024);
161 this.maxDelays = conf.getInt("http.max.delays", 3);
162 this.maxThreadsPerHost = conf.getInt("fetcher.threads.per.host", 1);
163 this.userAgent = getAgentString(conf.get("http.agent.name"), conf.get("http.agent.version"), conf
164 .get("http.agent.description"), conf.get("http.agent.url"), conf.get("http.agent.email"));
165 this.serverDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
166 this.maxCrawlDelay = (long)(conf.getInt("fetcher.max.crawl.delay", -1) * 1000);
167 // backward-compatible default setting
168 this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", true);
169 this.useHttp11 = conf.getBoolean("http.useHttp11", false);
170 this.robots.setConf(conf);
171 this.checkBlocking = conf.getBoolean(Protocol.CHECK_BLOCKING, true);
172 this.checkRobots = conf.getBoolean(Protocol.CHECK_ROBOTS, true);
173 logConf();
174 }
175
176 // Inherited Javadoc
177 public Configuration getConf() {
178 return this.conf;
179 }
180
181
182
183 public ProtocolOutput getProtocolOutput(Text url, CrawlDatum datum) {
184
185 String urlString = url.toString();
186 try {
187 URL u = new URL(urlString);
188
189 if (checkRobots) {
190 try {
191 if (!robots.isAllowed(this, u)) {
192 return new ProtocolOutput(null, new ProtocolStatus(ProtocolStatus.ROBOTS_DENIED, url));
193 }
194 } catch (Throwable e) {
195 // XXX Maybe bogus: assume this is allowed.
196 if (logger.isTraceEnabled()) {
197 logger.trace("Exception checking robot rules for " + url + ": " + e);
198 }
199 }
200 }
201
202 long crawlDelay = robots.getCrawlDelay(this, u);
203 long delay = crawlDelay > 0 ? crawlDelay : serverDelay;
204 if (checkBlocking && maxCrawlDelay >= 0 && delay > maxCrawlDelay) {
205 // skip this page, otherwise the thread would block for too long.
206 LOGGER.info("Skipping: " + u + " exceeds fetcher.max.crawl.delay, max="
207 + (maxCrawlDelay / 1000) + ", Crawl-Delay=" + (delay / 1000));
208 return new ProtocolOutput(null, ProtocolStatus.STATUS_WOULDBLOCK);
209 }
210 String host = null;
211 if (checkBlocking) {
212 try {
213 host = blockAddr(u, delay);
214 } catch (BlockedException be) {
215 return new ProtocolOutput(null, ProtocolStatus.STATUS_BLOCKED);
216 }
217 }
218 Response response;
219 try {
220 response = getResponse(u, datum, false); // make a request
221 } finally {
222 if (checkBlocking) unblockAddr(host, delay);
223 }
224
225 int code = response.getCode();
226 byte[] content = response.getContent();
227 Content c = new Content(u.toString(), u.toString(),
228 (content == null ? EMPTY_CONTENT : content),
229 response.getHeader("Content-Type"),
230 response.getHeaders(), this.conf);
231
232 if (code == 200) { // got a good response
233 return new ProtocolOutput(c); // return it
234
235 } else if (code == 410) { // page is gone
236 return new ProtocolOutput(c, new ProtocolStatus(ProtocolStatus.GONE, "Http: " + code + " url=" + url));
237
238 } else if (code >= 300 && code < 400) { // handle redirect
239 String location = response.getHeader("Location");
240 // some broken servers, such as MS IIS, use lowercase header name...
241 if (location == null) location = response.getHeader("location");
242 if (location == null) location = "";
243 u = new URL(u, location);
244 int protocolStatusCode;
245 switch (code) {
246 case 300: // multiple choices, preferred value in Location
247 protocolStatusCode = ProtocolStatus.MOVED;
248 break;
249 case 301: // moved permanently
250 case 305: // use proxy (Location is URL of proxy)
251 protocolStatusCode = ProtocolStatus.MOVED;
252 break;
253 case 302: // found (temporarily moved)
254 case 303: // see other (redirect after POST)
255 case 307: // temporary redirect
256 protocolStatusCode = ProtocolStatus.TEMP_MOVED;
257 break;
258 case 304: // not modified
259 protocolStatusCode = ProtocolStatus.NOTMODIFIED;
260 break;
261 default:
262 protocolStatusCode = ProtocolStatus.MOVED;
263 }
264 // handle this in the higher layer.
265 return new ProtocolOutput(c, new ProtocolStatus(protocolStatusCode, u));
266 } else if (code == 400) { // bad request, mark as GONE
267 if (logger.isTraceEnabled()) { logger.trace("400 Bad request: " + u); }
268 return new ProtocolOutput(c, new ProtocolStatus(ProtocolStatus.GONE, u));
269 } else if (code == 401) { // requires authorization, but no valid auth provided.
270 if (logger.isTraceEnabled()) { logger.trace("401 Authentication Required"); }
271 return new ProtocolOutput(c, new ProtocolStatus(ProtocolStatus.ACCESS_DENIED, "Authentication required: "
272 + urlString));
273 } else if (code == 404) {
274 return new ProtocolOutput(c, new ProtocolStatus(ProtocolStatus.NOTFOUND, u));
275 } else if (code == 410) { // permanently GONE
276 return new ProtocolOutput(c, new ProtocolStatus(ProtocolStatus.GONE, u));
277 } else {
278 return new ProtocolOutput(c, new ProtocolStatus(ProtocolStatus.EXCEPTION, "Http code=" + code + ", url="
279 + u));
280 }
281 } catch (Throwable e) {
282 e.printStackTrace(LogUtil.getErrorStream(logger));
283 return new ProtocolOutput(null, new ProtocolStatus(e));
284 }
285 }
286
287 /* -------------------------- *
288 * </implementation:Protocol> *
289 * -------------------------- */
290
291
292 public String getProxyHost() {
293 return proxyHost;
294 }
295
296 public int getProxyPort() {
297 return proxyPort;
298 }
299
300 public boolean useProxy() {
301 return useProxy;
302 }
303
304 public int getTimeout() {
305 return timeout;
306 }
307
308 public int getMaxContent() {
309 return maxContent;
310 }
311
312 public int getMaxDelays() {
313 return maxDelays;
314 }
315
316 public int getMaxThreadsPerHost() {
317 return maxThreadsPerHost;
318 }
319
320 public long getServerDelay() {
321 return serverDelay;
322 }
323
324 public String getUserAgent() {
325 return userAgent;
326 }
327
328 public boolean getUseHttp11() {
329 return useHttp11;
330 }
331
332 private String blockAddr(URL url, long crawlDelay) throws ProtocolException {
333
334 String host;
335 if (byIP) {
336 try {
337 InetAddress addr = InetAddress.getByName(url.getHost());
338 host = addr.getHostAddress();
339 } catch (UnknownHostException e) {
340 // unable to resolve it, so don't fall back to host name
341 throw new HttpException(e);
342 }
343 } else {
344 host = url.getHost();
345 if (host == null)
346 throw new HttpException("Unknown host for url: " + url);
347 host = host.toLowerCase();
348 }
349
350 int delays = 0;
351 while (true) {
352 cleanExpiredServerBlocks(); // free held addresses
353
354 Long time;
355 synchronized (BLOCKED_ADDR_TO_TIME) {
356 time = (Long) BLOCKED_ADDR_TO_TIME.get(host);
357 if (time == null) { // address is free
358
359 // get # of threads already accessing this addr
360 Integer counter = (Integer)THREADS_PER_HOST_COUNT.get(host);
361 int count = (counter == null) ? 0 : counter.intValue();
362
363 count++; // increment & store
364 THREADS_PER_HOST_COUNT.put(host, new Integer(count));
365
366 if (count >= maxThreadsPerHost) {
367 BLOCKED_ADDR_TO_TIME.put(host, new Long(0)); // block it
368 }
369 return host;
370 }
371 }
372
373 if (delays == maxDelays)
374 throw new BlockedException("Exceeded http.max.delays: retry later.");
375
376 long done = time.longValue();
377 long now = System.currentTimeMillis();
378 long sleep = 0;
379 if (done == 0) { // address is still in use
380 sleep = crawlDelay; // wait at least delay
381
382 } else if (now < done) { // address is on hold
383 sleep = done - now; // wait until its free
384 }
385
386 try {
387 Thread.sleep(sleep);
388 } catch (InterruptedException e) {}
389 delays++;
390 }
391 }
392
393 private void unblockAddr(String host, long crawlDelay) {
394 synchronized (BLOCKED_ADDR_TO_TIME) {
395 int addrCount = ((Integer)THREADS_PER_HOST_COUNT.get(host)).intValue();
396 if (addrCount == 1) {
397 THREADS_PER_HOST_COUNT.remove(host);
398 BLOCKED_ADDR_QUEUE.addFirst(host);
399 BLOCKED_ADDR_TO_TIME.put
400 (host, new Long(System.currentTimeMillis() + crawlDelay));
401 } else {
402 THREADS_PER_HOST_COUNT.put(host, new Integer(addrCount - 1));
403 }
404 }
405 }
406
407 private static void cleanExpiredServerBlocks() {
408 synchronized (BLOCKED_ADDR_TO_TIME) {
409 for (int i = BLOCKED_ADDR_QUEUE.size() - 1; i >= 0; i--) {
410 String host = (String) BLOCKED_ADDR_QUEUE.get(i);
411 long time = ((Long) BLOCKED_ADDR_TO_TIME.get(host)).longValue();
412 if (time <= System.currentTimeMillis()) {
413 BLOCKED_ADDR_TO_TIME.remove(host);
414 BLOCKED_ADDR_QUEUE.remove(i);
415 }
416 }
417 }
418 }
419
420 private static String getAgentString(String agentName,
421 String agentVersion,
422 String agentDesc,
423 String agentURL,
424 String agentEmail) {
425
426 if ( (agentName == null) || (agentName.trim().length() == 0) ) {
427 // TODO : NUTCH-258
428 if (LOGGER.isFatalEnabled()) {
429 LOGGER.fatal("No User-Agent string set (http.agent.name)!");
430 }
431 }
432
433 StringBuffer buf= new StringBuffer();
434
435 buf.append(agentName);
436 if (agentVersion != null) {
437 buf.append("/");
438 buf.append(agentVersion);
439 }
440 if ( ((agentDesc != null) && (agentDesc.length() != 0))
441 || ((agentEmail != null) && (agentEmail.length() != 0))
442 || ((agentURL != null) && (agentURL.length() != 0)) ) {
443 buf.append(" (");
444
445 if ((agentDesc != null) && (agentDesc.length() != 0)) {
446 buf.append(agentDesc);
447 if ( (agentURL != null) || (agentEmail != null) )
448 buf.append("; ");
449 }
450
451 if ((agentURL != null) && (agentURL.length() != 0)) {
452 buf.append(agentURL);
453 if (agentEmail != null)
454 buf.append("; ");
455 }
456
457 if ((agentEmail != null) && (agentEmail.length() != 0))
458 buf.append(agentEmail);
459
460 buf.append(")");
461 }
462 return buf.toString();
463 }
464
465 protected void logConf() {
466 if (logger.isInfoEnabled()) {
467 logger.info("http.proxy.host = " + proxyHost);
468 logger.info("http.proxy.port = " + proxyPort);
469 logger.info("http.timeout = " + timeout);
470 logger.info("http.content.limit = " + maxContent);
471 logger.info("http.agent = " + userAgent);
472 logger.info(Protocol.CHECK_BLOCKING + " = " + checkBlocking);
473 logger.info(Protocol.CHECK_ROBOTS + " = " + checkRobots);
474 if (checkBlocking) {
475 logger.info("fetcher.server.delay = " + serverDelay);
476 logger.info("http.max.delays = " + maxDelays);
477 }
478 }
479 }
480
481 public byte[] processGzipEncoded(byte[] compressed, URL url) throws IOException {
482
483 if (LOGGER.isTraceEnabled()) { LOGGER.trace("uncompressing...."); }
484
485 byte[] content;
486 if (getMaxContent() >= 0) {
487 content = GZIPUtils.unzipBestEffort(compressed, getMaxContent());
488 } else {
489 content = GZIPUtils.unzipBestEffort(compressed);
490 }
491
492 if (content == null)
493 throw new IOException("unzipBestEffort returned null");
494
495 if (LOGGER.isTraceEnabled()) {
496 LOGGER.trace("fetched " + compressed.length
497 + " bytes of compressed content (expanded to "
498 + content.length + " bytes) from " + url);
499 }
500 return content;
501 }
502
503 public byte[] processDeflateEncoded(byte[] compressed, URL url) throws IOException {
504
505 if (LOGGER.isTraceEnabled()) { LOGGER.trace("inflating...."); }
506
507 byte[] content = DeflateUtils.inflateBestEffort(compressed, getMaxContent());
508
509 if (content == null)
510 throw new IOException("inflateBestEffort returned null");
511
512 if (LOGGER.isTraceEnabled()) {
513 LOGGER.trace("fetched " + compressed.length
514 + " bytes of compressed content (expanded to "
515 + content.length + " bytes) from " + url);
516 }
517 return content;
518 }
519
520 protected static void main(HttpBase http, String[] args) throws Exception {
521 boolean verbose = false;
522 String url = null;
523
524 String usage = "Usage: Http [-verbose] [-timeout N] url";
525
526 if (args.length == 0) {
527 System.err.println(usage);
528 System.exit(-1);
529 }
530
531 for (int i = 0; i < args.length; i++) { // parse command line
532 if (args[i].equals("-timeout")) { // found -timeout option
533 http.timeout = Integer.parseInt(args[++i]) * 1000;
534 } else if (args[i].equals("-verbose")) { // found -verbose option
535 verbose = true;
536 } else if (i != args.length - 1) {
537 System.err.println(usage);
538 System.exit(-1);
539 } else // root is required parameter
540 url = args[i];
541 }
542
543 // if (verbose) {
544 // LOGGER.setLevel(Level.FINE);
545 // }
546
547 ProtocolOutput out = http.getProtocolOutput(new Text(url), new CrawlDatum());
548 Content content = out.getContent();
549
550 System.out.println("Status: " + out.getStatus());
551 if (content != null) {
552 System.out.println("Content Type: " + content.getContentType());
553 System.out.println("Content Length: " +
554 content.getMetadata().get(Response.CONTENT_LENGTH));
555 System.out.println("Content:");
556 String text = new String(content.getContent());
557 System.out.println(text);
558 }
559
560 }
561
562
563 protected abstract Response getResponse(URL url,
564 CrawlDatum datum,
565 boolean followRedirects)
566 throws ProtocolException, IOException;
567
568 public RobotRules getRobotRules(Text url, CrawlDatum datum) {
569 return robots.getRobotRulesSet(this, url);
570 }
571
572 }