001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.input;
020
021import java.io.IOException;
022
023
024import org.apache.hadoop.classification.InterfaceAudience;
025import org.apache.hadoop.classification.InterfaceStability;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.io.*;
030import org.apache.hadoop.mapreduce.InputSplit;
031import org.apache.hadoop.mapreduce.RecordReader;
032import org.apache.hadoop.mapreduce.TaskAttemptContext;
033
034/** An {@link RecordReader} for {@link SequenceFile}s. */
035@InterfaceAudience.Public
036@InterfaceStability.Stable
037public class SequenceFileRecordReader<K, V> extends RecordReader<K, V> {
038  private SequenceFile.Reader in;
039  private long start;
040  private long end;
041  private boolean more = true;
042  private K key = null;
043  private V value = null;
044  protected Configuration conf;
045
046  @Override
047  public void initialize(InputSplit split, 
048                         TaskAttemptContext context
049                         ) throws IOException, InterruptedException {
050    FileSplit fileSplit = (FileSplit) split;
051    conf = context.getConfiguration();    
052    Path path = fileSplit.getPath();
053    FileSystem fs = path.getFileSystem(conf);
054    this.in = new SequenceFile.Reader(fs, path, conf);
055    this.end = fileSplit.getStart() + fileSplit.getLength();
056
057    if (fileSplit.getStart() > in.getPosition()) {
058      in.sync(fileSplit.getStart());                  // sync to start
059    }
060
061    this.start = in.getPosition();
062    more = start < end;
063  }
064
065  @Override
066  @SuppressWarnings("unchecked")
067  public boolean nextKeyValue() throws IOException, InterruptedException {
068    if (!more) {
069      return false;
070    }
071    long pos = in.getPosition();
072    key = (K) in.next(key);
073    if (key == null || (pos >= end && in.syncSeen())) {
074      more = false;
075      key = null;
076      value = null;
077    } else {
078      value = (V) in.getCurrentValue(value);
079    }
080    return more;
081  }
082
083  @Override
084  public K getCurrentKey() {
085    return key;
086  }
087  
088  @Override
089  public V getCurrentValue() {
090    return value;
091  }
092  
093  /**
094   * Return the progress within the input split
095   * @return 0.0 to 1.0 of the input byte range
096   */
097  public float getProgress() throws IOException {
098    if (end == start) {
099      return 0.0f;
100    } else {
101      return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
102    }
103  }
104  
105  public synchronized void close() throws IOException { in.close(); }
106  
107}
108