001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.input;
020    
021    import java.io.IOException;
022    
023    
024    import org.apache.hadoop.classification.InterfaceAudience;
025    import org.apache.hadoop.classification.InterfaceStability;
026    import org.apache.hadoop.conf.Configuration;
027    import org.apache.hadoop.fs.FileSystem;
028    import org.apache.hadoop.fs.Path;
029    import org.apache.hadoop.io.*;
030    import org.apache.hadoop.mapreduce.InputSplit;
031    import org.apache.hadoop.mapreduce.RecordReader;
032    import org.apache.hadoop.mapreduce.TaskAttemptContext;
033    
034    /** An {@link RecordReader} for {@link SequenceFile}s. */
035    @InterfaceAudience.Public
036    @InterfaceStability.Stable
037    public class SequenceFileRecordReader<K, V> extends RecordReader<K, V> {
038      private SequenceFile.Reader in;
039      private long start;
040      private long end;
041      private boolean more = true;
042      private K key = null;
043      private V value = null;
044      protected Configuration conf;
045    
046      @Override
047      public void initialize(InputSplit split, 
048                             TaskAttemptContext context
049                             ) throws IOException, InterruptedException {
050        FileSplit fileSplit = (FileSplit) split;
051        conf = context.getConfiguration();    
052        Path path = fileSplit.getPath();
053        FileSystem fs = path.getFileSystem(conf);
054        this.in = new SequenceFile.Reader(fs, path, conf);
055        this.end = fileSplit.getStart() + fileSplit.getLength();
056    
057        if (fileSplit.getStart() > in.getPosition()) {
058          in.sync(fileSplit.getStart());                  // sync to start
059        }
060    
061        this.start = in.getPosition();
062        more = start < end;
063      }
064    
065      @Override
066      @SuppressWarnings("unchecked")
067      public boolean nextKeyValue() throws IOException, InterruptedException {
068        if (!more) {
069          return false;
070        }
071        long pos = in.getPosition();
072        key = (K) in.next(key);
073        if (key == null || (pos >= end && in.syncSeen())) {
074          more = false;
075          key = null;
076          value = null;
077        } else {
078          value = (V) in.getCurrentValue(value);
079        }
080        return more;
081      }
082    
083      @Override
084      public K getCurrentKey() {
085        return key;
086      }
087      
088      @Override
089      public V getCurrentValue() {
090        return value;
091      }
092      
093      /**
094       * Return the progress within the input split
095       * @return 0.0 to 1.0 of the input byte range
096       */
097      public float getProgress() throws IOException {
098        if (end == start) {
099          return 0.0f;
100        } else {
101          return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
102        }
103      }
104      
105      public synchronized void close() throws IOException { in.close(); }
106      
107    }
108