001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.mapred;
019
020 import java.io.IOException;
021
022 import org.apache.hadoop.classification.InterfaceAudience;
023 import org.apache.hadoop.classification.InterfaceStability;
024 import org.apache.hadoop.conf.Configuration;
025 import org.apache.hadoop.fs.FileSystem;
026 import org.apache.hadoop.fs.Path;
027 import org.apache.hadoop.io.BytesWritable;
028 import org.apache.hadoop.io.DataOutputBuffer;
029 import org.apache.hadoop.io.SequenceFile;
030
031 /**
032 * InputFormat reading keys, values from SequenceFiles in binary (raw)
033 * format.
034 */
035 @InterfaceAudience.Public
036 @InterfaceStability.Stable
037 public class SequenceFileAsBinaryInputFormat
038 extends SequenceFileInputFormat<BytesWritable,BytesWritable> {
039
040 public SequenceFileAsBinaryInputFormat() {
041 super();
042 }
043
044 public RecordReader<BytesWritable,BytesWritable> getRecordReader(
045 InputSplit split, JobConf job, Reporter reporter)
046 throws IOException {
047 return new SequenceFileAsBinaryRecordReader(job, (FileSplit)split);
048 }
049
050 /**
051 * Read records from a SequenceFile as binary (raw) bytes.
052 */
053 public static class SequenceFileAsBinaryRecordReader
054 implements RecordReader<BytesWritable,BytesWritable> {
055 private SequenceFile.Reader in;
056 private long start;
057 private long end;
058 private boolean done = false;
059 private DataOutputBuffer buffer = new DataOutputBuffer();
060 private SequenceFile.ValueBytes vbytes;
061
062 public SequenceFileAsBinaryRecordReader(Configuration conf, FileSplit split)
063 throws IOException {
064 Path path = split.getPath();
065 FileSystem fs = path.getFileSystem(conf);
066 this.in = new SequenceFile.Reader(fs, path, conf);
067 this.end = split.getStart() + split.getLength();
068 if (split.getStart() > in.getPosition())
069 in.sync(split.getStart()); // sync to start
070 this.start = in.getPosition();
071 vbytes = in.createValueBytes();
072 done = start >= end;
073 }
074
075 public BytesWritable createKey() {
076 return new BytesWritable();
077 }
078
079 public BytesWritable createValue() {
080 return new BytesWritable();
081 }
082
083 /**
084 * Retrieve the name of the key class for this SequenceFile.
085 * @see org.apache.hadoop.io.SequenceFile.Reader#getKeyClassName
086 */
087 public String getKeyClassName() {
088 return in.getKeyClassName();
089 }
090
091 /**
092 * Retrieve the name of the value class for this SequenceFile.
093 * @see org.apache.hadoop.io.SequenceFile.Reader#getValueClassName
094 */
095 public String getValueClassName() {
096 return in.getValueClassName();
097 }
098
099 /**
100 * Read raw bytes from a SequenceFile.
101 */
102 public synchronized boolean next(BytesWritable key, BytesWritable val)
103 throws IOException {
104 if (done) return false;
105 long pos = in.getPosition();
106 boolean eof = -1 == in.nextRawKey(buffer);
107 if (!eof) {
108 key.set(buffer.getData(), 0, buffer.getLength());
109 buffer.reset();
110 in.nextRawValue(vbytes);
111 vbytes.writeUncompressedBytes(buffer);
112 val.set(buffer.getData(), 0, buffer.getLength());
113 buffer.reset();
114 }
115 return !(done = (eof || (pos >= end && in.syncSeen())));
116 }
117
118 public long getPos() throws IOException {
119 return in.getPosition();
120 }
121
122 public void close() throws IOException {
123 in.close();
124 }
125
126 /**
127 * Return the progress within the input split
128 * @return 0.0 to 1.0 of the input byte range
129 */
130 public float getProgress() throws IOException {
131 if (end == start) {
132 return 0.0f;
133 } else {
134 return Math.min(1.0f, (float)((in.getPosition() - start) /
135 (double)(end - start)));
136 }
137 }
138 }
139 }