1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.nutch.indexer;
19
20 import java.io;
21 import java.text.SimpleDateFormat;
22 import java.util;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26
27 import org.apache.hadoop.io;
28 import org.apache.hadoop.fs;
29 import org.apache.hadoop.conf;
30 import org.apache.hadoop.mapred;
31 import org.apache.hadoop.util;
32
33 import org.apache.nutch.util.NutchConfiguration;
34 import org.apache.nutch.util.NutchJob;
35
36 import org.apache.lucene.index.IndexReader;
37 import org.apache.lucene.document.DateTools;
38 import org.apache.lucene.document.Document;
39
40 /**
41 * Delete duplicate documents in a set of Lucene indexes.
42 * Duplicates have either the same contents (via MD5 hash) or the same URL.
43 *
44 * This tool uses the following algorithm:
45 *
46 * <ul>
47 * <li><b>Phase 1 - remove URL duplicates:</b><br/>
48 * In this phase documents with the same URL
49 * are compared, and only the most recent document is retained -
50 * all other URL duplicates are scheduled for deletion.</li>
51 * <li><b>Phase 2 - remove content duplicates:</b><br/>
52 * In this phase documents with the same content hash are compared. If
53 * property "dedup.keep.highest.score" is set to true (default) then only
54 * the document with the highest score is retained. If this property is set
55 * to false, only the document with the shortest URL is retained - all other
56 * content duplicates are scheduled for deletion.</li>
57 * <li><b>Phase 3 - delete documents:</b><br/>
58 * In this phase documents scheduled for deletion are marked as deleted in
59 * Lucene index(es).</li>
60 * </ul>
61 *
62 * @author Andrzej Bialecki
63 */
64 public class DeleteDuplicates extends Configured
65 implements Tool, Mapper<WritableComparable, Writable, Text, IntWritable>, Reducer<Text, IntWritable, WritableComparable, Writable>, OutputFormat<WritableComparable, Writable> {
66 private static final Log LOG = LogFactory.getLog(DeleteDuplicates.class);
67
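// Algorithm:
//
// 1. map indexes -> <url, <md5, url, time, urlLen, score, index, doc, keep>>
//    reduce by url, marking all but the most recent document for deletion
//
// 2. map phase 1 output -> <md5, <md5, url, time, urlLen, score, index, doc, keep>>
//    partition by md5
//    reduce, marking all but the document with the highest score
//    (or the shortest url) for deletion
//
// 3. map documents marked for deletion -> <index, doc>
//    reduce, deleting the listed docs from each index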
76
77 public static class IndexDoc implements WritableComparable {
78 private Text url = new Text();
79 private int urlLen;
80 private float score;
81 private long time;
82 private MD5Hash hash = new MD5Hash();
83 private Text index = new Text(); // the segment index
84 private int doc; // within the index
85 private boolean keep = true; // keep or discard
86
87 public String toString() {
88 return "[url=" + url + ",score=" + score + ",time=" + time
89 + ",hash=" + hash + ",index=" + index + ",doc=" + doc
90 + ",keep=" + keep + "]";
91 }
92
93 public void write(DataOutput out) throws IOException {
94 url.write(out);
95 out.writeFloat(score);
96 out.writeLong(time);
97 hash.write(out);
98 index.write(out);
99 out.writeInt(doc);
100 out.writeBoolean(keep);
101 }
102
103 public void readFields(DataInput in) throws IOException {
104 url.readFields(in);
105 urlLen = url.getLength();
106 score = in.readFloat();
107 time = in.readLong();
108 hash.readFields(in);
109 index.readFields(in);
110 doc = in.readInt();
111 keep = in.readBoolean();
112 }
113
114 public int compareTo(Object o) {
115 IndexDoc that = (IndexDoc)o;
116 if (this.keep != that.keep) {
117 return this.keep ? 1 : -1;
118 } else if (!this.hash.equals(that.hash)) { // order first by hash
119 return this.hash.compareTo(that.hash);
120 } else if (this.time != that.time) { // prefer more recent docs
121 return this.time > that.time ? 1 : -1 ;
122 } else if (this.urlLen != that.urlLen) { // prefer shorter urls
123 return this.urlLen - that.urlLen;
124 } else {
125 return this.score > that.score ? 1 : -1;
126 }
127 }
128
129 public boolean equals(Object o) {
130 IndexDoc that = (IndexDoc)o;
131 return this.keep == that.keep
132 && this.hash.equals(that.hash)
133 && this.time == that.time
134 && this.score == that.score
135 && this.urlLen == that.urlLen
136 && this.index.equals(that.index)
137 && this.doc == that.doc;
138 }
139
140 }
141
142 public static class InputFormat extends FileInputFormat<Text, IndexDoc> {
143 private static final long INDEX_LENGTH = Integer.MAX_VALUE;
144
145 /** Return each index as a split. */
146 public InputSplit[] getSplits(JobConf job, int numSplits)
147 throws IOException {
148 FileStatus[] files = listStatus(job);
149 InputSplit[] splits = new InputSplit[files.length];
150 for (int i = 0; i < files.length; i++) {
151 FileStatus cur = files[i];
152 splits[i] = new FileSplit(cur.getPath(), 0, INDEX_LENGTH, (String[])null);
153 }
154 return splits;
155 }
156
157 public class DDRecordReader implements RecordReader<Text, IndexDoc> {
158
159 private IndexReader indexReader;
160 private int maxDoc = 0;
161 private int doc = 0;
162 private Text index;
163
164 public DDRecordReader(FileSplit split, JobConf job,
165 Text index) throws IOException {
166 try {
167 indexReader = IndexReader.open(new FsDirectory(FileSystem.get(job), split.getPath(), false, job));
168 maxDoc = indexReader.maxDoc();
169 } catch (IOException ioe) {
170 LOG.warn("Can't open index at " + split + ", skipping. (" + ioe.getMessage() + ")");
171 indexReader = null;
172 }
173 this.index = index;
174 }
175
176 public boolean next(Text key, IndexDoc indexDoc)
177 throws IOException {
178
179 // skip empty indexes
180 if (indexReader == null || maxDoc <= 0)
181 return false;
182
183 // skip deleted documents
184 while (doc < maxDoc && indexReader.isDeleted(doc)) doc++;
185 if (doc >= maxDoc)
186 return false;
187
188 Document document = indexReader.document(doc);
189
190 // fill in key
191 key.set(document.get("url"));
192 // fill in value
193 indexDoc.keep = true;
194 indexDoc.url.set(document.get("url"));
195 indexDoc.hash.setDigest(document.get("digest"));
196 indexDoc.score = Float.parseFloat(document.get("boost"));
197 try {
198 indexDoc.time = DateTools.stringToTime(document.get("tstamp"));
199 } catch (Exception e) {
200 // try to figure out the time from segment name
201 try {
202 String segname = document.get("segment");
203 indexDoc.time = new SimpleDateFormat("yyyyMMddHHmmss").parse(segname).getTime();
204 // make it unique
205 indexDoc.time += doc;
206 } catch (Exception e1) {
207 // use current time
208 indexDoc.time = System.currentTimeMillis();
209 }
210 }
211 indexDoc.index = index;
212 indexDoc.doc = doc;
213
214 doc++;
215
216 return true;
217 }
218
219 public long getPos() throws IOException {
220 return maxDoc == 0 ? 0 : (doc*INDEX_LENGTH)/maxDoc;
221 }
222
223 public void close() throws IOException {
224 if (indexReader != null) indexReader.close();
225 }
226
227 public Text createKey() {
228 return new Text();
229 }
230
231 public IndexDoc createValue() {
232 return new IndexDoc();
233 }
234
235 public float getProgress() throws IOException {
236 return maxDoc == 0 ? 0.0f : (float)doc / (float)maxDoc;
237 }
238 }
239
240 /** Return each index as a split. */
241 public RecordReader<Text, IndexDoc> getRecordReader(InputSplit split,
242 JobConf job,
243 Reporter reporter) throws IOException {
244 FileSplit fsplit = (FileSplit)split;
245 Text index = new Text(fsplit.getPath().toString());
246 reporter.setStatus(index.toString());
247 return new DDRecordReader(fsplit, job, index);
248 }
249 }
250
251 public static class HashPartitioner implements Partitioner<MD5Hash, Writable> {
252 public void configure(JobConf job) {}
253 public void close() {}
254 public int getPartition(MD5Hash key, Writable value,
255 int numReduceTasks) {
256 int hashCode = key.hashCode();
257 return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
258 }
259 }
260
261 public static class UrlsReducer implements Reducer<Text, IndexDoc, MD5Hash, IndexDoc> {
262
263 public void configure(JobConf job) {}
264
265 public void close() {}
266
267 private IndexDoc latest = new IndexDoc();
268
269 public void reduce(Text key, Iterator<IndexDoc> values,
270 OutputCollector<MD5Hash, IndexDoc> output, Reporter reporter) throws IOException {
271 WritableUtils.cloneInto(latest, values.next());
272 while (values.hasNext()) {
273 IndexDoc value = values.next();
274 if (value.time > latest.time) {
275 // discard current and use more recent
276 latest.keep = false;
277 LOG.debug("-discard " + latest + ", keep " + value);
278 output.collect(latest.hash, latest);
279 WritableUtils.cloneInto(latest, value);
280 } else {
281 // discard
282 value.keep = false;
283 LOG.debug("-discard " + value + ", keep " + latest);
284 output.collect(value.hash, value);
285 }
286
287 }
288 // keep the latest
289 latest.keep = true;
290 output.collect(latest.hash, latest);
291
292 }
293 }
294
295 public static class HashReducer implements Reducer<MD5Hash, IndexDoc, Text, IndexDoc> {
296 boolean byScore;
297
298 public void configure(JobConf job) {
299 byScore = job.getBoolean("dedup.keep.highest.score", true);
300 }
301
302 public void close() {}
303
304 private IndexDoc highest = new IndexDoc();
305
306 public void reduce(MD5Hash key, Iterator<IndexDoc> values,
307 OutputCollector<Text, IndexDoc> output, Reporter reporter)
308 throws IOException {
309 boolean highestSet = false;
310 while (values.hasNext()) {
311 IndexDoc value = values.next();
312 // skip already deleted
313 if (!value.keep) {
314 LOG.debug("-discard " + value + " (already marked)");
315 output.collect(value.url, value);
316 continue;
317 }
318 if (!highestSet) {
319 WritableUtils.cloneInto(highest, value);
320 highestSet = true;
321 continue;
322 }
323 IndexDoc toDelete = null, toKeep = null;
324 boolean metric = byScore ? (value.score > highest.score) :
325 (value.urlLen < highest.urlLen);
326 if (metric) {
327 toDelete = highest;
328 toKeep = value;
329 } else {
330 toDelete = value;
331 toKeep = highest;
332 }
333
334 if (LOG.isDebugEnabled()) {
335 LOG.debug("-discard " + toDelete + ", keep " + toKeep);
336 }
337
338 toDelete.keep = false;
339 output.collect(toDelete.url, toDelete);
340 WritableUtils.cloneInto(highest, toKeep);
341 }
342 LOG.debug("-keep " + highest);
343 // no need to add this - in phase 2 we only process docs to delete them
344 // highest.keep = true;
345 // output.collect(key, highest);
346 }
347 }
348
349 private FileSystem fs;
350
351 public void configure(JobConf job) {
352 setConf(job);
353 }
354
355 public void setConf(Configuration conf) {
356 super.setConf(conf);
357 try {
358 if(conf != null) fs = FileSystem.get(conf);
359 } catch (IOException e) {
360 throw new RuntimeException(e);
361 }
362 }
363
364 public void close() {}
365
366 /** Map [*,IndexDoc] pairs to [index,doc] pairs. */
367 public void map(WritableComparable key, Writable value,
368 OutputCollector<Text, IntWritable> output, Reporter reporter)
369 throws IOException {
370 IndexDoc indexDoc = (IndexDoc)value;
371 // don't delete these
372 if (indexDoc.keep) return;
373 // delete all others
374 output.collect(indexDoc.index, new IntWritable(indexDoc.doc));
375 }
376
377 /** Delete docs named in values from index named in key. */
378 public void reduce(Text key, Iterator<IntWritable> values,
379 OutputCollector<WritableComparable, Writable> output, Reporter reporter)
380 throws IOException {
381 Path index = new Path(key.toString());
382 IndexReader reader = IndexReader.open(new FsDirectory(fs, index, false, getConf()));
383 try {
384 while (values.hasNext()) {
385 IntWritable value = values.next();
386 LOG.debug("-delete " + index + " doc=" + value);
387 reader.deleteDocument(value.get());
388 }
389 } finally {
390 reader.close();
391 }
392 }
393
394 /** Write nothing. */
395 public RecordWriter<WritableComparable, Writable> getRecordWriter(final FileSystem fs,
396 final JobConf job,
397 final String name,
398 final Progressable progress) throws IOException {
399 return new RecordWriter<WritableComparable, Writable>() {
400 public void write(WritableComparable key, Writable value)
401 throws IOException {
402 throw new UnsupportedOperationException();
403 }
404 public void close(Reporter reporter) throws IOException {}
405 };
406 }
407
408 public DeleteDuplicates() {
409
410 }
411
/** Creates an instance and applies the given configuration via setConf. */
public DeleteDuplicates(Configuration conf) {
  this.setConf(conf);
}
415
416 public void checkOutputSpecs(FileSystem fs, JobConf job) {}
417
418 public void dedup(Path[] indexDirs)
419 throws IOException {
420
421 if (LOG.isInfoEnabled()) { LOG.info("Dedup: starting"); }
422
423 Path outDir1 =
424 new Path("dedup-urls-"+
425 Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
426
427 JobConf job = new NutchJob(getConf());
428
429 for (int i = 0; i < indexDirs.length; i++) {
430 if (LOG.isInfoEnabled()) {
431 LOG.info("Dedup: adding indexes in: " + indexDirs[i]);
432 }
433 FileInputFormat.addInputPath(job, indexDirs[i]);
434 }
435 job.setJobName("dedup 1: urls by time");
436
437 job.setInputFormat(InputFormat.class);
438 job.setMapOutputKeyClass(Text.class);
439 job.setMapOutputValueClass(IndexDoc.class);
440
441 job.setReducerClass(UrlsReducer.class);
442 FileOutputFormat.setOutputPath(job, outDir1);
443
444 job.setOutputKeyClass(MD5Hash.class);
445 job.setOutputValueClass(IndexDoc.class);
446 job.setOutputFormat(SequenceFileOutputFormat.class);
447
448 JobClient.runJob(job);
449
450 Path outDir2 =
451 new Path("dedup-hash-"+
452 Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
453 job = new NutchJob(getConf());
454 job.setJobName("dedup 2: content by hash");
455
456 FileInputFormat.addInputPath(job, outDir1);
457 job.setInputFormat(SequenceFileInputFormat.class);
458 job.setMapOutputKeyClass(MD5Hash.class);
459 job.setMapOutputValueClass(IndexDoc.class);
460 job.setPartitionerClass(HashPartitioner.class);
461 job.setSpeculativeExecution(false);
462
463 job.setReducerClass(HashReducer.class);
464 FileOutputFormat.setOutputPath(job, outDir2);
465
466 job.setOutputKeyClass(Text.class);
467 job.setOutputValueClass(IndexDoc.class);
468 job.setOutputFormat(SequenceFileOutputFormat.class);
469
470 JobClient.runJob(job);
471
472 // remove outDir1 - no longer needed
473 fs.delete(outDir1, true);
474
475 job = new NutchJob(getConf());
476 job.setJobName("dedup 3: delete from index(es)");
477
478 FileInputFormat.addInputPath(job, outDir2);
479 job.setInputFormat(SequenceFileInputFormat.class);
480 //job.setInputKeyClass(Text.class);
481 //job.setInputValueClass(IndexDoc.class);
482
483 job.setInt("io.file.buffer.size", 4096);
484 job.setMapperClass(DeleteDuplicates.class);
485 job.setReducerClass(DeleteDuplicates.class);
486
487 job.setOutputFormat(DeleteDuplicates.class);
488 job.setOutputKeyClass(Text.class);
489 job.setOutputValueClass(IntWritable.class);
490
491 JobClient.runJob(job);
492
493 fs.delete(outDir2, true);
494
495 if (LOG.isInfoEnabled()) { LOG.info("Dedup: done"); }
496 }
497
498 public static void main(String[] args) throws Exception {
499 int res = ToolRunner.run(NutchConfiguration.create(), new DeleteDuplicates(), args);
500 System.exit(res);
501 }
502
503 public int run(String[] args) throws Exception {
504
505 if (args.length < 1) {
506 System.err.println("Usage: DeleteDuplicates <indexes> ...");
507 return -1;
508 }
509
510 Path[] indexes = new Path[args.length];
511 for (int i = 0; i < args.length; i++) {
512 indexes[i] = new Path(args[i]);
513 }
514 try {
515 dedup(indexes);
516 return 0;
517 } catch (Exception e) {
518 LOG.fatal("DeleteDuplicates: " + StringUtils.stringifyException(e));
519 return -1;
520 }
521 }
522
523 }