001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapred; 020 021import java.io.IOException; 022 023 024import org.apache.hadoop.classification.InterfaceAudience; 025import org.apache.hadoop.classification.InterfaceStability; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.io.*; 030import org.apache.hadoop.util.ReflectionUtils; 031 032/** 033 * An {@link RecordReader} for {@link SequenceFile}s. 034 */ 035@InterfaceAudience.Public 036@InterfaceStability.Stable 037public class SequenceFileRecordReader<K, V> implements RecordReader<K, V> { 038 039 private SequenceFile.Reader in; 040 private long start; 041 private long end; 042 private boolean more = true; 043 protected Configuration conf; 044 045 public SequenceFileRecordReader(Configuration conf, FileSplit split) 046 throws IOException { 047 Path path = split.getPath(); 048 FileSystem fs = path.getFileSystem(conf); 049 this.in = new SequenceFile.Reader(fs, path, conf); 050 this.end = split.getStart() + split.getLength(); 051 this.conf = conf; 052 053 if (split.getStart() > in.getPosition()) 054 in.sync(split.getStart()); // sync to start 055 056 this.start = in.getPosition(); 057 more = start < end; 058 } 059 060 061 /** The class of key that must be passed to {@link 062 * #next(Object, Object)}.. */ 063 public Class getKeyClass() { return in.getKeyClass(); } 064 065 /** The class of value that must be passed to {@link 066 * #next(Object, Object)}.. */ 067 public Class getValueClass() { return in.getValueClass(); } 068 069 @SuppressWarnings("unchecked") 070 public K createKey() { 071 return (K) ReflectionUtils.newInstance(getKeyClass(), conf); 072 } 073 074 @SuppressWarnings("unchecked") 075 public V createValue() { 076 return (V) ReflectionUtils.newInstance(getValueClass(), conf); 077 } 078 079 public synchronized boolean next(K key, V value) throws IOException { 080 if (!more) return false; 081 long pos = in.getPosition(); 082 boolean remaining = (in.next(key) != null); 083 if (remaining) { 084 getCurrentValue(value); 085 } 086 if (pos >= end && in.syncSeen()) { 087 more = false; 088 } else { 089 more = remaining; 090 } 091 return more; 092 } 093 094 protected synchronized boolean next(K key) 095 throws IOException { 096 if (!more) return false; 097 long pos = in.getPosition(); 098 boolean remaining = (in.next(key) != null); 099 if (pos >= end && in.syncSeen()) { 100 more = false; 101 } else { 102 more = remaining; 103 } 104 return more; 105 } 106 107 protected synchronized void getCurrentValue(V value) 108 throws IOException { 109 in.getCurrentValue(value); 110 } 111 112 /** 113 * Return the progress within the input split 114 * @return 0.0 to 1.0 of the input byte range 115 */ 116 public float getProgress() throws IOException { 117 if (end == start) { 118 return 0.0f; 119 } else { 120 return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start)); 121 } 122 } 123 124 public synchronized long getPos() throws IOException { 125 return in.getPosition(); 126 } 127 128 protected synchronized void seek(long pos) throws IOException { 129 in.seek(pos); 130 } 131 public synchronized void close() throws IOException { in.close(); } 132 133} 134