1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.nutch.crawl;
19
import java.io.*;
import java.net.*;
import java.util.*;
import java.text.*;

// Commons Logging imports
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.net.URLFilterException;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.scoring.ScoringFilterException;
import org.apache.nutch.scoring.ScoringFilters;
import org.apache.nutch.util.LockUtil;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
45
46 /** Generates a subset of a crawl db to fetch. */
47 public class Generator extends Configured implements Tool {
48
49 public static final String CRAWL_GENERATE_FILTER = "crawl.generate.filter";
50 public static final String GENERATE_MAX_PER_HOST_BY_IP = "generate.max.per.host.by.ip";
51 public static final String GENERATE_MAX_PER_HOST = "generate.max.per.host";
52 public static final String GENERATE_UPDATE_CRAWLDB = "generate.update.crawldb";
53 public static final String CRAWL_TOP_N = "crawl.topN";
54 public static final String CRAWL_GEN_CUR_TIME = "crawl.gen.curTime";
55 public static final String CRAWL_GEN_DELAY = "crawl.gen.delay";
56 public static final Log LOG = LogFactory.getLog(Generator.class);
57
58 public static class SelectorEntry implements Writable {
59 public Text url;
60 public CrawlDatum datum;
61
62 public SelectorEntry() {
63 url = new Text();
64 datum = new CrawlDatum();
65 }
66
67 public void readFields(DataInput in) throws IOException {
68 url.readFields(in);
69 datum.readFields(in);
70 }
71
72 public void write(DataOutput out) throws IOException {
73 url.write(out);
74 datum.write(out);
75 }
76
77 public String toString() {
78 return "url=" + url.toString() + ", datum=" + datum.toString();
79 }
80 }
81
82 /** Selects entries due for fetch. */
83 public static class Selector implements Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry>, Partitioner<FloatWritable, Writable>, Reducer<FloatWritable, SelectorEntry, FloatWritable, SelectorEntry> {
84 private LongWritable genTime = new LongWritable(System.currentTimeMillis());
85 private long curTime;
86 private long limit;
87 private long count;
88 private HashMap<String, IntWritable> hostCounts =
89 new HashMap<String, IntWritable>();
90 private int maxPerHost;
91 private HashSet<String> maxedHosts = new HashSet<String>();
92 private HashSet<String> dnsFailureHosts = new HashSet<String>();
93 private Partitioner<Text, Writable> hostPartitioner = new PartitionUrlByHost();
94 private URLFilters filters;
95 private URLNormalizers normalizers;
96 private ScoringFilters scfilters;
97 private SelectorEntry entry = new SelectorEntry();
98 private FloatWritable sortValue = new FloatWritable();
99 private boolean byIP;
100 private long dnsFailure = 0L;
101 private boolean filter;
102 private long genDelay;
103 private FetchSchedule schedule;
104
105 public void configure(JobConf job) {
106 curTime = job.getLong(CRAWL_GEN_CUR_TIME, System.currentTimeMillis());
107 limit = job.getLong(CRAWL_TOP_N,Long.MAX_VALUE)/job.getNumReduceTasks();
108 maxPerHost = job.getInt(GENERATE_MAX_PER_HOST, -1);
109 byIP = job.getBoolean(GENERATE_MAX_PER_HOST_BY_IP, false);
110 filters = new URLFilters(job);
111 normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
112 scfilters = new ScoringFilters(job);
113 hostPartitioner.configure(job);
114 filter = job.getBoolean(CRAWL_GENERATE_FILTER, true);
115 genDelay = job.getLong(CRAWL_GEN_DELAY, 7L) * 3600L * 24L * 1000L;
116 long time = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
117 if (time > 0) genTime.set(time);
118 schedule = FetchScheduleFactory.getFetchSchedule(job);
119 }
120
121 public void close() {}
122
123 /** Select & invert subset due for fetch. */
124 public void map(Text key, CrawlDatum value,
125 OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
126 throws IOException {
127 Text url = key;
128 if (filter) {
129 // If filtering is on don't generate URLs that don't pass URLFilters
130 try {
131 if (filters.filter(url.toString()) == null)
132 return;
133 } catch (URLFilterException e) {
134 if (LOG.isWarnEnabled()) {
135 LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage()
136 + ")");
137 }
138 }
139 }
140 CrawlDatum crawlDatum = value;
141
142 // check fetch schedule
143 if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
144 LOG.debug("-shouldFetch rejected '" + url+ "', fetchTime=" + crawlDatum.getFetchTime() + ", curTime=" + curTime);
145 return;
146 }
147
148 LongWritable oldGenTime = (LongWritable)crawlDatum.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
149 if (oldGenTime != null) { // awaiting fetch & update
150 if (oldGenTime.get() + genDelay > curTime) // still wait for update
151 return;
152 }
153 float sort = 1.0f;
154 try {
155 sort = scfilters.generatorSortValue((Text)key, crawlDatum, sort);
156 } catch (ScoringFilterException sfe) {
157 if (LOG.isWarnEnabled()) {
158 LOG.warn("Couldn't filter generatorSortValue for " + key + ": " + sfe);
159 }
160 }
161 // sort by decreasing score, using DecreasingFloatComparator
162 sortValue.set(sort);
163 // record generation time
164 crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
165 entry.datum = crawlDatum;
166 entry.url = (Text)key;
167 output.collect(sortValue, entry); // invert for sort by score
168 }
169
170 /** Partition by host. */
171 public int getPartition(FloatWritable key, Writable value,
172 int numReduceTasks) {
173 return hostPartitioner.getPartition(((SelectorEntry)value).url, key,
174 numReduceTasks);
175 }
176
177 /** Collect until limit is reached. */
178 public void reduce(FloatWritable key, Iterator<SelectorEntry> values,
179 OutputCollector<FloatWritable, SelectorEntry> output,
180 Reporter reporter)
181 throws IOException {
182
183 while (values.hasNext() && count < limit) {
184
185 SelectorEntry entry = values.next();
186 Text url = entry.url;
187 String urlString = url.toString();
188 URL u = null;
189
190 // skip bad urls, including empty and null urls
191 try {
192 u = new URL(url.toString());
193 } catch (MalformedURLException e) {
194 LOG.info("Bad protocol in url: " + url.toString());
195 continue;
196 }
197
198 String host = u.getHost();
199 host = host.toLowerCase();
200 String hostname = host;
201
202 // partitioning by ip will generate lots of DNS requests here, and will
203 // be up to double the overall dns load, do not run this way unless you
204 // are running a local caching DNS server or a two layer DNS cache
205 if (byIP) {
206 if (maxedHosts.contains(host)) {
207 if (LOG.isDebugEnabled()) { LOG.debug("Host already maxed out: " + host); }
208 continue;
209 }
210 if (dnsFailureHosts.contains(host)) {
211 if (LOG.isDebugEnabled()) { LOG.debug("Host name lookup already failed: " + host); }
212 continue;
213 }
214 try {
215 InetAddress ia = InetAddress.getByName(host);
216 host = ia.getHostAddress();
217 urlString = new URL(u.getProtocol(), host, u.getPort(), u.getFile()).toString();
218 }
219 catch (UnknownHostException uhe) {
220 // remember hostnames that could not be looked up
221 dnsFailureHosts.add(hostname);
222 if (LOG.isDebugEnabled()) {
223 LOG.debug("DNS lookup failed: " + host + ", skipping.");
224 }
225 dnsFailure++;
226 if ((dnsFailure % 1000 == 0) && (LOG.isWarnEnabled())) {
227 LOG.warn("DNS failures: " + dnsFailure);
228 }
229 continue;
230 }
231 }
232
233 try {
234 urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
235 host = new URL(urlString).getHost();
236 } catch (Exception e) {
237 LOG.warn("Malformed URL: '" + urlString + "', skipping (" +
238 StringUtils.stringifyException(e) + ")");
239 continue;
240 }
241
242 // only filter if we are counting hosts
243 if (maxPerHost > 0) {
244
245 IntWritable hostCount = hostCounts.get(host);
246 if (hostCount == null) {
247 hostCount = new IntWritable();
248 hostCounts.put(host, hostCount);
249 }
250
251 // increment hostCount
252 hostCount.set(hostCount.get() + 1);
253
254 // skip URL if above the limit per host.
255 if (hostCount.get() > maxPerHost) {
256 if (hostCount.get() == maxPerHost + 1) {
257 // remember the raw hostname that is maxed out
258 maxedHosts.add(hostname);
259 if (LOG.isInfoEnabled()) {
260 LOG.info("Host " + host + " has more than " + maxPerHost +
261 " URLs." + " Skipping additional.");
262 }
263 }
264 continue;
265 }
266 }
267
268 output.collect(key, entry);
269
270 // Count is incremented only when we keep the URL
271 // maxPerHost may cause us to skip it.
272 count++;
273 }
274 }
275 }
276
277 public static class DecreasingFloatComparator extends FloatWritable.Comparator {
278
279 /** Compares two FloatWritables decreasing. */
280 public int compare(byte[] b1, int s1, int l1,
281 byte[] b2, int s2, int l2) {
282 return super.compare(b2, s2, l2, b1, s1, l1);
283 }
284 }
285
286 public static class SelectorInverseMapper extends MapReduceBase implements Mapper<FloatWritable, SelectorEntry, Text, SelectorEntry> {
287
288 public void map(FloatWritable key, SelectorEntry value, OutputCollector<Text, SelectorEntry> output, Reporter reporter) throws IOException {
289 SelectorEntry entry = (SelectorEntry)value;
290 output.collect(entry.url, entry);
291 }
292 }
293
294 public static class PartitionReducer extends MapReduceBase
295 implements Reducer<Text, SelectorEntry, Text, CrawlDatum> {
296
297 public void reduce(Text key, Iterator<SelectorEntry> values,
298 OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
299 // if using HashComparator, we get only one input key in case of hash collision
300 // so use only URLs from values
301 while (values.hasNext()) {
302 SelectorEntry entry = values.next();
303 output.collect(entry.url, entry.datum);
304 }
305 }
306
307 }
308
309 /** Sort fetch lists by hash of URL. */
310 public static class HashComparator extends WritableComparator {
311 public HashComparator() {
312 super(Text.class);
313 }
314
315 public int compare(WritableComparable a, WritableComparable b) {
316 Text url1 = (Text) a;
317 Text url2 = (Text) b;
318 int hash1 = hash(url1.getBytes(), 0, url1.getLength());
319 int hash2 = hash(url2.getBytes(), 0, url2.getLength());
320 return (hash1 < hash2 ? -1 : (hash1 == hash2 ? 0 : 1));
321 }
322
323 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
324 int hash1 = hash(b1, s1, l1);
325 int hash2 = hash(b2, s2, l2);
326 return (hash1 < hash2 ? -1 : (hash1 == hash2 ? 0 : 1));
327 }
328
329 private static int hash(byte[] bytes, int start, int length) {
330 int hash = 1;
331 // make later bytes more significant in hash code, so that sorting by
332 // hashcode correlates less with by-host ordering.
333 for (int i = length - 1; i >= 0; i--)
334 hash = (31 * hash) + (int) bytes[start + i];
335 return hash;
336 }
337 }
338
339 /**
340 * Update the CrawlDB so that the next generate won't include the same URLs.
341 */
342 public static class CrawlDbUpdater extends MapReduceBase implements Mapper<WritableComparable, Writable, Text, CrawlDatum>, Reducer<Text, CrawlDatum, Text, CrawlDatum> {
343 long generateTime;
344
345 public void configure(JobConf job) {
346 generateTime = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
347 }
348
349 public void map(WritableComparable key, Writable value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
350 if (key instanceof FloatWritable) { // tempDir source
351 SelectorEntry se = (SelectorEntry)value;
352 output.collect(se.url, se.datum);
353 } else {
354 output.collect((Text)key, (CrawlDatum)value);
355 }
356 }
357 private CrawlDatum orig = new CrawlDatum();
358 private LongWritable genTime = new LongWritable(0L);
359
360 public void reduce(Text key, Iterator<CrawlDatum> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
361 while (values.hasNext()) {
362 CrawlDatum val = values.next();
363 if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) {
364 LongWritable gt = (LongWritable)val.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
365 genTime.set(gt.get());
366 if (genTime.get() != generateTime) {
367 orig.set(val);
368 genTime.set(0L);
369 continue;
370 }
371 } else {
372 orig.set(val);
373 }
374 }
375 if (genTime.get() != 0L) {
376 orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
377 }
378 output.collect(key, orig);
379 }
380 }
381
/**
 * Creates a generator without attaching a configuration. {@code main}
 * passes an instance built this way to ToolRunner together with the
 * configuration to use.
 */
public Generator() {
}

/**
 * Creates a generator bound to the given configuration.
 *
 * @param conf configuration handed to {@code setConf}
 */
public Generator(Configuration conf) {
  this.setConf(conf);
}
387
388 /**
389 * Generate fetchlists in a segment. Whether to filter URLs or not is
390 * read from the crawl.generate.filter property in the configuration
391 * files. If the property is not found, the URLs are filtered.
392 *
393 * @param dbDir Crawl database directory
394 * @param segments Segments directory
395 * @param numLists Number of reduce tasks
396 * @param topN Number of top URLs to be selected
397 * @param curTime Current time in milliseconds
398 *
399 * @return Path to generated segment or null if no entries were
400 * selected
401 *
402 * @throws IOException When an I/O error occurs
403 */
404 public Path generate(Path dbDir, Path segments, int numLists,
405 long topN, long curTime) throws IOException {
406
407 JobConf job = new NutchJob(getConf());
408 boolean filter = job.getBoolean(CRAWL_GENERATE_FILTER, true);
409 return generate(dbDir, segments, numLists, topN, curTime, filter, false);
410 }
411
412 /**
413 * Generate fetchlists in a segment.
414 * @return Path to generated segment or null if no entries were selected.
415 * */
416 public Path generate(Path dbDir, Path segments,
417 int numLists, long topN, long curTime, boolean filter,
418 boolean force)
419 throws IOException {
420
421 Path tempDir =
422 new Path(getConf().get("mapred.temp.dir", ".") +
423 "/generate-temp-"+ System.currentTimeMillis());
424
425 Path segment = new Path(segments, generateSegmentName());
426 Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME);
427
428 Path lock = new Path(dbDir, CrawlDb.LOCK_NAME);
429 FileSystem fs = FileSystem.get(getConf());
430 LockUtil.createLockFile(fs, lock, force);
431
432 LOG.info("Generator: Selecting best-scoring urls due for fetch.");
433 LOG.info("Generator: starting");
434 LOG.info("Generator: segment: " + segment);
435 LOG.info("Generator: filtering: " + filter);
436 if (topN != Long.MAX_VALUE) {
437 LOG.info("Generator: topN: " + topN);
438 }
439
440 // map to inverted subset due for fetch, sort by score
441 JobConf job = new NutchJob(getConf());
442 job.setJobName("generate: select " + segment);
443
444 if (numLists == -1) { // for politeness make
445 numLists = job.getNumMapTasks(); // a partition per fetch task
446 }
447 if ("local".equals(job.get("mapred.job.tracker")) && numLists != 1) {
448 // override
449 LOG.info("Generator: jobtracker is 'local', generating exactly one partition.");
450 numLists = 1;
451 }
452 job.setLong(CRAWL_GEN_CUR_TIME, curTime);
453 // record real generation time
454 long generateTime = System.currentTimeMillis();
455 job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
456 job.setLong(CRAWL_TOP_N, topN);
457 job.setBoolean(CRAWL_GENERATE_FILTER, filter);
458
459 FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
460 job.setInputFormat(SequenceFileInputFormat.class);
461
462 job.setMapperClass(Selector.class);
463 job.setPartitionerClass(Selector.class);
464 job.setReducerClass(Selector.class);
465
466 FileOutputFormat.setOutputPath(job, tempDir);
467 job.setOutputFormat(SequenceFileOutputFormat.class);
468 job.setOutputKeyClass(FloatWritable.class);
469 job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);
470 job.setOutputValueClass(SelectorEntry.class);
471 try {
472 JobClient.runJob(job);
473 } catch (IOException e) {
474 LockUtil.removeLockFile(fs, lock);
475 throw e;
476 }
477
478 // check that we selected at least some entries ...
479 SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(job, tempDir);
480 boolean empty = true;
481 if (readers != null && readers.length > 0) {
482 for (int num = 0; num < readers.length; num++) {
483 if (readers[num].next(new FloatWritable())) {
484 empty = false;
485 break;
486 }
487 }
488 }
489
490 for (int i = 0; i < readers.length; i++) readers[i].close();
491
492 if (empty) {
493 LOG.warn("Generator: 0 records selected for fetching, exiting ...");
494 LockUtil.removeLockFile(fs, lock);
495 fs.delete(tempDir, true);
496 return null;
497 }
498
499 // invert again, paritition by host, sort by url hash
500 if (LOG.isInfoEnabled()) {
501 LOG.info("Generator: Partitioning selected urls by host, for politeness.");
502 }
503 job = new NutchJob(getConf());
504 job.setJobName("generate: partition " + segment);
505
506 job.setInt("partition.url.by.host.seed", new Random().nextInt());
507
508 FileInputFormat.addInputPath(job, tempDir);
509 job.setInputFormat(SequenceFileInputFormat.class);
510
511 job.setMapperClass(SelectorInverseMapper.class);
512 job.setMapOutputKeyClass(Text.class);
513 job.setMapOutputValueClass(SelectorEntry.class);
514 job.setPartitionerClass(PartitionUrlByHost.class);
515 job.setReducerClass(PartitionReducer.class);
516 job.setNumReduceTasks(numLists);
517
518 FileOutputFormat.setOutputPath(job, output);
519 job.setOutputFormat(SequenceFileOutputFormat.class);
520 job.setOutputKeyClass(Text.class);
521 job.setOutputValueClass(CrawlDatum.class);
522 job.setOutputKeyComparatorClass(HashComparator.class);
523 try {
524 JobClient.runJob(job);
525 } catch (IOException e) {
526 LockUtil.removeLockFile(fs, lock);
527 fs.delete(tempDir, true);
528 throw e;
529 }
530 if (getConf().getBoolean(GENERATE_UPDATE_CRAWLDB, false)) {
531 // update the db from tempDir
532 Path tempDir2 =
533 new Path(getConf().get("mapred.temp.dir", ".") +
534 "/generate-temp-"+ System.currentTimeMillis());
535
536 job = new NutchJob(getConf());
537 job.setJobName("generate: updatedb " + dbDir);
538 job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
539 FileInputFormat.addInputPath(job, tempDir);
540 FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
541 job.setInputFormat(SequenceFileInputFormat.class);
542 job.setMapperClass(CrawlDbUpdater.class);
543 job.setReducerClass(CrawlDbUpdater.class);
544 job.setOutputFormat(MapFileOutputFormat.class);
545 job.setOutputKeyClass(Text.class);
546 job.setOutputValueClass(CrawlDatum.class);
547 FileOutputFormat.setOutputPath(job, tempDir2);
548 try {
549 JobClient.runJob(job);
550 CrawlDb.install(job, dbDir);
551 } catch (IOException e) {
552 LockUtil.removeLockFile(fs, lock);
553 fs.delete(tempDir, true);
554 fs.delete(tempDir2, true);
555 throw e;
556 }
557 fs.delete(tempDir2, true);
558 }
559 LockUtil.removeLockFile(fs, lock);
560 fs.delete(tempDir, true);
561
562 if (LOG.isInfoEnabled()) { LOG.info("Generator: done."); }
563
564 return segment;
565 }
566
567 private static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
568
569 public static synchronized String generateSegmentName() {
570 try {
571 Thread.sleep(1000);
572 } catch (Throwable t) {};
573 return sdf.format
574 (new Date(System.currentTimeMillis()));
575 }
576
577 /**
578 * Generate a fetchlist from the crawldb.
579 */
580 public static void main(String args[]) throws Exception {
581 int res = ToolRunner.run(NutchConfiguration.create(), new Generator(), args);
582 System.exit(res);
583 }
584
585 public int run(String[] args) throws Exception {
586 if (args.length < 2) {
587 System.out.println("Usage: Generator <crawldb> <segments_dir> [-force] [-topN N] [-numFetchers numFetchers] [-adddays numDays] [-noFilter]");
588 return -1;
589 }
590
591 Path dbDir = new Path(args[0]);
592 Path segmentsDir = new Path(args[1]);
593 long curTime = System.currentTimeMillis();
594 long topN = Long.MAX_VALUE;
595 int numFetchers = -1;
596 boolean filter = true;
597 boolean force = false;
598
599 for (int i = 2; i < args.length; i++) {
600 if ("-topN".equals(args[i])) {
601 topN = Long.parseLong(args[i+1]);
602 i++;
603 } else if ("-numFetchers".equals(args[i])) {
604 numFetchers = Integer.parseInt(args[i+1]);
605 i++;
606 } else if ("-adddays".equals(args[i])) {
607 long numDays = Integer.parseInt(args[i+1]);
608 curTime += numDays * 1000L * 60 * 60 * 24;
609 } else if ("-noFilter".equals(args[i])) {
610 filter = false;
611 } else if ("-force".equals(args[i])) {
612 force = true;
613 }
614
615 }
616
617 try {
618 Path seg = generate(dbDir, segmentsDir, numFetchers, topN, curTime, filter, force);
619 if (seg == null) return -2;
620 else return 0;
621 } catch (Exception e) {
622 LOG.fatal("Generator: " + StringUtils.stringifyException(e));
623 return -1;
624 }
625 }
626 }