1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.nutch.crawl;
19
20 import java.io;
21 import java.util;
22
23 // Commons Logging imports
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26
27 import org.apache.hadoop.io;
28 import org.apache.hadoop.fs;
29 import org.apache.hadoop.conf;
30 import org.apache.hadoop.mapred;
31 import org.apache.hadoop.util;
32
33 import org.apache.nutch.net;
34 import org.apache.nutch.scoring.ScoringFilterException;
35 import org.apache.nutch.scoring.ScoringFilters;
36 import org.apache.nutch.util.NutchConfiguration;
37 import org.apache.nutch.util.NutchJob;
38
39 /** This class takes a flat file of URLs and adds them to the of pages to be
40 * crawled. Useful for bootstrapping the system. */
41 public class Injector extends Configured implements Tool {
42 public static final Log LOG = LogFactory.getLog(Injector.class);
43
44
45 /** Normalize and filter injected urls. */
46 public static class InjectMapper implements Mapper<WritableComparable, Text, Text, CrawlDatum> {
47 private URLNormalizers urlNormalizers;
48 private int interval;
49 private float scoreInjected;
50 private JobConf jobConf;
51 private URLFilters filters;
52 private ScoringFilters scfilters;
53 private long curTime;
54
55 public void configure(JobConf job) {
56 this.jobConf = job;
57 urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_INJECT);
58 interval = jobConf.getInt("db.fetch.interval.default", 2592000);
59 filters = new URLFilters(jobConf);
60 scfilters = new ScoringFilters(jobConf);
61 scoreInjected = jobConf.getFloat("db.score.injected", 1.0f);
62 curTime = job.getLong("injector.current.time", System.currentTimeMillis());
63 }
64
65 public void close() {}
66
67 public void map(WritableComparable key, Text value,
68 OutputCollector<Text, CrawlDatum> output, Reporter reporter)
69 throws IOException {
70 String url = value.toString(); // value is line of text
71 try {
72 url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
73 url = filters.filter(url); // filter the url
74 } catch (Exception e) {
75 if (LOG.isWarnEnabled()) { LOG.warn("Skipping " +url+":"+e); }
76 url = null;
77 }
78 if (url != null) { // if it passes
79 value.set(url); // collect it
80 CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, interval);
81 datum.setFetchTime(curTime);
82 datum.setScore(scoreInjected);
83 try {
84 scfilters.injectedScore(value, datum);
85 } catch (ScoringFilterException e) {
86 if (LOG.isWarnEnabled()) {
87 LOG.warn("Cannot filter injected score for url " + url +
88 ", using default (" + e.getMessage() + ")");
89 }
90 datum.setScore(scoreInjected);
91 }
92 output.collect(value, datum);
93 }
94 }
95 }
96
97 /** Combine multiple new entries for a url. */
98 public static class InjectReducer implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
99 public void configure(JobConf job) {}
100 public void close() {}
101
102 private CrawlDatum old = new CrawlDatum();
103 private CrawlDatum injected = new CrawlDatum();
104
105 public void reduce(Text key, Iterator<CrawlDatum> values,
106 OutputCollector<Text, CrawlDatum> output, Reporter reporter)
107 throws IOException {
108 boolean oldSet = false;
109 while (values.hasNext()) {
110 CrawlDatum val = values.next();
111 if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
112 injected.set(val);
113 injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
114 } else {
115 old.set(val);
116 oldSet = true;
117 }
118 }
119 CrawlDatum res = null;
120 if (oldSet) res = old; // don't overwrite existing value
121 else res = injected;
122
123 output.collect(key, res);
124 }
125 }
126
127 public Injector() {}
128
129 public Injector(Configuration conf) {
130 setConf(conf);
131 }
132
133 public void inject(Path crawlDb, Path urlDir) throws IOException {
134
135 if (LOG.isInfoEnabled()) {
136 LOG.info("Injector: starting");
137 LOG.info("Injector: crawlDb: " + crawlDb);
138 LOG.info("Injector: urlDir: " + urlDir);
139 }
140
141 Path tempDir =
142 new Path(getConf().get("mapred.temp.dir", ".") +
143 "/inject-temp-"+
144 Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
145
146 // map text input file to a <url,CrawlDatum> file
147 if (LOG.isInfoEnabled()) {
148 LOG.info("Injector: Converting injected urls to crawl db entries.");
149 }
150 JobConf sortJob = new NutchJob(getConf());
151 sortJob.setJobName("inject " + urlDir);
152 FileInputFormat.addInputPath(sortJob, urlDir);
153 sortJob.setMapperClass(InjectMapper.class);
154
155 FileOutputFormat.setOutputPath(sortJob, tempDir);
156 sortJob.setOutputFormat(SequenceFileOutputFormat.class);
157 sortJob.setOutputKeyClass(Text.class);
158 sortJob.setOutputValueClass(CrawlDatum.class);
159 sortJob.setLong("injector.current.time", System.currentTimeMillis());
160 JobClient.runJob(sortJob);
161
162 // merge with existing crawl db
163 if (LOG.isInfoEnabled()) {
164 LOG.info("Injector: Merging injected urls into crawl db.");
165 }
166 JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);
167 FileInputFormat.addInputPath(mergeJob, tempDir);
168 mergeJob.setReducerClass(InjectReducer.class);
169 JobClient.runJob(mergeJob);
170 CrawlDb.install(mergeJob, crawlDb);
171
172 // clean up
173 FileSystem fs = FileSystem.get(getConf());
174 fs.delete(tempDir, true);
175 if (LOG.isInfoEnabled()) { LOG.info("Injector: done"); }
176
177 }
178
179 public static void main(String[] args) throws Exception {
180 int res = ToolRunner.run(NutchConfiguration.create(), new Injector(), args);
181 System.exit(res);
182 }
183
184 public int run(String[] args) throws Exception {
185 if (args.length < 2) {
186 System.err.println("Usage: Injector <crawldb> <url_dir>");
187 return -1;
188 }
189 try {
190 inject(new Path(args[0]), new Path(args[1]));
191 return 0;
192 } catch (Exception e) {
193 LOG.fatal("Injector: " + StringUtils.stringifyException(e));
194 return -1;
195 }
196 }
197
198 }