1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.nutch.fetcher;
18
19 import java.io.IOException;
20 import java.net.InetAddress;
21 import java.net.MalformedURLException;
22 import java.net.URL;
23 import java.net.UnknownHostException;
24 import java.util;
25 import java.util.Map.Entry;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.concurrent.atomic.AtomicLong;
28
29 // Commons Logging imports
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32
33 import org.apache.hadoop.io;
34 import org.apache.hadoop.fs;
35 import org.apache.hadoop.conf;
36 import org.apache.hadoop.mapred;
37 import org.apache.hadoop.util.StringUtils;
38
39 import org.apache.nutch.crawl.CrawlDatum;
40 import org.apache.nutch.crawl.NutchWritable;
41 import org.apache.nutch.crawl.SignatureFactory;
42 import org.apache.nutch.metadata.Metadata;
43 import org.apache.nutch.metadata.Nutch;
44 import org.apache.nutch.net;
45 import org.apache.nutch.protocol;
46 import org.apache.nutch.parse;
47 import org.apache.nutch.scoring.ScoringFilters;
48 import org.apache.nutch.util;
49
50
51 /**
52 * A queue-based fetcher.
53 *
54 * <p>This fetcher uses a well-known model of one producer (a QueueFeeder)
55 * and many consumers (FetcherThread-s).
56 *
57 * <p>QueueFeeder reads input fetchlists and
58 * populates a set of FetchItemQueue-s, which hold FetchItem-s that
59 * describe the items to be fetched. There are as many queues as there are unique
60 * hosts, but at any given time the total number of fetch items in all queues
61 * is less than a fixed number (currently set to a multiple of the number of
62 * threads).
63 *
64 * <p>As items are consumed from the queues, the QueueFeeder continues to add new
65 * input items, so that their total count stays fixed (FetcherThread-s may also
 * add new items to the queues, e.g. as a result of redirection) - until all
67 * input items are exhausted, at which point the number of items in the queues
68 * begins to decrease. When this number reaches 0 fetcher will finish.
69 *
70 * <p>This fetcher implementation handles per-host blocking itself, instead
71 * of delegating this work to protocol-specific plugins.
72 * Each per-host queue handles its own "politeness" settings, such as the
73 * maximum number of concurrent requests and crawl delay between consecutive
74 * requests - and also a list of requests in progress, and the time the last
75 * request was finished. As FetcherThread-s ask for new items to be fetched,
76 * queues may return eligible items or null if for "politeness" reasons this
77 * host's queue is not yet ready.
78 *
79 * <p>If there are still unfetched items in the queues, but none of the items
80 * are ready, FetcherThread-s will spin-wait until either some items become
81 * available, or a timeout is reached (at which point the Fetcher will abort,
82 * assuming the task is hung).
83 *
84 * @author Andrzej Bialecki
85 */
86 public class Fetcher extends Configured implements
87 MapRunnable<Text, CrawlDatum, Text, NutchWritable> {
88
/**
 * Threshold compared against the refresh time of a content redirect:
 * FetcherThread passes <code>refreshTime &lt; PERM_REFRESH_TIME</code> as the
 * "temp" flag of handleRedirect. NOTE(review): unit is presumably seconds -
 * confirm against the parser that produces the value.
 */
public static final int PERM_REFRESH_TIME = 5;

/** Redirect-type label passed to handleRedirect for redirects found in parsed content. */
public static final String CONTENT_REDIR = "content";

/** Redirect-type label passed to handleRedirect for redirects reported by the protocol status. */
public static final String PROTOCOL_REDIR = "protocol";

/** Logger used by the fetcher and by all of its nested helper classes. */
public static final Log LOG = LogFactory.getLog(Fetcher.class);
96
97 public static class InputFormat extends SequenceFileInputFormat<Text, CrawlDatum> {
98 /** Don't split inputs, to keep things polite. */
99 public InputSplit[] getSplits(JobConf job, int nSplits)
100 throws IOException {
101 FileStatus[] files = listStatus(job);
102 FileSplit[] splits = new FileSplit[files.length];
103 for (int i = 0; i < files.length; i++) {
104 FileStatus cur = files[i];
105 splits[i] = new FileSplit(cur.getPath(), 0,
106 cur.getLen(), (String[])null);
107 }
108 return splits;
109 }
110 }
111
// Collector and reporter handed to run(); used by the fetcher threads and
// by reportStatus() respectively.
private OutputCollector<Text, NutchWritable> output;
private Reporter reporter;

// Segment name read from the job configuration in configure().
private String segmentName;
// Number of FetcherThread-s currently inside their run() method.
private AtomicInteger activeThreads = new AtomicInteger(0);
// Number of FetcherThread-s currently sleeping because no item was eligible.
private AtomicInteger spinWaiting = new AtomicInteger(0);

private long start = System.currentTimeMillis(); // start time of fetcher run
// Time the most recent item was picked up by a thread; run() aborts when this
// becomes older than the configured timeout.
private AtomicLong lastRequestStart = new AtomicLong(start);

private AtomicLong bytes = new AtomicLong(0); // total bytes fetched
private AtomicInteger pages = new AtomicInteger(0); // total pages fetched
private AtomicInteger errors = new AtomicInteger(0); // total pages errored

// Whether fetched content is written to the output ("fetcher.store.content").
private boolean storingContent;
// Whether successfully fetched content is parsed by the fetcher ("fetcher.parse").
private boolean parsing;
// Queues shared between the feeder and the fetcher threads; created in run().
FetchItemQueues fetchQueues;
// Producer thread that fills fetchQueues from the input; created in run().
QueueFeeder feeder;
130
131 /**
132 * This class described the item to be fetched.
133 */
134 private static class FetchItem {
135 String queueID;
136 Text url;
137 URL u;
138 CrawlDatum datum;
139
140 public FetchItem(Text url, URL u, CrawlDatum datum, String queueID) {
141 this.url = url;
142 this.u = u;
143 this.datum = datum;
144 this.queueID = queueID;
145 }
146
147 /** Create an item. Queue id will be created based on <code>byIP</code>
148 * argument, either as a protocol + hostname pair, or protocol + IP
149 * address pair.
150 */
151 public static FetchItem create(Text url, CrawlDatum datum, boolean byIP) {
152 String queueID;
153 URL u = null;
154 try {
155 u = new URL(url.toString());
156 } catch (Exception e) {
157 LOG.warn("Cannot parse url: " + url, e);
158 return null;
159 }
160 String proto = u.getProtocol().toLowerCase();
161 String host;
162 if (byIP) {
163 try {
164 InetAddress addr = InetAddress.getByName(u.getHost());
165 host = addr.getHostAddress();
166 } catch (UnknownHostException e) {
167 // unable to resolve it, so don't fall back to host name
168 LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
169 return null;
170 }
171 } else {
172 host = u.getHost();
173 if (host == null) {
174 LOG.warn("Unknown host for url: " + url + ", skipping.");
175 return null;
176 }
177 host = host.toLowerCase();
178 }
179 queueID = proto + "://" + host;
180 return new FetchItem(url, u, datum, queueID);
181 }
182
183 public CrawlDatum getDatum() {
184 return datum;
185 }
186
187 public String getQueueID() {
188 return queueID;
189 }
190
191 public Text getUrl() {
192 return url;
193 }
194
195 public URL getURL2() {
196 return u;
197 }
198 }
199
200 /**
201 * This class handles FetchItems which come from the same host ID (be it
202 * a proto/hostname or proto/IP pair). It also keeps track of requests in
203 * progress and elapsed time between requests.
204 */
205 private static class FetchItemQueue {
206 List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>());
207 Set<FetchItem> inProgress = Collections.synchronizedSet(new HashSet<FetchItem>());
208 AtomicLong nextFetchTime = new AtomicLong();
209 long crawlDelay;
210 long minCrawlDelay;
211 int maxThreads;
212 Configuration conf;
213
214 public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, long minCrawlDelay) {
215 this.conf = conf;
216 this.maxThreads = maxThreads;
217 this.crawlDelay = crawlDelay;
218 this.minCrawlDelay = minCrawlDelay;
219 // ready to start
220 setEndTime(System.currentTimeMillis() - crawlDelay);
221 }
222
223 public int getQueueSize() {
224 return queue.size();
225 }
226
227 public int getInProgressSize() {
228 return inProgress.size();
229 }
230
231 public void finishFetchItem(FetchItem it, boolean asap) {
232 if (it != null) {
233 inProgress.remove(it);
234 setEndTime(System.currentTimeMillis(), asap);
235 }
236 }
237
238 public void addFetchItem(FetchItem it) {
239 if (it == null) return;
240 queue.add(it);
241 }
242
243 public void addInProgressFetchItem(FetchItem it) {
244 if (it == null) return;
245 inProgress.add(it);
246 }
247
248 public FetchItem getFetchItem() {
249 if (inProgress.size() >= maxThreads) return null;
250 long now = System.currentTimeMillis();
251 if (nextFetchTime.get() > now) return null;
252 FetchItem it = null;
253 if (queue.size() == 0) return null;
254 try {
255 it = queue.remove(0);
256 inProgress.add(it);
257 } catch (Exception e) {
258 LOG.error("Cannot remove FetchItem from queue or cannot add it to inProgress queue", e);
259 }
260 return it;
261 }
262
263 public synchronized void dump() {
264 LOG.info(" maxThreads = " + maxThreads);
265 LOG.info(" inProgress = " + inProgress.size());
266 LOG.info(" crawlDelay = " + crawlDelay);
267 LOG.info(" minCrawlDelay = " + minCrawlDelay);
268 LOG.info(" nextFetchTime = " + nextFetchTime.get());
269 LOG.info(" now = " + System.currentTimeMillis());
270 for (int i = 0; i < queue.size(); i++) {
271 FetchItem it = queue.get(i);
272 LOG.info(" " + i + ". " + it.url);
273 }
274 }
275
276 private void setEndTime(long endTime) {
277 setEndTime(endTime, false);
278 }
279
280 private void setEndTime(long endTime, boolean asap) {
281 if (!asap)
282 nextFetchTime.set(endTime + (maxThreads > 1 ? minCrawlDelay : crawlDelay));
283 else
284 nextFetchTime.set(endTime);
285 }
286 }
287
288 /**
289 * Convenience class - a collection of queues that keeps track of the total
290 * number of items, and provides items eligible for fetching from any queue.
291 */
292 private static class FetchItemQueues {
293 public static final String DEFAULT_ID = "default";
294 Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
295 AtomicInteger totalSize = new AtomicInteger(0);
296 int maxThreads;
297 boolean byIP;
298 long crawlDelay;
299 long minCrawlDelay;
300 Configuration conf;
301
302 public FetchItemQueues(Configuration conf) {
303 this.conf = conf;
304 this.maxThreads = conf.getInt("fetcher.threads.per.host", 1);
305 // backward-compatible default setting
306 this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", false);
307 this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
308 this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000);
309 }
310
311 public int getTotalSize() {
312 return totalSize.get();
313 }
314
315 public int getQueueCount() {
316 return queues.size();
317 }
318
319 public void addFetchItem(Text url, CrawlDatum datum) {
320 FetchItem it = FetchItem.create(url, datum, byIP);
321 if (it != null) addFetchItem(it);
322 }
323
324 public void addFetchItem(FetchItem it) {
325 FetchItemQueue fiq = getFetchItemQueue(it.queueID);
326 fiq.addFetchItem(it);
327 totalSize.incrementAndGet();
328 }
329
330 public void finishFetchItem(FetchItem it) {
331 finishFetchItem(it, false);
332 }
333
334 public void finishFetchItem(FetchItem it, boolean asap) {
335 FetchItemQueue fiq = queues.get(it.queueID);
336 if (fiq == null) {
337 LOG.warn("Attempting to finish item from unknown queue: " + it);
338 return;
339 }
340 fiq.finishFetchItem(it, asap);
341 }
342
343 public synchronized FetchItemQueue getFetchItemQueue(String id) {
344 FetchItemQueue fiq = queues.get(id);
345 if (fiq == null) {
346 // initialize queue
347 fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
348 queues.put(id, fiq);
349 }
350 return fiq;
351 }
352
353 public synchronized FetchItem getFetchItem() {
354 Iterator<Map.Entry<String, FetchItemQueue>> it =
355 queues.entrySet().iterator();
356 while (it.hasNext()) {
357 FetchItemQueue fiq = it.next().getValue();
358 // reap empty queues
359 if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
360 it.remove();
361 continue;
362 }
363 FetchItem fit = fiq.getFetchItem();
364 if (fit != null) {
365 totalSize.decrementAndGet();
366 return fit;
367 }
368 }
369 return null;
370 }
371
372 public synchronized void dump() {
373 for (String id : queues.keySet()) {
374 FetchItemQueue fiq = queues.get(id);
375 if (fiq.getQueueSize() == 0) continue;
376 LOG.info("* queue: " + id);
377 fiq.dump();
378 }
379 }
380 }
381
382 /**
383 * This class feeds the queues with input items, and re-fills them as
384 * items are consumed by FetcherThread-s.
385 */
386 private static class QueueFeeder extends Thread {
387 private RecordReader<Text, CrawlDatum> reader;
388 private FetchItemQueues queues;
389 private int size;
390
391 public QueueFeeder(RecordReader<Text, CrawlDatum> reader,
392 FetchItemQueues queues, int size) {
393 this.reader = reader;
394 this.queues = queues;
395 this.size = size;
396 this.setDaemon(true);
397 this.setName("QueueFeeder");
398 }
399
400 public void run() {
401 boolean hasMore = true;
402 int cnt = 0;
403
404 while (hasMore) {
405 int feed = size - queues.getTotalSize();
406 if (feed <= 0) {
407 // queues are full - spin-wait until they have some free space
408 try {
409 Thread.sleep(1000);
410 } catch (Exception e) {};
411 continue;
412 } else {
413 LOG.debug("-feeding " + feed + " input urls ...");
414 while (feed > 0 && hasMore) {
415 try {
416 Text url = new Text();
417 CrawlDatum datum = new CrawlDatum();
418 hasMore = reader.next(url, datum);
419 if (hasMore) {
420 queues.addFetchItem(url, datum);
421 cnt++;
422 feed--;
423 }
424 } catch (IOException e) {
425 LOG.fatal("QueueFeeder error reading input, record " + cnt, e);
426 return;
427 }
428 }
429 }
430 }
431 LOG.info("QueueFeeder finished: total " + cnt + " records.");
432 }
433 }
434
435 /**
436 * This class picks items from queues and fetches the pages.
437 */
438 private class FetcherThread extends Thread {
439 private Configuration conf;
440 private URLFilters urlFilters;
441 private ScoringFilters scfilters;
442 private ParseUtil parseUtil;
443 private URLNormalizers normalizers;
444 private ProtocolFactory protocolFactory;
445 private long maxCrawlDelay;
446 private boolean byIP;
447 private int maxRedirect;
448 private String reprUrl;
449 private boolean redirecting;
450 private int redirectCount;
451 private boolean ignoreExternalLinks;
452
453 public FetcherThread(Configuration conf) {
454 this.setDaemon(true); // don't hang JVM on exit
455 this.setName("FetcherThread"); // use an informative name
456 this.conf = conf;
457 this.urlFilters = new URLFilters(conf);
458 this.scfilters = new ScoringFilters(conf);
459 this.parseUtil = new ParseUtil(conf);
460 this.protocolFactory = new ProtocolFactory(conf);
461 this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
462 this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
463 // backward-compatible default setting
464 this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", true);
465 this.maxRedirect = conf.getInt("http.redirect.max", 3);
466 this.ignoreExternalLinks =
467 conf.getBoolean("db.ignore.external.links", false);
468 }
469
/**
 * Main loop of a fetcher thread: repeatedly takes an eligible item from the
 * shared queues, checks robots rules, fetches it, writes the result via
 * output(...) and follows redirects up to maxRedirect times. The thread
 * exits when no item is available, the feeder has stopped and the queues
 * are empty. activeThreads is incremented on entry and decremented on exit.
 */
public void run() {
  activeThreads.incrementAndGet(); // count threads

  FetchItem fit = null;
  try {

    while (true) {
      fit = fetchQueues.getFetchItem();
      if (fit == null) {
        // nothing eligible right now: either wait for more work or finish
        if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
          LOG.debug(getName() + " spin-waiting ...");
          // spin-wait.
          spinWaiting.incrementAndGet();
          try {
            Thread.sleep(500);
          } catch (Exception e) {}
          spinWaiting.decrementAndGet();
          continue;
        } else {
          // all done, finish this thread
          return;
        }
      }
      // record activity so that Fetcher.run() does not consider the task hung
      lastRequestStart.set(System.currentTimeMillis());
      // start from the representative URL stored in the datum, if any,
      // otherwise from the item's own URL
      Text reprUrlWritable =
        (Text) fit.datum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
      if (reprUrlWritable == null) {
        reprUrl = fit.url.toString();
      } else {
        reprUrl = reprUrlWritable.toString();
      }
      try {
        if (LOG.isInfoEnabled()) { LOG.info("fetching " + fit.url); }

        // fetch the page
        redirecting = false;
        redirectCount = 0;
        // one iteration per URL in the redirect chain; handleRedirect() sets
        // 'redirecting' and increments 'redirectCount' when it returns a URL
        do {
          if (LOG.isDebugEnabled()) {
            LOG.debug("redirectCount=" + redirectCount);
          }
          redirecting = false;
          Protocol protocol = this.protocolFactory.getProtocol(fit.url.toString());
          RobotRules rules = protocol.getRobotRules(fit.url, fit.datum);
          if (!rules.isAllowed(fit.u)) {
            // unblock
            fetchQueues.finishFetchItem(fit, true);
            if (LOG.isDebugEnabled()) {
              LOG.debug("Denied by robots.txt: " + fit.url);
            }
            output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
            // 'continue' jumps to the do-while condition; 'redirecting' is
            // false here, so this ends the redirect loop for this item
            continue;
          }
          if (rules.getCrawlDelay() > 0) {
            if (rules.getCrawlDelay() > maxCrawlDelay) {
              // unblock
              fetchQueues.finishFetchItem(fit, true);
              LOG.debug("Crawl-Delay for " + fit.url + " too long (" + rules.getCrawlDelay() + "), skipping");
              output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
              // as above: leaves the redirect loop because 'redirecting' is false
              continue;
            } else {
              // use the delay requested by the site for this item's queue
              FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
              fiq.crawlDelay = rules.getCrawlDelay();
            }
          }
          ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.datum);
          ProtocolStatus status = output.getStatus();
          Content content = output.getContent();
          ParseStatus pstatus = null;
          // unblock queue
          fetchQueues.finishFetchItem(fit);

          String urlString = fit.url.toString();

          switch(status.getCode()) {

          case ProtocolStatus.WOULDBLOCK:
            // retry ?
            fetchQueues.addFetchItem(fit);
            break;

          case ProtocolStatus.SUCCESS: // got a page
            pstatus = output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS);
            updateStatus(content.getContent().length);
            // a successful parse may itself report a redirect (minor code
            // SUCCESS_REDIRECT) whose target is carried in the parse status
            if (pstatus != null && pstatus.isSuccess() &&
                pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
              String newUrl = pstatus.getMessage();
              int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
              Text redirUrl =
                handleRedirect(fit.url, fit.datum,
                               urlString, newUrl,
                               refreshTime < Fetcher.PERM_REFRESH_TIME,
                               Fetcher.CONTENT_REDIR);
              if (redirUrl != null) {
                // replace the current item with one for the redirect target
                CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
                    fit.datum.getFetchInterval(), fit.datum.getScore());
                if (reprUrl != null) {
                  newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
                      new Text(reprUrl));
                }
                fit = FetchItem.create(redirUrl, newDatum, byIP);
                if (fit != null) {
                  // register directly as in progress: this thread fetches it
                  // in the next iteration of the redirect loop
                  FetchItemQueue fiq =
                    fetchQueues.getFetchItemQueue(fit.queueID);
                  fiq.addInProgressFetchItem(fit);
                } else {
                  // stop redirecting
                  redirecting = false;
                }
              }
            }
            break;

          case ProtocolStatus.MOVED:         // redirect
          case ProtocolStatus.TEMP_MOVED:
            int code;
            boolean temp;
            if (status.getCode() == ProtocolStatus.MOVED) {
              code = CrawlDatum.STATUS_FETCH_REDIR_PERM;
              temp = false;
            } else {
              code = CrawlDatum.STATUS_FETCH_REDIR_TEMP;
              temp = true;
            }
            output(fit.url, fit.datum, content, status, code);
            // for protocol redirects the target URL is the status message
            String newUrl = status.getMessage();
            Text redirUrl =
              handleRedirect(fit.url, fit.datum,
                             urlString, newUrl, temp,
                             Fetcher.PROTOCOL_REDIR);
            if (redirUrl != null) {
              CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
                  fit.datum.getFetchInterval(), fit.datum.getScore());
              if (reprUrl != null) {
                newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
                    new Text(reprUrl));
              }
              fit = FetchItem.create(redirUrl, newDatum, byIP);
              if (fit != null) {
                FetchItemQueue fiq =
                  fetchQueues.getFetchItemQueue(fit.queueID);
                fiq.addInProgressFetchItem(fit);
              } else {
                // stop redirecting
                redirecting = false;
              }
            } else {
              // stop redirecting
              redirecting = false;
            }
            break;

          case ProtocolStatus.EXCEPTION:
            logError(fit.url, status.getMessage());
            /* FALLTHROUGH */
          case ProtocolStatus.RETRY:          // retry
          case ProtocolStatus.BLOCKED:
            output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
            break;

          case ProtocolStatus.GONE:           // gone
          case ProtocolStatus.NOTFOUND:
          case ProtocolStatus.ACCESS_DENIED:
          case ProtocolStatus.ROBOTS_DENIED:
            output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_GONE);
            break;

          case ProtocolStatus.NOTMODIFIED:
            output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_NOTMODIFIED);
            break;

          default:
            if (LOG.isWarnEnabled()) {
              LOG.warn("Unknown ProtocolStatus: " + status.getCode());
            }
            output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
          }

          // the redirect target would exceed the limit: release it and
          // record it as gone instead of fetching it
          if (redirecting && redirectCount >= maxRedirect) {
            fetchQueues.finishFetchItem(fit);
            if (LOG.isInfoEnabled()) {
              LOG.info(" - redirect count exceeded " + fit.url);
            }
            output(fit.url, fit.datum, null, ProtocolStatus.STATUS_REDIR_EXCEEDED, CrawlDatum.STATUS_FETCH_GONE);
          }

        } while (redirecting && (redirectCount < maxRedirect));

      } catch (Throwable t) {                 // unexpected exception
        // unblock
        fetchQueues.finishFetchItem(fit);
        logError(fit.url, t.toString());
        output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED, CrawlDatum.STATUS_FETCH_RETRY);
      }
    }

  } catch (Throwable e) {
    // anything escaping the per-item handling above ends this thread
    if (LOG.isFatalEnabled()) {
      e.printStackTrace(LogUtil.getFatalStream(LOG));
      LOG.fatal("fetcher caught:"+e.toString());
    }
  } finally {
    // make sure the last item does not stay marked as in progress
    if (fit != null) fetchQueues.finishFetchItem(fit);
    activeThreads.decrementAndGet(); // count threads
    LOG.info("-finishing thread " + getName() + ", activeThreads=" + activeThreads);
  }
}
677
678 private Text handleRedirect(Text url, CrawlDatum datum,
679 String urlString, String newUrl,
680 boolean temp, String redirType)
681 throws MalformedURLException, URLFilterException {
682 newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
683 newUrl = urlFilters.filter(newUrl);
684
685 if (ignoreExternalLinks) {
686 try {
687 String origHost = new URL(urlString).getHost().toLowerCase();
688 String newHost = new URL(newUrl).getHost().toLowerCase();
689 if (!origHost.equals(newHost)) {
690 if (LOG.isDebugEnabled()) {
691 LOG.debug(" - ignoring redirect " + redirType + " from " +
692 urlString + " to " + newUrl +
693 " because external links are ignored");
694 }
695 return null;
696 }
697 } catch (MalformedURLException e) { }
698 }
699
700 if (newUrl != null && !newUrl.equals(urlString)) {
701 reprUrl = URLUtil.chooseRepr(reprUrl, newUrl, temp);
702 url = new Text(newUrl);
703 if (maxRedirect > 0) {
704 redirecting = true;
705 redirectCount++;
706 if (LOG.isDebugEnabled()) {
707 LOG.debug(" - " + redirType + " redirect to " +
708 url + " (fetching now)");
709 }
710 return url;
711 } else {
712 CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_LINKED,
713 datum.getFetchInterval());
714 if (reprUrl != null) {
715 newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
716 new Text(reprUrl));
717 }
718 output(url, newDatum, null, null, CrawlDatum.STATUS_LINKED);
719 if (LOG.isDebugEnabled()) {
720 LOG.debug(" - " + redirType + " redirect to " +
721 url + " (fetching later)");
722 }
723 return null;
724 }
725 } else {
726 if (LOG.isDebugEnabled()) {
727 LOG.debug(" - " + redirType + " redirect skipped: " +
728 (newUrl != null ? "to same url" : "filtered"));
729 }
730 return null;
731 }
732 }
733
734 private void logError(Text url, String message) {
735 if (LOG.isInfoEnabled()) {
736 LOG.info("fetch of " + url + " failed with: " + message);
737 }
738 errors.incrementAndGet();
739 }
740
/**
 * Writes the result of one fetch to the output collector: always the
 * updated datum, the content when present and content storing is enabled,
 * and one ParseImpl per parse result when the content was parsed here.
 *
 * @param key     URL the result belongs to
 * @param datum   datum to update (status, fetch time, protocol status,
 *                possibly signature) and emit
 * @param content fetched content, or null if there is none
 * @param pstatus protocol status to store in the datum metadata, or null
 * @param status  CrawlDatum status code to set on the datum
 * @return the parse status of the parse stored under content.getUrl(), or
 *         null if nothing was parsed or no such parse exists
 */
private ParseStatus output(Text key, CrawlDatum datum,
                    Content content, ProtocolStatus pstatus, int status) {

  datum.setStatus(status);
  datum.setFetchTime(System.currentTimeMillis());
  if (pstatus != null) datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);

  ParseResult parseResult = null;
  if (content != null) {
    Metadata metadata = content.getMetadata();
    // add segment to metadata
    metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
    // add score to content metadata so that ParseSegment can pick it up.
    try {
      scfilters.passScoreBeforeParsing(key, datum, content);
    } catch (Exception e) {
      if (LOG.isWarnEnabled()) {
        e.printStackTrace(LogUtil.getWarnStream(LOG));
        LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
      }
    }
    /* Note: Fetcher will only follow meta-redirects coming from the
     * original URL. */
    if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
      try {
        parseResult = this.parseUtil.parse(content);
      } catch (Exception e) {
        LOG.warn("Error parsing: " + key + ": " + StringUtils.stringifyException(e));
      }

      // parsing failed with an exception: sign the content together with
      // an empty parse so that the datum still carries a signature
      if (parseResult == null) {
        byte[] signature =
          SignatureFactory.getSignature(getConf()).calculate(content,
              new ParseStatus().getEmptyParse(conf));
        datum.setSignature(signature);
      }
    }

    /* Store status code in content So we can read this value during
     * parsing (as a separate job) and decide to parse or not.
     */
    content.getMetadata().add(Nutch.FETCH_STATUS_KEY, Integer.toString(status));
  }

  try {
    output.collect(key, new NutchWritable(datum));
    if (content != null && storingContent)
      output.collect(key, new NutchWritable(content));
    if (parseResult != null) {
      for (Entry<Text, Parse> entry : parseResult) {
        Text url = entry.getKey();
        Parse parse = entry.getValue();
        ParseStatus parseStatus = parse.getData().getStatus();

        // an unsuccessful parse is replaced by the empty parse of its status
        if (!parseStatus.isSuccess()) {
          LOG.warn("Error parsing: " + key + ": " + parseStatus);
          parse = parseStatus.getEmptyParse(getConf());
        }

        // Calculate page signature. For non-parsing fetchers this will
        // be done in ParseSegment
        byte[] signature =
          SignatureFactory.getSignature(getConf()).calculate(content, parse);
        // Ensure segment name and score are in parseData metadata
        parse.getData().getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
            segmentName);
        parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY,
            StringUtil.toHexString(signature));
        // Pass fetch time to content meta
        parse.getData().getContentMeta().set(Nutch.FETCH_TIME_KEY,
            Long.toString(datum.getFetchTime()));
        // NOTE(review): the datum was already handed to output.collect()
        // above; whether this later signature reaches the collected record
        // depends on when the collector serializes it - confirm.
        if (url.equals(key))
          datum.setSignature(signature);
        try {
          scfilters.passScoreAfterParsing(url, content, parse);
        } catch (Exception e) {
          if (LOG.isWarnEnabled()) {
            e.printStackTrace(LogUtil.getWarnStream(LOG));
            LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
          }
        }
        output.collect(url, new NutchWritable(
                new ParseImpl(new ParseText(parse.getText()),
                              parse.getData(), parse.isCanonical())));
      }
    }
  } catch (IOException e) {
    // a failed write is logged only; the fetch loop continues
    if (LOG.isFatalEnabled()) {
      e.printStackTrace(LogUtil.getFatalStream(LOG));
      LOG.fatal("fetcher caught:"+e.toString());
    }
  }

  // return parse status if it exists
  if (parseResult != null && !parseResult.isEmpty()) {
    Parse p = parseResult.get(content.getUrl());
    if (p != null) {
      return p.getData().getStatus();
    }
  }
  return null;
}
843
844 }
845
/** Creates a fetcher without a configuration; one is supplied later through configure(). */
public Fetcher() {
  super(null);
}

/** Creates a fetcher that uses the given configuration. */
public Fetcher(Configuration conf) {
  super(conf);
}
849
/**
 * Counts one more fetched page and adds its size to the byte total.
 *
 * @param bytesInPage size in bytes of the page just fetched
 */
private void updateStatus(int bytesInPage) throws IOException {
  pages.incrementAndGet();
  bytes.addAndGet(bytesInPage);
}
854
855
856 private void reportStatus() throws IOException {
857 String status;
858 long elapsed = (System.currentTimeMillis() - start)/1000;
859 status = activeThreads + " threads, " +
860 pages+" pages, "+errors+" errors, "
861 + Math.round(((float)pages.get()*10)/elapsed)/10.0+" pages/s, "
862 + Math.round(((((float)bytes.get())*8)/1024)/elapsed)+" kb/s, ";
863 reporter.setStatus(status);
864 }
865
866 public void configure(JobConf job) {
867 setConf(job);
868
869 this.segmentName = job.get(Nutch.SEGMENT_NAME_KEY);
870 this.storingContent = isStoringContent(job);
871 this.parsing = isParsing(job);
872
873 // if (job.getBoolean("fetcher.verbose", false)) {
874 // LOG.setLevel(Level.FINE);
875 // }
876 }
877
/** Does nothing; this class holds nothing that needs to be released here. */
public void close() {}
879
880 public static boolean isParsing(Configuration conf) {
881 return conf.getBoolean("fetcher.parse", true);
882 }
883
884 public static boolean isStoringContent(Configuration conf) {
885 return conf.getBoolean("fetcher.store.content", true);
886 }
887
888 public void run(RecordReader<Text, CrawlDatum> input,
889 OutputCollector<Text, NutchWritable> output,
890 Reporter reporter) throws IOException {
891
892 this.output = output;
893 this.reporter = reporter;
894 this.fetchQueues = new FetchItemQueues(getConf());
895
896 int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
897 if (LOG.isInfoEnabled()) { LOG.info("Fetcher: threads: " + threadCount); }
898
899 feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);
900 //feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
901 feeder.start();
902
903 // set non-blocking & no-robots mode for HTTP protocol plugins.
904 getConf().setBoolean(Protocol.CHECK_BLOCKING, false);
905 getConf().setBoolean(Protocol.CHECK_ROBOTS, false);
906
907 for (int i = 0; i < threadCount; i++) { // spawn threads
908 new FetcherThread(getConf()).start();
909 }
910
911 // select a timeout that avoids a task timeout
912 long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;
913
914 do { // wait for threads to exit
915 try {
916 Thread.sleep(1000);
917 } catch (InterruptedException e) {}
918
919 reportStatus();
920 LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
921 + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
922
923 if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
924 fetchQueues.dump();
925 }
926 // some requests seem to hang, despite all intentions
927 if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
928 if (LOG.isWarnEnabled()) {
929 LOG.warn("Aborting with "+activeThreads+" hung threads.");
930 }
931 return;
932 }
933
934 } while (activeThreads.get() > 0);
935 LOG.info("-activeThreads=" + activeThreads);
936
937 }
938
939 public void fetch(Path segment, int threads, boolean parsing)
940 throws IOException {
941
942 checkConfiguration();
943
944 if (LOG.isInfoEnabled()) {
945 LOG.info("Fetcher: starting");
946 LOG.info("Fetcher: segment: " + segment);
947 }
948
949 JobConf job = new NutchJob(getConf());
950 job.setJobName("fetch " + segment);
951
952 job.setInt("fetcher.threads.fetch", threads);
953 job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
954 job.setBoolean("fetcher.parse", parsing);
955
956 // for politeness, don't permit parallel execution of a single task
957 job.setSpeculativeExecution(false);
958
959 FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
960 job.setInputFormat(InputFormat.class);
961
962 job.setMapRunnerClass(Fetcher.class);
963
964 FileOutputFormat.setOutputPath(job, segment);
965 job.setOutputFormat(FetcherOutputFormat.class);
966 job.setOutputKeyClass(Text.class);
967 job.setOutputValueClass(NutchWritable.class);
968
969 JobClient.runJob(job);
970 if (LOG.isInfoEnabled()) { LOG.info("Fetcher: done"); }
971 }
972
973
974 /** Run the fetcher. */
975 public static void main(String[] args) throws Exception {
976
977 String usage = "Usage: Fetcher <segment> [-threads n] [-noParsing]";
978
979 if (args.length < 1) {
980 System.err.println(usage);
981 System.exit(-1);
982 }
983
984 Path segment = new Path(args[0]);
985
986 Configuration conf = NutchConfiguration.create();
987
988 int threads = conf.getInt("fetcher.threads.fetch", 10);
989 boolean parsing = true;
990
991 for (int i = 1; i < args.length; i++) { // parse command line
992 if (args[i].equals("-threads")) { // found -threads option
993 threads = Integer.parseInt(args[++i]);
994 } else if (args[i].equals("-noParsing")) parsing = false;
995 }
996
997 conf.setInt("fetcher.threads.fetch", threads);
998 if (!parsing) {
999 conf.setBoolean("fetcher.parse", parsing);
1000 }
1001 Fetcher fetcher = new Fetcher(conf); // make a Fetcher
1002
1003 fetcher.fetch(segment, threads, parsing); // run the Fetcher
1004
1005 }
1006
1007 private void checkConfiguration() {
1008
1009 // ensure that a value has been set for the agent name and that that
1010 // agent name is the first value in the agents we advertise for robot
1011 // rules parsing
1012 String agentName = getConf().get("http.agent.name");
1013 if (agentName == null || agentName.trim().length() == 0) {
1014 String message = "Fetcher: No agents listed in 'http.agent.name'"
1015 + " property.";
1016 if (LOG.isFatalEnabled()) {
1017 LOG.fatal(message);
1018 }
1019 throw new IllegalArgumentException(message);
1020 } else {
1021
1022 // get all of the agents that we advertise
1023 String agentNames = getConf().get("http.robots.agents");
1024 StringTokenizer tok = new StringTokenizer(agentNames, ",");
1025 ArrayList<String> agents = new ArrayList<String>();
1026 while (tok.hasMoreTokens()) {
1027 agents.add(tok.nextToken().trim());
1028 }
1029
1030 // if the first one is not equal to our agent name, log fatal and throw
1031 // an exception
1032 if (!(agents.get(0)).equalsIgnoreCase(agentName)) {
1033 String message = "Fetcher: Your 'http.agent.name' value should be "
1034 + "listed first in 'http.robots.agents' property.";
1035 if (LOG.isWarnEnabled()) {
1036 LOG.warn(message);
1037 }
1038 }
1039 }
1040 }
1041
1042 }