001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred;
020
021 import java.io.IOException;
022
023
024 import org.apache.hadoop.classification.InterfaceAudience;
025 import org.apache.hadoop.classification.InterfaceStability;
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.hadoop.fs.FileSystem;
028 import org.apache.hadoop.fs.Path;
029 import org.apache.hadoop.io.*;
030 import org.apache.hadoop.util.ReflectionUtils;
031
032 /**
033 * An {@link RecordReader} for {@link SequenceFile}s.
034 */
035 @InterfaceAudience.Public
036 @InterfaceStability.Stable
037 public class SequenceFileRecordReader<K, V> implements RecordReader<K, V> {
038
039 private SequenceFile.Reader in;
040 private long start;
041 private long end;
042 private boolean more = true;
043 protected Configuration conf;
044
045 public SequenceFileRecordReader(Configuration conf, FileSplit split)
046 throws IOException {
047 Path path = split.getPath();
048 FileSystem fs = path.getFileSystem(conf);
049 this.in = new SequenceFile.Reader(fs, path, conf);
050 this.end = split.getStart() + split.getLength();
051 this.conf = conf;
052
053 if (split.getStart() > in.getPosition())
054 in.sync(split.getStart()); // sync to start
055
056 this.start = in.getPosition();
057 more = start < end;
058 }
059
060
061 /** The class of key that must be passed to {@link
062 * #next(Object, Object)}.. */
063 public Class getKeyClass() { return in.getKeyClass(); }
064
065 /** The class of value that must be passed to {@link
066 * #next(Object, Object)}.. */
067 public Class getValueClass() { return in.getValueClass(); }
068
069 @SuppressWarnings("unchecked")
070 public K createKey() {
071 return (K) ReflectionUtils.newInstance(getKeyClass(), conf);
072 }
073
074 @SuppressWarnings("unchecked")
075 public V createValue() {
076 return (V) ReflectionUtils.newInstance(getValueClass(), conf);
077 }
078
079 public synchronized boolean next(K key, V value) throws IOException {
080 if (!more) return false;
081 long pos = in.getPosition();
082 boolean remaining = (in.next(key) != null);
083 if (remaining) {
084 getCurrentValue(value);
085 }
086 if (pos >= end && in.syncSeen()) {
087 more = false;
088 } else {
089 more = remaining;
090 }
091 return more;
092 }
093
094 protected synchronized boolean next(K key)
095 throws IOException {
096 if (!more) return false;
097 long pos = in.getPosition();
098 boolean remaining = (in.next(key) != null);
099 if (pos >= end && in.syncSeen()) {
100 more = false;
101 } else {
102 more = remaining;
103 }
104 return more;
105 }
106
107 protected synchronized void getCurrentValue(V value)
108 throws IOException {
109 in.getCurrentValue(value);
110 }
111
112 /**
113 * Return the progress within the input split
114 * @return 0.0 to 1.0 of the input byte range
115 */
116 public float getProgress() throws IOException {
117 if (end == start) {
118 return 0.0f;
119 } else {
120 return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
121 }
122 }
123
124 public synchronized long getPos() throws IOException {
125 return in.getPosition();
126 }
127
128 protected synchronized void seek(long pos) throws IOException {
129 in.seek(pos);
130 }
131 public synchronized void close() throws IOException { in.close(); }
132
133 }
134