001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.lib.input; 020 021import java.io.IOException; 022 023 024import org.apache.hadoop.classification.InterfaceAudience; 025import org.apache.hadoop.classification.InterfaceStability; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.io.*; 030import org.apache.hadoop.mapreduce.InputSplit; 031import org.apache.hadoop.mapreduce.RecordReader; 032import org.apache.hadoop.mapreduce.TaskAttemptContext; 033 034/** An {@link RecordReader} for {@link SequenceFile}s. */ 035@InterfaceAudience.Public 036@InterfaceStability.Stable 037public class SequenceFileRecordReader<K, V> extends RecordReader<K, V> { 038 private SequenceFile.Reader in; 039 private long start; 040 private long end; 041 private boolean more = true; 042 private K key = null; 043 private V value = null; 044 protected Configuration conf; 045 046 @Override 047 public void initialize(InputSplit split, 048 TaskAttemptContext context 049 ) throws IOException, InterruptedException { 050 FileSplit fileSplit = (FileSplit) split; 051 conf = context.getConfiguration(); 052 Path path = fileSplit.getPath(); 053 FileSystem fs = path.getFileSystem(conf); 054 this.in = new SequenceFile.Reader(fs, path, conf); 055 this.end = fileSplit.getStart() + fileSplit.getLength(); 056 057 if (fileSplit.getStart() > in.getPosition()) { 058 in.sync(fileSplit.getStart()); // sync to start 059 } 060 061 this.start = in.getPosition(); 062 more = start < end; 063 } 064 065 @Override 066 @SuppressWarnings("unchecked") 067 public boolean nextKeyValue() throws IOException, InterruptedException { 068 if (!more) { 069 return false; 070 } 071 long pos = in.getPosition(); 072 key = (K) in.next(key); 073 if (key == null || (pos >= end && in.syncSeen())) { 074 more = false; 075 key = null; 076 value = null; 077 } else { 078 value = (V) in.getCurrentValue(value); 079 } 080 return more; 081 } 082 083 @Override 084 public K getCurrentKey() { 085 return key; 086 } 087 088 @Override 089 public V getCurrentValue() { 090 return value; 091 } 092 093 /** 094 * Return the progress within the input split 095 * @return 0.0 to 1.0 of the input byte range 096 */ 097 public float getProgress() throws IOException { 098 if (end == start) { 099 return 0.0f; 100 } else { 101 return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start)); 102 } 103 } 104 105 public synchronized void close() throws IOException { in.close(); } 106 107} 108