1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.nutch.indexer;
19
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.nutch.util.HadoopFSUtil;
import org.apache.nutch.util.LogUtil;
import org.apache.nutch.util.NutchConfiguration;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
37
38 /*************************************************************************
39 * IndexMerger creates an index for the output corresponding to a
40 * single fetcher run.
41 *
42 * @author Doug Cutting
43 * @author Mike Cafarella
44 *************************************************************************/
45 public class IndexMerger extends Configured implements Tool {
46 public static final Log LOG = LogFactory.getLog(IndexMerger.class);
47
48 public static final String DONE_NAME = "merge.done";
49
50 public IndexMerger() {
51
52 }
53
54 public IndexMerger(Configuration conf) {
55 setConf(conf);
56 }
57
58 /**
59 * Merge all input indexes to the single output index
60 */
61 public void merge(Path[] indexes, Path outputIndex, Path localWorkingDir) throws IOException {
62 LOG.info("merging indexes to: " + outputIndex);
63
64 FileSystem localFs = FileSystem.getLocal(getConf());
65 if (localFs.exists(localWorkingDir)) {
66 localFs.delete(localWorkingDir, true);
67 }
68 localFs.mkdirs(localWorkingDir);
69
70 // Get local output target
71 //
72 FileSystem fs = FileSystem.get(getConf());
73 if (fs.exists(outputIndex)) {
74 throw new FileAlreadyExistsException("Output directory " + outputIndex + " already exists!");
75 }
76
77 Path tmpLocalOutput = new Path(localWorkingDir, "merge-output");
78 Path localOutput = fs.startLocalOutput(outputIndex, tmpLocalOutput);
79
80 Directory[] dirs = new Directory[indexes.length];
81 for (int i = 0; i < indexes.length; i++) {
82 if (LOG.isInfoEnabled()) { LOG.info("Adding " + indexes[i]); }
83 dirs[i] = new FsDirectory(fs, indexes[i], false, getConf());
84 }
85
86 //
87 // Merge indices
88 //
89 IndexWriter writer = new IndexWriter(localOutput.toString(), null, true);
90 writer.setMergeFactor(getConf().getInt("indexer.mergeFactor", IndexWriter.DEFAULT_MERGE_FACTOR));
91 writer.setMaxBufferedDocs(getConf().getInt("indexer.minMergeDocs", IndexWriter.DEFAULT_MAX_BUFFERED_DOCS));
92 writer.setMaxMergeDocs(getConf().getInt("indexer.maxMergeDocs", IndexWriter.DEFAULT_MAX_MERGE_DOCS));
93 writer.setTermIndexInterval(getConf().getInt("indexer.termIndexInterval", IndexWriter.DEFAULT_TERM_INDEX_INTERVAL));
94 writer.setInfoStream(LogUtil.getDebugStream(LOG));
95 writer.setUseCompoundFile(false);
96 writer.setSimilarity(new NutchSimilarity());
97 writer.addIndexes(dirs);
98 writer.close();
99
100 //
101 // Put target back
102 //
103 fs.completeLocalOutput(outputIndex, tmpLocalOutput);
104 LOG.info("done merging");
105 }
106
107 /**
108 * Create an index for the input files in the named directory.
109 */
110 public static void main(String[] args) throws Exception {
111 int res = ToolRunner.run(NutchConfiguration.create(), new IndexMerger(), args);
112 System.exit(res);
113 }
114
115 public int run(String[] args) throws Exception {
116 String usage = "IndexMerger [-workingdir <workingdir>] outputIndex indexesDir...";
117 if (args.length < 2) {
118 System.err.println("Usage: " + usage);
119 return -1;
120 }
121
122 //
123 // Parse args, read all index directories to be processed
124 //
125 FileSystem fs = FileSystem.get(getConf());
126 List<Path> indexDirs = new ArrayList<Path>();
127
128 Path workDir = new Path("indexmerger-" + System.currentTimeMillis());
129 int i = 0;
130 if ("-workingdir".equals(args[i])) {
131 i++;
132 workDir = new Path(args[i++], "indexmerger-" + System.currentTimeMillis());
133 }
134
135 Path outputIndex = new Path(args[i++]);
136
137 for (; i < args.length; i++) {
138 FileStatus[] fstats = fs.listStatus(new Path(args[i]), HadoopFSUtil.getPassDirectoriesFilter(fs));
139 indexDirs.addAll(Arrays.asList(HadoopFSUtil.getPaths(fstats)));
140 }
141
142 //
143 // Merge the indices
144 //
145
146 Path[] indexFiles = (Path[])indexDirs.toArray(new Path[indexDirs.size()]);
147
148 try {
149 merge(indexFiles, outputIndex, workDir);
150 return 0;
151 } catch (Exception e) {
152 LOG.fatal("IndexMerger: " + StringUtils.stringifyException(e));
153 return -1;
154 } finally {
155 FileSystem.getLocal(getConf()).delete(workDir, true);
156 }
157 }
158 }