1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.nutch.crawl;
19
20 import java.io;
21 import java.util;
22 import java.net;
23
24 // Commons Logging imports
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27
28 import org.apache.hadoop.io;
29 import org.apache.hadoop.fs;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.conf;
32 import org.apache.hadoop.mapred;
33 import org.apache.hadoop.util;
34
35 import org.apache.nutch.net.URLFilters;
36 import org.apache.nutch.net.URLNormalizers;
37 import org.apache.nutch.parse;
38 import org.apache.nutch.util.HadoopFSUtil;
39 import org.apache.nutch.util.LockUtil;
40 import org.apache.nutch.util.NutchConfiguration;
41 import org.apache.nutch.util.NutchJob;
42
43 /** Maintains an inverted link map, listing incoming links for each url. */
44 public class LinkDb extends Configured implements Tool, Mapper<Text, ParseData, Text, Inlinks> {
45
46 public static final Log LOG = LogFactory.getLog(LinkDb.class);
47
48 public static final String CURRENT_NAME = "current";
49 public static final String LOCK_NAME = ".locked";
50
51 private int maxAnchorLength;
52 private boolean ignoreInternalLinks;
53 private URLFilters urlFilters;
54 private URLNormalizers urlNormalizers;
55
56 public LinkDb() {}
57
58 public LinkDb(Configuration conf) {
59 setConf(conf);
60 }
61
62 public void configure(JobConf job) {
63 maxAnchorLength = job.getInt("db.max.anchor.length", 100);
64 ignoreInternalLinks = job.getBoolean("db.ignore.internal.links", true);
65 if (job.getBoolean(LinkDbFilter.URL_FILTERING, false)) {
66 urlFilters = new URLFilters(job);
67 }
68 if (job.getBoolean(LinkDbFilter.URL_NORMALIZING, false)) {
69 urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_LINKDB);
70 }
71 }
72
73 public void close() {}
74
75 public void map(Text key, ParseData parseData,
76 OutputCollector<Text, Inlinks> output, Reporter reporter)
77 throws IOException {
78 String fromUrl = key.toString();
79 String fromHost = getHost(fromUrl);
80 if (urlNormalizers != null) {
81 try {
82 fromUrl = urlNormalizers.normalize(fromUrl, URLNormalizers.SCOPE_LINKDB); // normalize the url
83 } catch (Exception e) {
84 LOG.warn("Skipping " + fromUrl + ":" + e);
85 fromUrl = null;
86 }
87 }
88 if (fromUrl != null && urlFilters != null) {
89 try {
90 fromUrl = urlFilters.filter(fromUrl); // filter the url
91 } catch (Exception e) {
92 LOG.warn("Skipping " + fromUrl + ":" + e);
93 fromUrl = null;
94 }
95 }
96 if (fromUrl == null) return; // discard all outlinks
97 Outlink[] outlinks = parseData.getOutlinks();
98 Inlinks inlinks = new Inlinks();
99 for (int i = 0; i < outlinks.length; i++) {
100 Outlink outlink = outlinks[i];
101 String toUrl = outlink.getToUrl();
102
103 if (ignoreInternalLinks) {
104 String toHost = getHost(toUrl);
105 if (toHost == null || toHost.equals(fromHost)) { // internal link
106 continue; // skip it
107 }
108 }
109 if (urlNormalizers != null) {
110 try {
111 toUrl = urlNormalizers.normalize(toUrl, URLNormalizers.SCOPE_LINKDB); // normalize the url
112 } catch (Exception e) {
113 LOG.warn("Skipping " + toUrl + ":" + e);
114 toUrl = null;
115 }
116 }
117 if (toUrl != null && urlFilters != null) {
118 try {
119 toUrl = urlFilters.filter(toUrl); // filter the url
120 } catch (Exception e) {
121 LOG.warn("Skipping " + toUrl + ":" + e);
122 toUrl = null;
123 }
124 }
125 if (toUrl == null) continue;
126 inlinks.clear();
127 String anchor = outlink.getAnchor(); // truncate long anchors
128 if (anchor.length() > maxAnchorLength) {
129 anchor = anchor.substring(0, maxAnchorLength);
130 }
131 inlinks.add(new Inlink(fromUrl, anchor)); // collect inverted link
132 output.collect(new Text(toUrl), inlinks);
133 }
134 }
135
136 private String getHost(String url) {
137 try {
138 return new URL(url).getHost().toLowerCase();
139 } catch (MalformedURLException e) {
140 return null;
141 }
142 }
143
144 public void invert(Path linkDb, final Path segmentsDir, boolean normalize, boolean filter, boolean force) throws IOException {
145 final FileSystem fs = FileSystem.get(getConf());
146 FileStatus[] files = fs.listStatus(segmentsDir, HadoopFSUtil.getPassDirectoriesFilter(fs));
147 invert(linkDb, HadoopFSUtil.getPaths(files), normalize, filter, force);
148 }
149
150 public void invert(Path linkDb, Path[] segments, boolean normalize, boolean filter, boolean force) throws IOException {
151
152 Path lock = new Path(linkDb, LOCK_NAME);
153 FileSystem fs = FileSystem.get(getConf());
154 LockUtil.createLockFile(fs, lock, force);
155 Path currentLinkDb = new Path(linkDb, CURRENT_NAME);
156 if (LOG.isInfoEnabled()) {
157 LOG.info("LinkDb: starting");
158 LOG.info("LinkDb: linkdb: " + linkDb);
159 LOG.info("LinkDb: URL normalize: " + normalize);
160 LOG.info("LinkDb: URL filter: " + filter);
161 }
162 JobConf job = LinkDb.createJob(getConf(), linkDb, normalize, filter);
163 for (int i = 0; i < segments.length; i++) {
164 if (LOG.isInfoEnabled()) {
165 LOG.info("LinkDb: adding segment: " + segments[i]);
166 }
167 FileInputFormat.addInputPath(job, new Path(segments[i], ParseData.DIR_NAME));
168 }
169 try {
170 JobClient.runJob(job);
171 } catch (IOException e) {
172 LockUtil.removeLockFile(fs, lock);
173 throw e;
174 }
175 if (fs.exists(currentLinkDb)) {
176 if (LOG.isInfoEnabled()) {
177 LOG.info("LinkDb: merging with existing linkdb: " + linkDb);
178 }
179 // try to merge
180 Path newLinkDb = FileOutputFormat.getOutputPath(job);
181 job = LinkDbMerger.createMergeJob(getConf(), linkDb, normalize, filter);
182 FileInputFormat.addInputPath(job, currentLinkDb);
183 FileInputFormat.addInputPath(job, newLinkDb);
184 try {
185 JobClient.runJob(job);
186 } catch (IOException e) {
187 LockUtil.removeLockFile(fs, lock);
188 fs.delete(newLinkDb, true);
189 throw e;
190 }
191 fs.delete(newLinkDb, true);
192 }
193 LinkDb.install(job, linkDb);
194 if (LOG.isInfoEnabled()) { LOG.info("LinkDb: done"); }
195 }
196
197 private static JobConf createJob(Configuration config, Path linkDb, boolean normalize, boolean filter) {
198 Path newLinkDb =
199 new Path("linkdb-" +
200 Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
201
202 JobConf job = new NutchJob(config);
203 job.setJobName("linkdb " + linkDb);
204
205 job.setInputFormat(SequenceFileInputFormat.class);
206
207 job.setMapperClass(LinkDb.class);
208 job.setCombinerClass(LinkDbMerger.class);
209 // if we don't run the mergeJob, perform normalization/filtering now
210 if (normalize || filter) {
211 try {
212 FileSystem fs = FileSystem.get(config);
213 if (!fs.exists(linkDb)) {
214 job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
215 job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
216 }
217 } catch (Exception e) {
218 LOG.warn("LinkDb createJob: " + e);
219 }
220 }
221 job.setReducerClass(LinkDbMerger.class);
222
223 FileOutputFormat.setOutputPath(job, newLinkDb);
224 job.setOutputFormat(MapFileOutputFormat.class);
225 job.setBoolean("mapred.output.compress", true);
226 job.setOutputKeyClass(Text.class);
227 job.setOutputValueClass(Inlinks.class);
228
229 return job;
230 }
231
232 public static void install(JobConf job, Path linkDb) throws IOException {
233 Path newLinkDb = FileOutputFormat.getOutputPath(job);
234 FileSystem fs = new JobClient(job).getFs();
235 Path old = new Path(linkDb, "old");
236 Path current = new Path(linkDb, CURRENT_NAME);
237 if (fs.exists(current)) {
238 if (fs.exists(old)) fs.delete(old, true);
239 fs.rename(current, old);
240 }
241 fs.mkdirs(linkDb);
242 fs.rename(newLinkDb, current);
243 if (fs.exists(old)) fs.delete(old, true);
244 LockUtil.removeLockFile(fs, new Path(linkDb, LOCK_NAME));
245 }
246
247 public static void main(String[] args) throws Exception {
248 int res = ToolRunner.run(NutchConfiguration.create(), new LinkDb(), args);
249 System.exit(res);
250 }
251
252 public int run(String[] args) throws Exception {
253 if (args.length < 2) {
254 System.err.println("Usage: LinkDb <linkdb> (-dir <segmentsDir> | <seg1> <seg2> ...) [-force] [-noNormalize] [-noFilter]");
255 System.err.println("\tlinkdb\toutput LinkDb to create or update");
256 System.err.println("\t-dir segmentsDir\tparent directory of several segments, OR");
257 System.err.println("\tseg1 seg2 ...\t list of segment directories");
258 System.err.println("\t-force\tforce update even if LinkDb appears to be locked (CAUTION advised)");
259 System.err.println("\t-noNormalize\tdon't normalize link URLs");
260 System.err.println("\t-noFilter\tdon't apply URLFilters to link URLs");
261 return -1;
262 }
263 Path segDir = null;
264 final FileSystem fs = FileSystem.get(getConf());
265 Path db = new Path(args[0]);
266 ArrayList<Path> segs = new ArrayList<Path>();
267 boolean filter = true;
268 boolean normalize = true;
269 boolean force = false;
270 for (int i = 1; i < args.length; i++) {
271 if (args[i].equals("-dir")) {
272 segDir = new Path(args[++i]);
273 FileStatus[] files = fs.listStatus(segDir, HadoopFSUtil.getPassDirectoriesFilter(fs));
274 if (files != null) segs.addAll(Arrays.asList(HadoopFSUtil.getPaths(files)));
275 break;
276 } else if (args[i].equalsIgnoreCase("-noNormalize")) {
277 normalize = false;
278 } else if (args[i].equalsIgnoreCase("-noFilter")) {
279 filter = false;
280 } else if (args[i].equalsIgnoreCase("-force")) {
281 force = true;
282 } else segs.add(new Path(args[i]));
283 }
284 try {
285 invert(db, segs.toArray(new Path[segs.size()]), normalize, filter, force);
286 return 0;
287 } catch (Exception e) {
288 LOG.fatal("LinkDb: " + StringUtils.stringifyException(e));
289 return -1;
290 }
291 }
292
293 }