001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.mapred;
019    
020    import java.io.IOException;
021    
022    import org.apache.hadoop.classification.InterfaceAudience;
023    import org.apache.hadoop.classification.InterfaceStability;
024    import org.apache.hadoop.conf.Configuration;
025    import org.apache.hadoop.fs.FileSystem;
026    import org.apache.hadoop.fs.Path;
027    import org.apache.hadoop.io.BytesWritable;
028    import org.apache.hadoop.io.DataOutputBuffer;
029    import org.apache.hadoop.io.SequenceFile;
030    
031    /**
032     * InputFormat reading keys, values from SequenceFiles in binary (raw)
033     * format.
034     */
035    @InterfaceAudience.Public
036    @InterfaceStability.Stable
037    public class SequenceFileAsBinaryInputFormat
038        extends SequenceFileInputFormat<BytesWritable,BytesWritable> {
039    
040      public SequenceFileAsBinaryInputFormat() {
041        super();
042      }
043    
044      public RecordReader<BytesWritable,BytesWritable> getRecordReader(
045          InputSplit split, JobConf job, Reporter reporter)
046          throws IOException {
047        return new SequenceFileAsBinaryRecordReader(job, (FileSplit)split);
048      }
049    
050      /**
051       * Read records from a SequenceFile as binary (raw) bytes.
052       */
053      public static class SequenceFileAsBinaryRecordReader
054          implements RecordReader<BytesWritable,BytesWritable> {
055        private SequenceFile.Reader in;
056        private long start;
057        private long end;
058        private boolean done = false;
059        private DataOutputBuffer buffer = new DataOutputBuffer();
060        private SequenceFile.ValueBytes vbytes;
061    
062        public SequenceFileAsBinaryRecordReader(Configuration conf, FileSplit split)
063            throws IOException {
064          Path path = split.getPath();
065          FileSystem fs = path.getFileSystem(conf);
066          this.in = new SequenceFile.Reader(fs, path, conf);
067          this.end = split.getStart() + split.getLength();
068          if (split.getStart() > in.getPosition())
069            in.sync(split.getStart());                  // sync to start
070          this.start = in.getPosition();
071          vbytes = in.createValueBytes();
072          done = start >= end;
073        }
074    
075        public BytesWritable createKey() {
076          return new BytesWritable();
077        }
078    
079        public BytesWritable createValue() {
080          return new BytesWritable();
081        }
082    
083        /**
084         * Retrieve the name of the key class for this SequenceFile.
085         * @see org.apache.hadoop.io.SequenceFile.Reader#getKeyClassName
086         */
087        public String getKeyClassName() {
088          return in.getKeyClassName();
089        }
090    
091        /**
092         * Retrieve the name of the value class for this SequenceFile.
093         * @see org.apache.hadoop.io.SequenceFile.Reader#getValueClassName
094         */
095        public String getValueClassName() {
096          return in.getValueClassName();
097        }
098    
099        /**
100         * Read raw bytes from a SequenceFile.
101         */
102        public synchronized boolean next(BytesWritable key, BytesWritable val)
103            throws IOException {
104          if (done) return false;
105          long pos = in.getPosition();
106          boolean eof = -1 == in.nextRawKey(buffer);
107          if (!eof) {
108            key.set(buffer.getData(), 0, buffer.getLength());
109            buffer.reset();
110            in.nextRawValue(vbytes);
111            vbytes.writeUncompressedBytes(buffer);
112            val.set(buffer.getData(), 0, buffer.getLength());
113            buffer.reset();
114          }
115          return !(done = (eof || (pos >= end && in.syncSeen())));
116        }
117    
118        public long getPos() throws IOException {
119          return in.getPosition();
120        }
121    
122        public void close() throws IOException {
123          in.close();
124        }
125    
126        /**
127         * Return the progress within the input split
128         * @return 0.0 to 1.0 of the input byte range
129         */
130        public float getProgress() throws IOException {
131          if (end == start) {
132            return 0.0f;
133          } else {
134            return Math.min(1.0f, (float)((in.getPosition() - start) /
135                                          (double)(end - start)));
136          }
137        }
138      }
139    }