Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » mapred » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one
    3    * or more contributor license agreements.  See the NOTICE file
    4    * distributed with this work for additional information
    5    * regarding copyright ownership.  The ASF licenses this file
    6    * to you under the Apache License, Version 2.0 (the
    7    * "License"); you may not use this file except in compliance
    8    * with the License.  You may obtain a copy of the License at
    9    *
   10    *     http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    * Unless required by applicable law or agreed to in writing, software
   13    * distributed under the License is distributed on an "AS IS" BASIS,
   14    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   15    * See the License for the specific language governing permissions and
   16    * limitations under the License.
   17    */
   18   
   19   package org.apache.hadoop.mapred;
   20   
   21   import java.io.IOException;
   22   
   23   
   24   import org.apache.hadoop.conf.Configuration;
   25   import org.apache.hadoop.fs.FileSystem;
   26   import org.apache.hadoop.fs.Path;
   27   import org.apache.hadoop.io;
   28   import org.apache.hadoop.util.ReflectionUtils;
   29   
   30   /** An {@link RecordReader} for {@link SequenceFile}s. */
   31   public class SequenceFileRecordReader implements RecordReader {
   32     private SequenceFile.Reader in;
   33     private long start;
   34     private long end;
   35     private boolean more = true;
   36     protected Configuration conf;
   37   
   38     public SequenceFileRecordReader(Configuration conf, FileSplit split)
   39       throws IOException {
   40       Path path = split.getPath();
   41       FileSystem fs = path.getFileSystem(conf);
   42       this.in = new SequenceFile.Reader(fs, path, conf);
   43       this.end = split.getStart() + split.getLength();
   44       this.conf = conf;
   45   
   46       if (split.getStart() > in.getPosition())
   47         in.sync(split.getStart());                  // sync to start
   48   
   49       this.start = in.getPosition();
   50       more = start < end;
   51     }
   52   
   53   
   54     /** The class of key that must be passed to {@link
   55      * #next(Writable,Writable)}.. */
   56     public Class getKeyClass() { return in.getKeyClass(); }
   57   
   58     /** The class of value that must be passed to {@link
   59      * #next(Writable,Writable)}.. */
   60     public Class getValueClass() { return in.getValueClass(); }
   61     
   62     public WritableComparable createKey() {
   63       return (WritableComparable) ReflectionUtils.newInstance(getKeyClass(), 
   64                                                               conf);
   65     }
   66     
   67     public Writable createValue() {
   68       return (Writable) ReflectionUtils.newInstance(getValueClass(), conf);
   69     }
   70       
   71     public synchronized boolean next(Writable key, Writable value)
   72       throws IOException {
   73       if (!more) return false;
   74       long pos = in.getPosition();
   75       boolean eof = in.next(key, value);
   76       if (pos >= end && in.syncSeen()) {
   77         more = false;
   78       } else {
   79         more = eof;
   80       }
   81       return more;
   82     }
   83     
   84     protected synchronized boolean next(Writable key)
   85       throws IOException {
   86       if (!more) return false;
   87       long pos = in.getPosition();
   88       boolean eof = in.next(key);
   89       if (pos >= end && in.syncSeen()) {
   90         more = false;
   91       } else {
   92         more = eof;
   93       }
   94       return more;
   95     }
   96     
   97     protected synchronized void getCurrentValue(Writable value)
   98       throws IOException {
   99       in.getCurrentValue(value);
  100     }
  101     
  102     /**
  103      * Return the progress within the input split
  104      * @return 0.0 to 1.0 of the input byte range
  105      */
  106     public float getProgress() throws IOException {
  107       if (end == start) {
  108         return 0.0f;
  109       } else {
  110         return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
  111       }
  112     }
  113     
  114     public synchronized long getPos() throws IOException {
  115       return in.getPosition();
  116     }
  117     
  118     protected synchronized void seek(long pos) throws IOException {
  119       in.seek(pos);
  120     }
  121     public synchronized void close() throws IOException { in.close(); }
  122     
  123   }
  124   

Save This Page
Home » hadoop-0.14.4 » org.apache » hadoop » mapred » [javadoc | source]