1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.nutch.crawl;
19
20 import java.util.ArrayList;
21 import java.util.Iterator;
22 import java.io.IOException;
23
24 // Commons Logging imports
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27
28 import org.apache.hadoop.io;
29 import org.apache.hadoop.mapred;
30 import org.apache.nutch.metadata.Nutch;
31 import org.apache.nutch.scoring.ScoringFilterException;
32 import org.apache.nutch.scoring.ScoringFilters;
33
34 /** Merge new page entries with existing entries. */
35 public class CrawlDbReducer implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
36 public static final Log LOG = LogFactory.getLog(CrawlDbReducer.class);
37
38 private int retryMax;
39 private CrawlDatum result = new CrawlDatum();
40 private ArrayList<CrawlDatum> linked = new ArrayList<CrawlDatum>();
41 private ScoringFilters scfilters = null;
42 private boolean additionsAllowed;
43 private int maxInterval;
44 private FetchSchedule schedule;
45 private CrawlDatum fetch = new CrawlDatum();
46 private CrawlDatum old = new CrawlDatum();
47
48 public void configure(JobConf job) {
49 retryMax = job.getInt("db.fetch.retry.max", 3);
50 scfilters = new ScoringFilters(job);
51 additionsAllowed = job.getBoolean(CrawlDb.CRAWLDB_ADDITIONS_ALLOWED, true);
52 int oldMaxInterval = job.getInt("db.max.fetch.interval", 0);
53 maxInterval = job.getInt("db.fetch.interval.max", 0 );
54 if (oldMaxInterval > 0 && maxInterval == 0) maxInterval = oldMaxInterval * FetchSchedule.SECONDS_PER_DAY;
55 schedule = FetchScheduleFactory.getFetchSchedule(job);
56 }
57
58 public void close() {}
59
60 public void reduce(Text key, Iterator<CrawlDatum> values,
61 OutputCollector<Text, CrawlDatum> output, Reporter reporter)
62 throws IOException {
63
64 boolean fetchSet = false;
65 boolean oldSet = false;
66 byte[] signature = null;
67 linked.clear();
68
69 while (values.hasNext()) {
70 CrawlDatum datum = (CrawlDatum)values.next();
71 if (CrawlDatum.hasDbStatus(datum)) {
72 if (!oldSet) {
73 old.set(datum);
74 oldSet = true;
75 } else {
76 // always take the latest version
77 if (old.getFetchTime() < datum.getFetchTime()) old.set(datum);
78 }
79 continue;
80 }
81
82 if (CrawlDatum.hasFetchStatus(datum)) {
83 if (!fetchSet) {
84 fetch.set(datum);
85 fetchSet = true;
86 } else {
87 // always take the latest version
88 if (fetch.getFetchTime() < datum.getFetchTime()) fetch.set(datum);
89 }
90 continue;
91 }
92
93 switch (datum.getStatus()) { // collect other info
94 case CrawlDatum.STATUS_LINKED:
95 CrawlDatum link = new CrawlDatum();
96 link.set(datum);
97 linked.add(link);
98 break;
99 case CrawlDatum.STATUS_SIGNATURE:
100 signature = datum.getSignature();
101 break;
102 default:
103 LOG.warn("Unknown status, key: " + key + ", datum: " + datum);
104 }
105 }
106
107 // if it doesn't already exist, skip it
108 if (!oldSet && !additionsAllowed) return;
109
110 // if there is no fetched datum, perhaps there is a link
111 if (!fetchSet && linked.size() > 0) {
112 fetch = linked.get(0);
113 fetchSet = true;
114 }
115
116 // still no new data - record only unchanged old data, if exists, and return
117 if (!fetchSet) {
118 if (oldSet) // at this point at least "old" should be present
119 output.collect(key, old);
120 else
121 LOG.warn("Missing fetch and old value, signature=" + signature);
122 return;
123 }
124
125 if (signature == null) signature = fetch.getSignature();
126 long prevModifiedTime = oldSet ? old.getModifiedTime() : 0L;
127 long prevFetchTime = oldSet ? old.getFetchTime() : 0L;
128
129 // initialize with the latest version, be it fetch or link
130 result.set(fetch);
131 if (oldSet) {
132 // copy metadata from old, if exists
133 if (old.getMetaData().size() > 0) {
134 result.putAllMetaData(old);
135 // overlay with new, if any
136 if (fetch.getMetaData().size() > 0)
137 result.putAllMetaData(fetch);
138 }
139 // set the most recent valid value of modifiedTime
140 if (old.getModifiedTime() > 0 && fetch.getModifiedTime() == 0) {
141 result.setModifiedTime(old.getModifiedTime());
142 }
143 }
144
145 switch (fetch.getStatus()) { // determine new status
146
147 case CrawlDatum.STATUS_LINKED: // it was link
148 if (oldSet) { // if old exists
149 result.set(old); // use it
150 } else {
151 result = schedule.initializeSchedule((Text)key, result);
152 result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
153 try {
154 scfilters.initialScore((Text)key, result);
155 } catch (ScoringFilterException e) {
156 if (LOG.isWarnEnabled()) {
157 LOG.warn("Cannot filter init score for url " + key +
158 ", using default: " + e.getMessage());
159 }
160 result.setScore(0.0f);
161 }
162 }
163 break;
164
165 case CrawlDatum.STATUS_FETCH_SUCCESS: // succesful fetch
166 case CrawlDatum.STATUS_FETCH_REDIR_TEMP: // successful fetch, redirected
167 case CrawlDatum.STATUS_FETCH_REDIR_PERM:
168 case CrawlDatum.STATUS_FETCH_NOTMODIFIED: // successful fetch, notmodified
169 // determine the modification status
170 int modified = FetchSchedule.STATUS_UNKNOWN;
171 if (fetch.getStatus() == CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
172 modified = FetchSchedule.STATUS_NOTMODIFIED;
173 } else {
174 if (oldSet && old.getSignature() != null && signature != null) {
175 if (SignatureComparator._compare(old.getSignature(), signature) != 0) {
176 modified = FetchSchedule.STATUS_MODIFIED;
177 } else {
178 modified = FetchSchedule.STATUS_NOTMODIFIED;
179 }
180 }
181 }
182 // set the schedule
183 result = schedule.setFetchSchedule((Text)key, result, prevFetchTime,
184 prevModifiedTime, fetch.getFetchTime(), fetch.getModifiedTime(), modified);
185 // set the result status and signature
186 if (modified == FetchSchedule.STATUS_NOTMODIFIED) {
187 result.setStatus(CrawlDatum.STATUS_DB_NOTMODIFIED);
188 if (oldSet) result.setSignature(old.getSignature());
189 } else {
190 switch (fetch.getStatus()) {
191 case CrawlDatum.STATUS_FETCH_SUCCESS:
192 result.setStatus(CrawlDatum.STATUS_DB_FETCHED);
193 break;
194 case CrawlDatum.STATUS_FETCH_REDIR_PERM:
195 result.setStatus(CrawlDatum.STATUS_DB_REDIR_PERM);
196 break;
197 case CrawlDatum.STATUS_FETCH_REDIR_TEMP:
198 result.setStatus(CrawlDatum.STATUS_DB_REDIR_TEMP);
199 break;
200 default:
201 LOG.warn("Unexpected status: " + fetch.getStatus() + " resetting to old status.");
202 if (oldSet) result.setStatus(old.getStatus());
203 else result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
204 }
205 result.setSignature(signature);
206 }
207 // if fetchInterval is larger than the system-wide maximum, trigger
208 // an unconditional recrawl. This prevents the page to be stuck at
209 // NOTMODIFIED state, when the old fetched copy was already removed with
210 // old segments.
211 if (maxInterval < result.getFetchInterval())
212 result = schedule.forceRefetch((Text)key, result, false);
213 break;
214 case CrawlDatum.STATUS_SIGNATURE:
215 if (LOG.isWarnEnabled()) {
216 LOG.warn("Lone CrawlDatum.STATUS_SIGNATURE: " + key);
217 }
218 return;
219 case CrawlDatum.STATUS_FETCH_RETRY: // temporary failure
220 if (oldSet) {
221 result.setSignature(old.getSignature()); // use old signature
222 }
223 result = schedule.setPageRetrySchedule((Text)key, result, prevFetchTime,
224 prevModifiedTime, fetch.getFetchTime());
225 if (result.getRetriesSinceFetch() < retryMax) {
226 result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
227 } else {
228 result.setStatus(CrawlDatum.STATUS_DB_GONE);
229 }
230 break;
231
232 case CrawlDatum.STATUS_FETCH_GONE: // permanent failure
233 if (oldSet)
234 result.setSignature(old.getSignature()); // use old signature
235 result.setStatus(CrawlDatum.STATUS_DB_GONE);
236 result = schedule.setPageGoneSchedule((Text)key, result, prevFetchTime,
237 prevModifiedTime, fetch.getFetchTime());
238 break;
239
240 default:
241 throw new RuntimeException("Unknown status: " + fetch.getStatus() + " " + key);
242 }
243
244 try {
245 scfilters.updateDbScore((Text)key, oldSet ? old : null, result, linked);
246 } catch (Exception e) {
247 if (LOG.isWarnEnabled()) {
248 LOG.warn("Couldn't update score, key=" + key + ": " + e);
249 }
250 }
251 // remove generation time, if any
252 result.getMetaData().remove(Nutch.WRITABLE_GENERATE_TIME_KEY);
253 output.collect(key, result);
254 }
255
256 }