| Method from org.apache.nutch.segment.SegmentReader Detail: |
public void close() {
}
|
public void configure(JobConf job) {
setConf(job);
this.co = getConf().getBoolean("segment.reader.co", true);
this.fe = getConf().getBoolean("segment.reader.fe", true);
this.ge = getConf().getBoolean("segment.reader.ge", true);
this.pa = getConf().getBoolean("segment.reader.pa", true);
this.pd = getConf().getBoolean("segment.reader.pd", true);
this.pt = getConf().getBoolean("segment.reader.pt", true);
try {
this.fs = FileSystem.get(getConf());
} catch (IOException e) {
e.printStackTrace(LogUtil.getWarnStream(LOG));
}
}
|
public void dump(Path segment,
Path output) throws IOException {
if (LOG.isInfoEnabled()) {
LOG.info("SegmentReader: dump segment: " + segment);
}
JobConf job = createJobConf();
job.setJobName("read " + segment);
if (ge) FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
if (fe) FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.FETCH_DIR_NAME));
if (pa) FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.PARSE_DIR_NAME));
if (co) FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
if (pd) FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
if (pt) FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(InputCompatMapper.class);
job.setReducerClass(SegmentReader.class);
Path tempDir = new Path(job.get("hadoop.tmp.dir", "/tmp") + "/segread-" + new java.util.Random().nextInt());
fs.delete(tempDir, true);
FileOutputFormat.setOutputPath(job, tempDir);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NutchWritable.class);
JobClient.runJob(job);
// concatenate the output
Path dumpFile = new Path(output, job.get("segment.dump.dir", "dump"));
// remove the old file
fs.delete(dumpFile, true);
FileStatus[] fstats = fs.listStatus(tempDir, HadoopFSUtil.getPassAllFilter());
Path[] files = HadoopFSUtil.getPaths(fstats);
PrintWriter writer = null;
int currentRecordNumber = 0;
if (files.length > 0) {
writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(fs.create(dumpFile))));
try {
for (int i = 0; i < files.length; i++) {
Path partFile = (Path) files[i];
try {
currentRecordNumber = append(fs, job, partFile, writer, currentRecordNumber);
} catch (IOException exception) {
if (LOG.isWarnEnabled()) {
LOG.warn("Couldn't copy the content of " + partFile.toString() +
" into " + dumpFile.toString());
LOG.warn(exception.getMessage());
}
}
}
} finally {
writer.close();
}
}
fs.delete(tempDir);
if (LOG.isInfoEnabled()) { LOG.info("SegmentReader: done"); }
}
|
public void get(Path segment,
Text key,
Writer writer,
Map results) throws Exception {
LOG.info("SegmentReader: get '" + key + "'");
ArrayList< Thread > threads = new ArrayList< Thread >();
if (co) threads.add(new Thread() {
public void run() {
try {
List< Writable > res = getMapRecords(new Path(segment, Content.DIR_NAME), key);
results.put("co", res);
} catch (Exception e) {
e.printStackTrace(LogUtil.getWarnStream(LOG));
}
}
});
if (fe) threads.add(new Thread() {
public void run() {
try {
List< Writable > res = getMapRecords(new Path(segment, CrawlDatum.FETCH_DIR_NAME), key);
results.put("fe", res);
} catch (Exception e) {
e.printStackTrace(LogUtil.getWarnStream(LOG));
}
}
});
if (ge) threads.add(new Thread() {
public void run() {
try {
List< Writable > res = getSeqRecords(new Path(segment, CrawlDatum.GENERATE_DIR_NAME), key);
results.put("ge", res);
} catch (Exception e) {
e.printStackTrace(LogUtil.getWarnStream(LOG));
}
}
});
if (pa) threads.add(new Thread() {
public void run() {
try {
List< Writable > res = getSeqRecords(new Path(segment, CrawlDatum.PARSE_DIR_NAME), key);
results.put("pa", res);
} catch (Exception e) {
e.printStackTrace(LogUtil.getWarnStream(LOG));
}
}
});
if (pd) threads.add(new Thread() {
public void run() {
try {
List< Writable > res = getMapRecords(new Path(segment, ParseData.DIR_NAME), key);
results.put("pd", res);
} catch (Exception e) {
e.printStackTrace(LogUtil.getWarnStream(LOG));
}
}
});
if (pt) threads.add(new Thread() {
public void run() {
try {
List< Writable > res = getMapRecords(new Path(segment, ParseText.DIR_NAME), key);
results.put("pt", res);
} catch (Exception e) {
e.printStackTrace(LogUtil.getWarnStream(LOG));
}
}
});
Iterator< Thread > it = threads.iterator();
while (it.hasNext()) it.next().start();
int cnt;
do {
cnt = 0;
try {
Thread.sleep(5000);
} catch (Exception e) {};
it = threads.iterator();
while (it.hasNext()) {
if (it.next().isAlive()) cnt++;
}
if ((cnt > 0) && (LOG.isDebugEnabled())) {
LOG.debug("(" + cnt + " to retrieve)");
}
} while (cnt > 0);
for (int i = 0; i < keys.length; i++) {
List< Writable > res = results.get(keys[i][0]);
if (res != null && res.size() > 0) {
for (int k = 0; k < res.size(); k++) {
writer.write(keys[i][1]);
writer.write(res.get(k) + "\n");
}
}
writer.flush();
}
}
|
public void getStats(Path segment,
SegmentReader.SegmentReaderStats stats) throws Exception {
SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(getConf(), new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
long cnt = 0L;
Text key = new Text();
for (int i = 0; i < readers.length; i++) {
while (readers[i].next(key)) cnt++;
readers[i].close();
}
stats.generated = cnt;
Path fetchDir = new Path(segment, CrawlDatum.FETCH_DIR_NAME);
if (fs.exists(fetchDir) && fs.getFileStatus(fetchDir).isDir()) {
cnt = 0L;
long start = Long.MAX_VALUE;
long end = Long.MIN_VALUE;
CrawlDatum value = new CrawlDatum();
MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, fetchDir, getConf());
for (int i = 0; i < mreaders.length; i++) {
while (mreaders[i].next(key, value)) {
cnt++;
if (value.getFetchTime() < start) start = value.getFetchTime();
if (value.getFetchTime() > end) end = value.getFetchTime();
}
mreaders[i].close();
}
stats.start = start;
stats.end = end;
stats.fetched = cnt;
}
Path parseDir = new Path(segment, ParseData.DIR_NAME);
if (fs.exists(fetchDir) && fs.getFileStatus(fetchDir).isDir()) {
cnt = 0L;
long errors = 0L;
ParseData value = new ParseData();
MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, parseDir, getConf());
for (int i = 0; i < mreaders.length; i++) {
while (mreaders[i].next(key, value)) {
cnt++;
if (!value.getStatus().isSuccess()) errors++;
}
mreaders[i].close();
}
stats.parsed = cnt;
stats.parseErrors = errors;
}
}
|
public void list(List dirs,
Writer writer) throws Exception {
writer.write("NAME\t\tGENERATED\tFETCHER START\t\tFETCHER END\t\tFETCHED\tPARSED\n");
for (int i = 0; i < dirs.size(); i++) {
Path dir = dirs.get(i);
SegmentReaderStats stats = new SegmentReaderStats();
getStats(dir, stats);
writer.write(dir.getName() + "\t");
if (stats.generated == -1) writer.write("?");
else writer.write(stats.generated + "");
writer.write("\t\t");
if (stats.start == -1) writer.write("?\t");
else writer.write(sdf.format(new Date(stats.start)));
writer.write("\t");
if (stats.end == -1) writer.write("?");
else writer.write(sdf.format(new Date(stats.end)));
writer.write("\t");
if (stats.fetched == -1) writer.write("?");
else writer.write(stats.fetched + "");
writer.write("\t");
if (stats.parsed == -1) writer.write("?");
else writer.write(stats.parsed + "");
writer.write("\n");
writer.flush();
}
}
|
public static void main(String[] args) throws Exception {
if (args.length < 2) {
usage();
return;
}
int mode = -1;
if (args[0].equals("-dump"))
mode = MODE_DUMP;
else if (args[0].equals("-list"))
mode = MODE_LIST;
else if (args[0].equals("-get")) mode = MODE_GET;
boolean co = true;
boolean fe = true;
boolean ge = true;
boolean pa = true;
boolean pd = true;
boolean pt = true;
// collect general options
for (int i = 1; i < args.length; i++) {
if (args[i].equals("-nocontent")) {
co = false;
args[i] = null;
} else if (args[i].equals("-nofetch")) {
fe = false;
args[i] = null;
} else if (args[i].equals("-nogenerate")) {
ge = false;
args[i] = null;
} else if (args[i].equals("-noparse")) {
pa = false;
args[i] = null;
} else if (args[i].equals("-noparsedata")) {
pd = false;
args[i] = null;
} else if (args[i].equals("-noparsetext")) {
pt = false;
args[i] = null;
}
}
Configuration conf = NutchConfiguration.create();
final FileSystem fs = FileSystem.get(conf);
SegmentReader segmentReader = new SegmentReader(conf, co, fe, ge, pa, pd, pt);
// collect required args
switch (mode) {
case MODE_DUMP:
String input = args[1];
if (input == null) {
System.err.println("Missing required argument: < segment_dir >");
usage();
return;
}
String output = args.length > 2 ? args[2] : null;
if (output == null) {
System.err.println("Missing required argument: < output >");
usage();
return;
}
segmentReader.dump(new Path(input), new Path(output));
return;
case MODE_LIST:
ArrayList< Path > dirs = new ArrayList< Path >();
for (int i = 1; i < args.length; i++) {
if (args[i] == null) continue;
if (args[i].equals("-dir")) {
Path dir = new Path(args[++i]);
FileStatus[] fstats = fs.listStatus(dir, HadoopFSUtil.getPassDirectoriesFilter(fs));
Path[] files = HadoopFSUtil.getPaths(fstats);
if (files != null && files.length > 0) {
dirs.addAll(Arrays.asList(files));
}
} else dirs.add(new Path(args[i]));
}
segmentReader.list(dirs, new OutputStreamWriter(System.out, "UTF-8"));
return;
case MODE_GET:
input = args[1];
if (input == null) {
System.err.println("Missing required argument: < segment_dir >");
usage();
return;
}
String key = args.length > 2 ? args[2] : null;
if (key == null) {
System.err.println("Missing required argument: < keyValue >");
usage();
return;
}
segmentReader.get(new Path(input), new Text(key), new OutputStreamWriter(System.out, "UTF-8"), new HashMap< String, List< Writable > >());
return;
default:
System.err.println("Invalid operation: " + args[0]);
usage();
return;
}
}
|
public void reduce(Text key,
Iterator values,
OutputCollector output,
Reporter reporter) throws IOException {
StringBuffer dump = new StringBuffer();
dump.append("\nRecno:: ").append(recNo++).append("\n");
dump.append("URL:: " + key.toString() + "\n");
while (values.hasNext()) {
Writable value = values.next().get(); // unwrap
if (value instanceof CrawlDatum) {
dump.append("\nCrawlDatum::\n").append(((CrawlDatum) value).toString());
} else if (value instanceof Content) {
dump.append("\nContent::\n").append(((Content) value).toString());
} else if (value instanceof ParseData) {
dump.append("\nParseData::\n").append(((ParseData) value).toString());
} else if (value instanceof ParseText) {
dump.append("\nParseText::\n").append(((ParseText) value).toString());
} else if (LOG.isWarnEnabled()) {
LOG.warn("Unrecognized type: " + value.getClass());
}
}
output.collect(key, new Text(dump.toString()));
}
|