Save This Page
Home » nutch-1.0 » org.apache.nutch » indexer » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *     http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   
   18   package org.apache.nutch.indexer;
   19   
   20   import java.io;
   21   import java.text.SimpleDateFormat;
   22   import java.util;
   23   
   24   import org.apache.commons.logging.Log;
   25   import org.apache.commons.logging.LogFactory;
   26   
   27   import org.apache.hadoop.io;
   28   import org.apache.hadoop.fs;
   29   import org.apache.hadoop.conf;
   30   import org.apache.hadoop.mapred;
   31   import org.apache.hadoop.util;
   32   
   33   import org.apache.nutch.util.NutchConfiguration;
   34   import org.apache.nutch.util.NutchJob;
   35   
   36   import org.apache.lucene.index.IndexReader;
   37   import org.apache.lucene.document.DateTools;
   38   import org.apache.lucene.document.Document;
   39   
   40   /**
   41    * Delete duplicate documents in a set of Lucene indexes.
   42    * Duplicates have either the same contents (via MD5 hash) or the same URL.
   43    * 
   44    * This tool uses the following algorithm:
   45    * 
   46    * <ul>
   47    * <li><b>Phase 1 - remove URL duplicates:</b><br/>
   48    * In this phase documents with the same URL
   49    * are compared, and only the most recent document is retained -
   50    * all other URL duplicates are scheduled for deletion.</li>
   51    * <li><b>Phase 2 - remove content duplicates:</b><br/>
   52    * In this phase documents with the same content hash are compared. If
   53    * property "dedup.keep.highest.score" is set to true (default) then only
   54    * the document with the highest score is retained. If this property is set
   55    * to false, only the document with the shortest URL is retained - all other
   56    * content duplicates are scheduled for deletion.</li>
   57    * <li><b>Phase 3 - delete documents:</b><br/>
   58    * In this phase documents scheduled for deletion are marked as deleted in
   59    * Lucene index(es).</li>
   60    * </ul>
   61    * 
   62    * @author Andrzej Bialecki
   63    */
   64   public class DeleteDuplicates extends Configured
   65     implements Tool, Mapper<WritableComparable, Writable, Text, IntWritable>, Reducer<Text, IntWritable, WritableComparable, Writable>, OutputFormat<WritableComparable, Writable> {
   66     private static final Log LOG = LogFactory.getLog(DeleteDuplicates.class);
   67   
   68   //   Algorithm:
   69   //      
   70   //   1. map indexes -> <url, <md5, url, time, urlLen, index,doc>>
   71   //      reduce, deleting all but most recent
   72   //
   73   //   2. map indexes -> <md5, <md5, url, time, urlLen, index,doc>>
   74   //      partition by md5
   75   //      reduce, deleting all but with highest score (or shortest url).
   76   
   77     public static class IndexDoc implements WritableComparable {
   78       private Text url = new Text();
   79       private int urlLen;
   80       private float score;
   81       private long time;
   82       private MD5Hash hash = new MD5Hash();
   83       private Text index = new Text();              // the segment index
   84       private int doc;                              // within the index
   85       private boolean keep = true;                  // keep or discard
   86   
   87       public String toString() {
   88         return "[url=" + url + ",score=" + score + ",time=" + time
   89           + ",hash=" + hash + ",index=" + index + ",doc=" + doc
   90           + ",keep=" + keep + "]";
   91       }
   92       
   93       public void write(DataOutput out) throws IOException {
   94         url.write(out);
   95         out.writeFloat(score);
   96         out.writeLong(time);
   97         hash.write(out);
   98         index.write(out);
   99         out.writeInt(doc);
  100         out.writeBoolean(keep);
  101       }
  102   
  103       public void readFields(DataInput in) throws IOException {
  104         url.readFields(in);
  105         urlLen = url.getLength();
  106         score = in.readFloat();
  107         time = in.readLong();
  108         hash.readFields(in);
  109         index.readFields(in);
  110         doc = in.readInt();
  111         keep = in.readBoolean();
  112       }
  113   
  114       public int compareTo(Object o) {
  115         IndexDoc that = (IndexDoc)o;
  116         if (this.keep != that.keep) {
  117           return this.keep ? 1 : -1; 
  118         } else if (!this.hash.equals(that.hash)) {       // order first by hash
  119           return this.hash.compareTo(that.hash);
  120         } else if (this.time != that.time) {      // prefer more recent docs
  121           return this.time > that.time ? 1 : -1 ;
  122         } else if (this.urlLen != that.urlLen) {  // prefer shorter urls
  123           return this.urlLen - that.urlLen;
  124         } else {
  125           return this.score > that.score ? 1 : -1;
  126         }
  127       }
  128   
  129       public boolean equals(Object o) {
  130         IndexDoc that = (IndexDoc)o;
  131         return this.keep == that.keep
  132           && this.hash.equals(that.hash)
  133           && this.time == that.time
  134           && this.score == that.score
  135           && this.urlLen == that.urlLen
  136           && this.index.equals(that.index) 
  137           && this.doc == that.doc;
  138       }
  139   
  140     }
  141   
  142     public static class InputFormat extends FileInputFormat<Text, IndexDoc> {
  143       private static final long INDEX_LENGTH = Integer.MAX_VALUE;
  144   
  145       /** Return each index as a split. */
  146       public InputSplit[] getSplits(JobConf job, int numSplits)
  147         throws IOException {
  148         FileStatus[] files = listStatus(job);
  149         InputSplit[] splits = new InputSplit[files.length];
  150         for (int i = 0; i < files.length; i++) {
  151           FileStatus cur = files[i];
  152           splits[i] = new FileSplit(cur.getPath(), 0, INDEX_LENGTH, (String[])null);
  153         }
  154         return splits;
  155       }
  156   
  157       public class DDRecordReader implements RecordReader<Text, IndexDoc> {
  158   
  159         private IndexReader indexReader;
  160         private int maxDoc = 0;
  161         private int doc = 0;
  162         private Text index;
  163         
  164         public DDRecordReader(FileSplit split, JobConf job,
  165             Text index) throws IOException {
  166           try {
  167             indexReader = IndexReader.open(new FsDirectory(FileSystem.get(job), split.getPath(), false, job));
  168             maxDoc = indexReader.maxDoc();
  169           } catch (IOException ioe) {
  170             LOG.warn("Can't open index at " + split + ", skipping. (" + ioe.getMessage() + ")");
  171             indexReader = null;
  172           }
  173           this.index = index;
  174         }
  175   
  176         public boolean next(Text key, IndexDoc indexDoc)
  177           throws IOException {
  178           
  179           // skip empty indexes
  180           if (indexReader == null || maxDoc <= 0)
  181             return false;
  182   
  183           // skip deleted documents
  184           while (doc < maxDoc && indexReader.isDeleted(doc)) doc++;
  185           if (doc >= maxDoc)
  186             return false;
  187   
  188           Document document = indexReader.document(doc);
  189   
  190           // fill in key
  191           key.set(document.get("url"));
  192           // fill in value
  193           indexDoc.keep = true;
  194           indexDoc.url.set(document.get("url"));
  195           indexDoc.hash.setDigest(document.get("digest"));
  196           indexDoc.score = Float.parseFloat(document.get("boost"));
  197           try {
  198             indexDoc.time = DateTools.stringToTime(document.get("tstamp"));
  199           } catch (Exception e) {
  200             // try to figure out the time from segment name
  201             try {
  202               String segname = document.get("segment");
  203               indexDoc.time = new SimpleDateFormat("yyyyMMddHHmmss").parse(segname).getTime();
  204               // make it unique
  205               indexDoc.time += doc;
  206             } catch (Exception e1) {
  207               // use current time
  208               indexDoc.time = System.currentTimeMillis();
  209             }
  210           }
  211           indexDoc.index = index;
  212           indexDoc.doc = doc;
  213   
  214           doc++;
  215   
  216           return true;
  217         }
  218   
  219         public long getPos() throws IOException {
  220           return maxDoc == 0 ? 0 : (doc*INDEX_LENGTH)/maxDoc;
  221         }
  222   
  223         public void close() throws IOException {
  224           if (indexReader != null) indexReader.close();
  225         }
  226         
  227         public Text createKey() {
  228           return new Text();
  229         }
  230         
  231         public IndexDoc createValue() {
  232           return new IndexDoc();
  233         }
  234   
  235         public float getProgress() throws IOException {
  236           return maxDoc == 0 ? 0.0f : (float)doc / (float)maxDoc;
  237         }
  238       }
  239       
  240       /** Return each index as a split. */
  241       public RecordReader<Text, IndexDoc> getRecordReader(InputSplit split,
  242                                           JobConf job,
  243                                           Reporter reporter) throws IOException {
  244         FileSplit fsplit = (FileSplit)split;
  245         Text index = new Text(fsplit.getPath().toString());
  246         reporter.setStatus(index.toString());
  247         return new DDRecordReader(fsplit, job, index);
  248       }
  249     }
  250     
  251     public static class HashPartitioner implements Partitioner<MD5Hash, Writable> {
  252       public void configure(JobConf job) {}
  253       public void close() {}
  254       public int getPartition(MD5Hash key, Writable value,
  255                               int numReduceTasks) {
  256         int hashCode = key.hashCode();
  257         return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
  258       }
  259     }
  260   
  261     public static class UrlsReducer implements Reducer<Text, IndexDoc, MD5Hash, IndexDoc> {
  262       
  263       public void configure(JobConf job) {}
  264       
  265       public void close() {}
  266       
  267       private IndexDoc latest = new IndexDoc();
  268       
  269       public void reduce(Text key, Iterator<IndexDoc> values,
  270           OutputCollector<MD5Hash, IndexDoc> output, Reporter reporter) throws IOException {
  271         WritableUtils.cloneInto(latest, values.next());
  272         while (values.hasNext()) {
  273           IndexDoc value = values.next();
  274           if (value.time > latest.time) {
  275             // discard current and use more recent
  276             latest.keep = false;
  277             LOG.debug("-discard " + latest + ", keep " + value);
  278             output.collect(latest.hash, latest);
  279             WritableUtils.cloneInto(latest, value);
  280           } else {
  281             // discard
  282             value.keep = false;
  283             LOG.debug("-discard " + value + ", keep " + latest);
  284             output.collect(value.hash, value);
  285           }
  286           
  287         }
  288         // keep the latest
  289         latest.keep = true;
  290         output.collect(latest.hash, latest);
  291         
  292       }
  293     }
  294     
  295     public static class HashReducer implements Reducer<MD5Hash, IndexDoc, Text, IndexDoc> {
  296       boolean byScore;
  297       
  298       public void configure(JobConf job) {
  299         byScore = job.getBoolean("dedup.keep.highest.score", true);
  300       }
  301       
  302       public void close() {}
  303       
  304       private IndexDoc highest = new IndexDoc();
  305       
  306       public void reduce(MD5Hash key, Iterator<IndexDoc> values,
  307                          OutputCollector<Text, IndexDoc> output, Reporter reporter)
  308         throws IOException {
  309         boolean highestSet = false;
  310         while (values.hasNext()) {
  311           IndexDoc value = values.next();
  312           // skip already deleted
  313           if (!value.keep) {
  314             LOG.debug("-discard " + value + " (already marked)");
  315             output.collect(value.url, value);
  316             continue;
  317           }
  318           if (!highestSet) {
  319             WritableUtils.cloneInto(highest, value);
  320             highestSet = true;
  321             continue;
  322           }
  323           IndexDoc toDelete = null, toKeep = null;
  324           boolean metric = byScore ? (value.score > highest.score) : 
  325                                      (value.urlLen < highest.urlLen);
  326           if (metric) {
  327             toDelete = highest;
  328             toKeep = value;
  329           } else {
  330             toDelete = value;
  331             toKeep = highest;
  332           }
  333           
  334           if (LOG.isDebugEnabled()) {
  335             LOG.debug("-discard " + toDelete + ", keep " + toKeep);
  336           }
  337           
  338           toDelete.keep = false;
  339           output.collect(toDelete.url, toDelete);
  340           WritableUtils.cloneInto(highest, toKeep);
  341         }    
  342         LOG.debug("-keep " + highest);
  343         // no need to add this - in phase 2 we only process docs to delete them
  344         // highest.keep = true;
  345         // output.collect(key, highest);
  346       }
  347     }
  348       
  349     private FileSystem fs;
  350   
  351     public void configure(JobConf job) {
  352       setConf(job);
  353     }
  354     
  355     public void setConf(Configuration conf) {
  356       super.setConf(conf);
  357       try {
  358         if(conf != null) fs = FileSystem.get(conf);
  359       } catch (IOException e) {
  360         throw new RuntimeException(e);
  361       }
  362     }
  363   
  364     public void close() {}
  365   
  366     /** Map [*,IndexDoc] pairs to [index,doc] pairs. */
  367     public void map(WritableComparable key, Writable value,
  368                     OutputCollector<Text, IntWritable> output, Reporter reporter)
  369       throws IOException {
  370       IndexDoc indexDoc = (IndexDoc)value;
  371       // don't delete these
  372       if (indexDoc.keep) return;
  373       // delete all others
  374       output.collect(indexDoc.index, new IntWritable(indexDoc.doc));
  375     }
  376   
  377     /** Delete docs named in values from index named in key. */
  378     public void reduce(Text key, Iterator<IntWritable> values,
  379                        OutputCollector<WritableComparable, Writable> output, Reporter reporter)
  380       throws IOException {
  381       Path index = new Path(key.toString());
  382       IndexReader reader = IndexReader.open(new FsDirectory(fs, index, false, getConf()));
  383       try {
  384         while (values.hasNext()) {
  385           IntWritable value = values.next();
  386           LOG.debug("-delete " + index + " doc=" + value);
  387           reader.deleteDocument(value.get());
  388         }
  389       } finally {
  390         reader.close();
  391       }
  392     }
  393   
  394     /** Write nothing. */
  395     public RecordWriter<WritableComparable, Writable> getRecordWriter(final FileSystem fs,
  396                                         final JobConf job,
  397                                         final String name,
  398                                         final Progressable progress) throws IOException {
  399       return new RecordWriter<WritableComparable, Writable>() {                   
  400           public void write(WritableComparable key, Writable value)
  401             throws IOException {
  402             throw new UnsupportedOperationException();
  403           }        
  404           public void close(Reporter reporter) throws IOException {}
  405         };
  406     }
  407   
  /** Creates an instance without supplying a configuration. */
  public DeleteDuplicates() {
    super();
  }

  /**
   * Creates an instance and passes the given configuration to
   * {@link #setConf(Configuration)}.
   */
  public DeleteDuplicates(Configuration conf) {
    super();
    this.setConf(conf);
  }
  415     
  416     public void checkOutputSpecs(FileSystem fs, JobConf job) {}
  417   
  418     public void dedup(Path[] indexDirs)
  419       throws IOException {
  420   
  421       if (LOG.isInfoEnabled()) { LOG.info("Dedup: starting"); }
  422   
  423       Path outDir1 =
  424         new Path("dedup-urls-"+
  425                  Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
  426   
  427       JobConf job = new NutchJob(getConf());
  428   
  429       for (int i = 0; i < indexDirs.length; i++) {
  430         if (LOG.isInfoEnabled()) {
  431           LOG.info("Dedup: adding indexes in: " + indexDirs[i]);
  432         }
  433         FileInputFormat.addInputPath(job, indexDirs[i]);
  434       }
  435       job.setJobName("dedup 1: urls by time");
  436   
  437       job.setInputFormat(InputFormat.class);
  438       job.setMapOutputKeyClass(Text.class);
  439       job.setMapOutputValueClass(IndexDoc.class);
  440   
  441       job.setReducerClass(UrlsReducer.class);
  442       FileOutputFormat.setOutputPath(job, outDir1);
  443   
  444       job.setOutputKeyClass(MD5Hash.class);
  445       job.setOutputValueClass(IndexDoc.class);
  446       job.setOutputFormat(SequenceFileOutputFormat.class);
  447   
  448       JobClient.runJob(job);
  449   
  450       Path outDir2 =
  451         new Path("dedup-hash-"+
  452                  Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
  453       job = new NutchJob(getConf());
  454       job.setJobName("dedup 2: content by hash");
  455   
  456       FileInputFormat.addInputPath(job, outDir1);
  457       job.setInputFormat(SequenceFileInputFormat.class);
  458       job.setMapOutputKeyClass(MD5Hash.class);
  459       job.setMapOutputValueClass(IndexDoc.class);
  460       job.setPartitionerClass(HashPartitioner.class);
  461       job.setSpeculativeExecution(false);
  462       
  463       job.setReducerClass(HashReducer.class);
  464       FileOutputFormat.setOutputPath(job, outDir2);
  465   
  466       job.setOutputKeyClass(Text.class);
  467       job.setOutputValueClass(IndexDoc.class);
  468       job.setOutputFormat(SequenceFileOutputFormat.class);
  469   
  470       JobClient.runJob(job);
  471   
  472       // remove outDir1 - no longer needed
  473       fs.delete(outDir1, true);
  474       
  475       job = new NutchJob(getConf());
  476       job.setJobName("dedup 3: delete from index(es)");
  477   
  478       FileInputFormat.addInputPath(job, outDir2);
  479       job.setInputFormat(SequenceFileInputFormat.class);
  480       //job.setInputKeyClass(Text.class);
  481       //job.setInputValueClass(IndexDoc.class);
  482   
  483       job.setInt("io.file.buffer.size", 4096);
  484       job.setMapperClass(DeleteDuplicates.class);
  485       job.setReducerClass(DeleteDuplicates.class);
  486   
  487       job.setOutputFormat(DeleteDuplicates.class);
  488       job.setOutputKeyClass(Text.class);
  489       job.setOutputValueClass(IntWritable.class);
  490   
  491       JobClient.runJob(job);
  492   
  493       fs.delete(outDir2, true);
  494   
  495       if (LOG.isInfoEnabled()) { LOG.info("Dedup: done"); }
  496     }
  497   
  498     public static void main(String[] args) throws Exception {
  499       int res = ToolRunner.run(NutchConfiguration.create(), new DeleteDuplicates(), args);
  500       System.exit(res);
  501     }
  502     
  503     public int run(String[] args) throws Exception {
  504       
  505       if (args.length < 1) {
  506         System.err.println("Usage: DeleteDuplicates <indexes> ...");
  507         return -1;
  508       }
  509       
  510       Path[] indexes = new Path[args.length];
  511       for (int i = 0; i < args.length; i++) {
  512         indexes[i] = new Path(args[i]);
  513       }
  514       try {
  515         dedup(indexes);
  516         return 0;
  517       } catch (Exception e) {
  518         LOG.fatal("DeleteDuplicates: " + StringUtils.stringifyException(e));
  519         return -1;
  520       }
  521     }
  522   
  523   }

Save This Page
Home » nutch-1.0 » org.apache.nutch » indexer » [javadoc | source]