Save This Page
Home » nutch-1.0 » org.apache.nutch » indexer » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *     http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   
   18   package org.apache.nutch.indexer;
   19   
   20   import java.io;
   21   import java.util;
   22   
   23   import org.apache.commons.logging.Log;
   24   import org.apache.commons.logging.LogFactory;
   25   
   26   import org.apache.hadoop.fs;
   27   import org.apache.hadoop.mapred.FileAlreadyExistsException;
   28   import org.apache.hadoop.util;
   29   import org.apache.hadoop.conf;
   30   
   31   import org.apache.nutch.util.HadoopFSUtil;
   32   import org.apache.nutch.util.LogUtil;
   33   import org.apache.nutch.util.NutchConfiguration;
   34   
   35   import org.apache.lucene.store.Directory;
   36   import org.apache.lucene.index.IndexWriter;
   37   
   38   /*************************************************************************
   39    * IndexMerger creates an index for the output corresponding to a 
   40    * single fetcher run.
   41    * 
   42    * @author Doug Cutting
   43    * @author Mike Cafarella
   44    *************************************************************************/
   45   public class IndexMerger extends Configured implements Tool {
   46     public static final Log LOG = LogFactory.getLog(IndexMerger.class);
   47   
   48     public static final String DONE_NAME = "merge.done";
   49   
   50     public IndexMerger() {
   51       
   52     }
   53     
   54     public IndexMerger(Configuration conf) {
   55       setConf(conf);
   56     }
   57     
   58     /**
   59      * Merge all input indexes to the single output index
   60      */
   61     public void merge(Path[] indexes, Path outputIndex, Path localWorkingDir) throws IOException {
   62       LOG.info("merging indexes to: " + outputIndex);
   63   
   64       FileSystem localFs = FileSystem.getLocal(getConf());  
   65       if (localFs.exists(localWorkingDir)) {
   66         localFs.delete(localWorkingDir, true);
   67       }
   68       localFs.mkdirs(localWorkingDir);
   69   
   70       // Get local output target
   71       //
   72       FileSystem fs = FileSystem.get(getConf());
   73       if (fs.exists(outputIndex)) {
   74         throw new FileAlreadyExistsException("Output directory " + outputIndex + " already exists!");
   75       }
   76   
   77       Path tmpLocalOutput = new Path(localWorkingDir, "merge-output");
   78       Path localOutput = fs.startLocalOutput(outputIndex, tmpLocalOutput);
   79   
   80       Directory[] dirs = new Directory[indexes.length];
   81       for (int i = 0; i < indexes.length; i++) {
   82         if (LOG.isInfoEnabled()) { LOG.info("Adding " + indexes[i]); }
   83         dirs[i] = new FsDirectory(fs, indexes[i], false, getConf());
   84       }
   85   
   86       //
   87       // Merge indices
   88       //
   89       IndexWriter writer = new IndexWriter(localOutput.toString(), null, true);
   90       writer.setMergeFactor(getConf().getInt("indexer.mergeFactor", IndexWriter.DEFAULT_MERGE_FACTOR));
   91       writer.setMaxBufferedDocs(getConf().getInt("indexer.minMergeDocs", IndexWriter.DEFAULT_MAX_BUFFERED_DOCS));
   92       writer.setMaxMergeDocs(getConf().getInt("indexer.maxMergeDocs", IndexWriter.DEFAULT_MAX_MERGE_DOCS));
   93       writer.setTermIndexInterval(getConf().getInt("indexer.termIndexInterval", IndexWriter.DEFAULT_TERM_INDEX_INTERVAL));
   94       writer.setInfoStream(LogUtil.getDebugStream(LOG));
   95       writer.setUseCompoundFile(false);
   96       writer.setSimilarity(new NutchSimilarity());
   97       writer.addIndexes(dirs);
   98       writer.close();
   99   
  100       //
  101       // Put target back
  102       //
  103       fs.completeLocalOutput(outputIndex, tmpLocalOutput);
  104       LOG.info("done merging");
  105     }
  106   
  107     /** 
  108      * Create an index for the input files in the named directory. 
  109      */
  110     public static void main(String[] args) throws Exception {
  111       int res = ToolRunner.run(NutchConfiguration.create(), new IndexMerger(), args);
  112       System.exit(res);
  113     }
  114     
  115     public int run(String[] args) throws Exception {
  116       String usage = "IndexMerger [-workingdir <workingdir>] outputIndex indexesDir...";
  117       if (args.length < 2) {
  118         System.err.println("Usage: " + usage);
  119         return -1;
  120       }
  121   
  122       //
  123       // Parse args, read all index directories to be processed
  124       //
  125       FileSystem fs = FileSystem.get(getConf());
  126       List<Path> indexDirs = new ArrayList<Path>();
  127   
  128       Path workDir = new Path("indexmerger-" + System.currentTimeMillis());  
  129       int i = 0;
  130       if ("-workingdir".equals(args[i])) {
  131         i++;
  132         workDir = new Path(args[i++], "indexmerger-" + System.currentTimeMillis());
  133       }
  134   
  135       Path outputIndex = new Path(args[i++]);
  136   
  137       for (; i < args.length; i++) {
  138         FileStatus[] fstats = fs.listStatus(new Path(args[i]), HadoopFSUtil.getPassDirectoriesFilter(fs));
  139         indexDirs.addAll(Arrays.asList(HadoopFSUtil.getPaths(fstats)));
  140       }
  141   
  142       //
  143       // Merge the indices
  144       //
  145   
  146       Path[] indexFiles = (Path[])indexDirs.toArray(new Path[indexDirs.size()]);
  147   
  148       try {
  149         merge(indexFiles, outputIndex, workDir);
  150         return 0;
  151       } catch (Exception e) {
  152         LOG.fatal("IndexMerger: " + StringUtils.stringifyException(e));
  153         return -1;
  154       } finally {
  155         FileSystem.getLocal(getConf()).delete(workDir, true);
  156       }
  157     }
  158   }

Save This Page
Home » nutch-1.0 » org.apache.nutch » indexer » [javadoc | source]