1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.nutch.searcher;
19
20 import java.io;
21 import java.net.InetSocketAddress;
22 import java.util;
23
24 import javax.servlet;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28
29 import org.apache.hadoop.fs;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.conf;
32 import org.apache.hadoop.util.StringUtils;
33 import org.apache.nutch.parse;
34 import org.apache.nutch.crawl.Inlinks;
35 import org.apache.nutch.util.NutchConfiguration;
36
37 /**
38 * One stop shopping for search-related functionality.
39 * @version $Id: NutchBean.java,v 1.19 2005/02/07 19:10:08 cutting Exp $
40 */
41 public class NutchBean
42 implements SearchBean, SegmentBean, HitInlinks, Closeable {
43
44 public static final Log LOG = LogFactory.getLog(NutchBean.class);
45 public static final String KEY = "nutchBean";
46
47 // static {
48 // LogFormatter.setShowThreadIDs(true);
49 // }
50
51 private SearchBean searchBean;
52 private SegmentBean segmentBean;
53 private final HitInlinks linkDb;
54
55 /** BooleanQuery won't permit more than 32 required/prohibited clauses. We
56 * don't want to use too many of those. */
57 private static final int MAX_PROHIBITED_TERMS = 20;
58
59 private final Configuration conf;
60
61 private final FileSystem fs;
62
63 /** Returns the cached instance in the servlet context.
64 * @see NutchBeanConstructor*/
65 public static NutchBean get(ServletContext app, Configuration conf) throws IOException {
66 final NutchBean bean = (NutchBean)app.getAttribute(KEY);
67 return bean;
68 }
69
70
/**
 * Creates a bean without naming a directory. This simply forwards to the
 * two-argument constructor with a <code>null</code> directory.
 *
 * @param conf configuration to build the bean from
 * @throws IOException if the two-argument constructor fails
 */
public NutchBean(Configuration conf) throws IOException {
  this(conf, null);
}
79
/**
 * Builds the bean from the contents of a directory.
 *
 * <p>Backends are selected by probing for files inside the directory:
 * <ul>
 * <li>search: distributed when <code>search-servers.txt</code> or
 *     <code>solr-servers.txt</code> exists, otherwise a Lucene bean over
 *     <code>index</code> and <code>indexes</code>;</li>
 * <li>segments: distributed from <code>segment-servers.txt</code> when it
 *     exists, else distributed from <code>search-servers.txt</code> when
 *     that exists, otherwise the local <code>segments</code> directory;</li>
 * <li>links: always read from <code>linkdb</code>.</li>
 * </ul>
 *
 * @param conf configuration to build the bean from
 * @param dir base directory; when <code>null</code> the value of the
 *        <code>searcher.dir</code> property is used (default
 *        <code>crawl</code>)
 * @throws IOException if the file system or any backend cannot be set up
 */
public NutchBean(Configuration conf, Path dir) throws IOException {
  this.conf = conf;
  this.fs = FileSystem.get(this.conf);

  final Path baseDir =
      (dir != null) ? dir : new Path(this.conf.get("searcher.dir", "crawl"));

  final Path luceneConfig = new Path(baseDir, "search-servers.txt");
  final Path solrConfig = new Path(baseDir, "solr-servers.txt");
  final Path segmentConfig = new Path(baseDir, "segment-servers.txt");

  // Search side: any server list switches to the distributed bean.
  final boolean remoteSearch = fs.exists(luceneConfig) || fs.exists(solrConfig);
  if (!remoteSearch) {
    final Path indexDir = new Path(baseDir, "index");
    final Path indexesDir = new Path(baseDir, "indexes");
    searchBean = new LuceneSearchBean(conf, indexDir, indexesDir);
  } else {
    searchBean = new DistributedSearchBean(conf, luceneConfig, solrConfig);
  }

  // Segment side: prefer the dedicated server list, then the search one.
  Path segmentServers = null;
  if (fs.exists(segmentConfig)) {
    segmentServers = segmentConfig;
  } else if (fs.exists(luceneConfig)) {
    segmentServers = luceneConfig;
  }

  if (segmentServers == null) {
    segmentBean = new FetchedSegments(conf, new Path(baseDir, "segments"));
  } else {
    segmentBean = new DistributedSegmentBean(conf, segmentServers);
  }

  linkDb = new LinkDbInlinks(fs, new Path(baseDir, "linkdb"), conf);
}
115
116 public static List<InetSocketAddress> readAddresses(Path path,
117 Configuration conf) throws IOException {
118 final List<InetSocketAddress> addrs = new ArrayList<InetSocketAddress>();
119 for (final String line : readConfig(path, conf)) {
120 final StringTokenizer tokens = new StringTokenizer(line);
121 if (tokens.hasMoreTokens()) {
122 final String host = tokens.nextToken();
123 if (tokens.hasMoreTokens()) {
124 final String port = tokens.nextToken();
125 addrs.add(new InetSocketAddress(host, Integer.parseInt(port)));
126 }
127 }
128 }
129 return addrs;
130 }
131
132 public static List<String> readConfig(Path path, Configuration conf)
133 throws IOException {
134 final FileSystem fs = FileSystem.get(conf);
135 final BufferedReader reader =
136 new BufferedReader(new InputStreamReader(fs.open(path)));
137 try {
138 final ArrayList<String> addrs = new ArrayList<String>();
139 String line;
140 while ((line = reader.readLine()) != null) {
141 addrs.add(line);
142 }
143 return addrs;
144 } finally {
145 reader.close();
146 }
147 }
148
149 public String[] getSegmentNames() throws IOException {
150 return segmentBean.getSegmentNames();
151 }
152
153 public Hits search(Query query, int numHits) throws IOException {
154 return search(query, numHits, null, null, false);
155 }
156
157 public Hits search(Query query, int numHits,
158 String dedupField, String sortField, boolean reverse)
159 throws IOException {
160
161 return searchBean.search(query, numHits, dedupField, sortField, reverse);
162 }
163
164 @SuppressWarnings("serial")
165 private class DupHits extends ArrayList<Hit> {
166 private boolean maxSizeExceeded;
167 }
168
169 /** Search for pages matching a query, eliminating excessive hits from the
170 * same site. Hits after the first <code>maxHitsPerDup</code> from the same
171 * site are removed from results. The remaining hits have {@link
172 * Hit#moreFromDupExcluded()} set. <p> If maxHitsPerDup is zero then all
173 * hits are returned.
174 *
175 * @param query query
176 * @param numHits number of requested hits
177 * @param maxHitsPerDup the maximum hits returned with matching values, or zero
178 * @return Hits the matching hits
179 * @throws IOException
180 */
181 public Hits search(Query query, int numHits, int maxHitsPerDup)
182 throws IOException {
183 return search(query, numHits, maxHitsPerDup, "site", null, false);
184 }
185
186 /** Search for pages matching a query, eliminating excessive hits with
187 * matching values for a named field. Hits after the first
188 * <code>maxHitsPerDup</code> are removed from results. The remaining hits
189 * have {@link Hit#moreFromDupExcluded()} set. <p> If maxHitsPerDup is zero
190 * then all hits are returned.
191 *
192 * @param query query
193 * @param numHits number of requested hits
194 * @param maxHitsPerDup the maximum hits returned with matching values, or zero
195 * @param dedupField field name to check for duplicates
196 * @return Hits the matching hits
197 * @throws IOException
198 */
199 public Hits search(Query query, int numHits,
200 int maxHitsPerDup, String dedupField)
201 throws IOException {
202 return search(query, numHits, maxHitsPerDup, dedupField, null, false);
203 }
204 /** Search for pages matching a query, eliminating excessive hits with
205 * matching values for a named field. Hits after the first
206 * <code>maxHitsPerDup</code> are removed from results. The remaining hits
207 * have {@link Hit#moreFromDupExcluded()} set. <p> If maxHitsPerDup is zero
208 * then all hits are returned.
209 *
210 * @param query query
211 * @param numHits number of requested hits
212 * @param maxHitsPerDup the maximum hits returned with matching values, or zero
213 * @param dedupField field name to check for duplicates
214 * @param sortField Field to sort on (or null if no sorting).
215 * @param reverse True if we are to reverse sort by <code>sortField</code>.
216 * @return Hits the matching hits
217 * @throws IOException
218 */
219 public Hits search(Query query, int numHits,
220 int maxHitsPerDup, String dedupField,
221 String sortField, boolean reverse)
222 throws IOException {
223 if (maxHitsPerDup <= 0) // disable dup checking
224 return search(query, numHits, dedupField, sortField, reverse);
225
226 final float rawHitsFactor = this.conf.getFloat("searcher.hostgrouping.rawhits.factor", 2.0f);
227 int numHitsRaw = (int)(numHits * rawHitsFactor);
228 if (LOG.isInfoEnabled()) {
229 LOG.info("searching for "+numHitsRaw+" raw hits");
230 }
231 Hits hits = searchBean.search(query, numHitsRaw,
232 dedupField, sortField, reverse);
233 final long total = hits.getTotal();
234 final Map<String, DupHits> dupToHits = new HashMap<String, DupHits>();
235 final List<Hit> resultList = new ArrayList<Hit>();
236 final Set<Hit> seen = new HashSet<Hit>();
237 final List<String> excludedValues = new ArrayList<String>();
238 boolean totalIsExact = true;
239 for (int rawHitNum = 0; rawHitNum < hits.getTotal(); rawHitNum++) {
240 // get the next raw hit
241 if (rawHitNum >= hits.getLength()) {
242 // optimize query by prohibiting more matches on some excluded values
243 final Query optQuery = (Query)query.clone();
244 for (int i = 0; i < excludedValues.size(); i++) {
245 if (i == MAX_PROHIBITED_TERMS)
246 break;
247 optQuery.addProhibitedTerm(excludedValues.get(i),
248 dedupField);
249 }
250 numHitsRaw = (int)(numHitsRaw * rawHitsFactor);
251 if (LOG.isInfoEnabled()) {
252 LOG.info("re-searching for "+numHitsRaw+" raw hits, query: "+optQuery);
253 }
254 hits = searchBean.search(optQuery, numHitsRaw,
255 dedupField, sortField, reverse);
256 if (LOG.isInfoEnabled()) {
257 LOG.info("found "+hits.getTotal()+" raw hits");
258 }
259 rawHitNum = -1;
260 continue;
261 }
262
263 final Hit hit = hits.getHit(rawHitNum);
264 if (seen.contains(hit))
265 continue;
266 seen.add(hit);
267
268 // get dup hits for its value
269 final String value = hit.getDedupValue();
270 DupHits dupHits = dupToHits.get(value);
271 if (dupHits == null)
272 dupToHits.put(value, dupHits = new DupHits());
273
274 // does this hit exceed maxHitsPerDup?
275 if (dupHits.size() == maxHitsPerDup) { // yes -- ignore the hit
276 if (!dupHits.maxSizeExceeded) {
277
278 // mark prior hits with moreFromDupExcluded
279 for (int i = 0; i < dupHits.size(); i++) {
280 dupHits.get(i).setMoreFromDupExcluded(true);
281 }
282 dupHits.maxSizeExceeded = true;
283
284 excludedValues.add(value); // exclude dup
285 }
286 totalIsExact = false;
287 } else { // no -- collect the hit
288 resultList.add(hit);
289 dupHits.add(hit);
290
291 // are we done?
292 // we need to find one more than asked for, so that we can tell if
293 // there are more hits to be shown
294 if (resultList.size() > numHits)
295 break;
296 }
297 }
298
299 final Hits results =
300 new Hits(total,
301 resultList.toArray(new Hit[resultList.size()]));
302 results.setTotalIsExact(totalIsExact);
303 return results;
304 }
305
306
307 public String getExplanation(Query query, Hit hit) throws IOException {
308 return searchBean.getExplanation(query, hit);
309 }
310
311 public HitDetails getDetails(Hit hit) throws IOException {
312 return searchBean.getDetails(hit);
313 }
314
315 public HitDetails[] getDetails(Hit[] hits) throws IOException {
316 return searchBean.getDetails(hits);
317 }
318
319 public Summary getSummary(HitDetails hit, Query query) throws IOException {
320 return segmentBean.getSummary(hit, query);
321 }
322
323 public Summary[] getSummary(HitDetails[] hits, Query query)
324 throws IOException {
325 return segmentBean.getSummary(hits, query);
326 }
327
328 public byte[] getContent(HitDetails hit) throws IOException {
329 return segmentBean.getContent(hit);
330 }
331
332 public ParseData getParseData(HitDetails hit) throws IOException {
333 return segmentBean.getParseData(hit);
334 }
335
336 public ParseText getParseText(HitDetails hit) throws IOException {
337 return segmentBean.getParseText(hit);
338 }
339
340 public String[] getAnchors(HitDetails hit) throws IOException {
341 return linkDb.getAnchors(hit);
342 }
343
344 public Inlinks getInlinks(HitDetails hit) throws IOException {
345 return linkDb.getInlinks(hit);
346 }
347
348 public long getFetchDate(HitDetails hit) throws IOException {
349 return segmentBean.getFetchDate(hit);
350 }
351
352 public void close() throws IOException {
353 if (searchBean != null) { searchBean.close(); }
354 if (segmentBean != null) { segmentBean.close(); }
355 if (linkDb != null) { linkDb.close(); }
356 if (fs != null) { fs.close(); }
357 }
358
359 public boolean ping() {
360 return true;
361 }
362
363 /** For debugging. */
364 public static void main(String[] args) throws Exception {
365 final String usage = "NutchBean query";
366
367 if (args.length == 0) {
368 System.err.println(usage);
369 System.exit(-1);
370 }
371
372 final Configuration conf = NutchConfiguration.create();
373 final NutchBean bean = new NutchBean(conf);
374 try {
375 final Query query = Query.parse(args[0], conf);
376 final Hits hits = bean.search(query, 10);
377 System.out.println("Total hits: " + hits.getTotal());
378 final int length = (int)Math.min(hits.getTotal(), 10);
379 final Hit[] show = hits.getHits(0, length);
380 final HitDetails[] details = bean.getDetails(show);
381 final Summary[] summaries = bean.getSummary(details, query);
382
383 for (int i = 0; i < hits.getLength(); i++) {
384 System.out.println(" " + i + " " + details[i] + "\n" + summaries[i]);
385 }
386 } catch (Throwable t) {
387 LOG.error("Exception occured while executing search: " + t, t);
388 System.exit(1);
389 }
390 System.exit(0);
391 }
392
393 public long getProtocolVersion(String className, long clientVersion)
394 throws IOException {
395 if(RPCSearchBean.class.getName().equals(className) &&
396 searchBean instanceof RPCSearchBean) {
397
398 final RPCSearchBean rpcBean = (RPCSearchBean)searchBean;
399 return rpcBean.getProtocolVersion(className, clientVersion);
400 } else if (RPCSegmentBean.class.getName().equals(className) &&
401 segmentBean instanceof RPCSegmentBean) {
402
403 final RPCSegmentBean rpcBean = (RPCSegmentBean)segmentBean;
404 return rpcBean.getProtocolVersion(className, clientVersion);
405 } else {
406 throw new IOException("Unknown Protocol classname:" + className);
407 }
408 }
409
410 /** Responsible for constructing a NutchBean singleton instance and
411 * caching it in the servlet context. This class should be registered in
412 * the deployment descriptor as a listener
413 */
414 public static class NutchBeanConstructor implements ServletContextListener {
415
416 public void contextDestroyed(ServletContextEvent sce) { }
417
418 public void contextInitialized(ServletContextEvent sce) {
419 final ServletContext app = sce.getServletContext();
420 final Configuration conf = NutchConfiguration.get(app);
421
422 LOG.info("creating new bean");
423 NutchBean bean = null;
424 try {
425 bean = new NutchBean(conf);
426 app.setAttribute(KEY, bean);
427 }
428 catch (final IOException ex) {
429 LOG.error(StringUtils.stringifyException(ex));
430 }
431 }
432 }
433
434 }