/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * InputFormat that reads keys and values from SequenceFiles in binary (raw)
 * form, presenting each record's key and value as {@link BytesWritable}
 * wrapping the stored serialized bytes.
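 *
 * <p>A minimal usage sketch (the job name and input path below are
 * illustrative, not defined by this class):
 *
 * <pre>{@code
 * Configuration conf = new Configuration();
 * Job job = Job.getInstance(conf, "raw-sequencefile-scan");
 * job.setInputFormatClass(SequenceFileAsBinaryInputFormat.class);
 * FileInputFormat.addInputPath(job, new Path("/data/seqfiles"));
 * // the mapper then receives BytesWritable key/value pairs holding raw bytes
 * }</pre>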
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsBinaryInputFormat
    extends SequenceFileInputFormat<BytesWritable,BytesWritable> {

  public SequenceFileAsBinaryInputFormat() {
    super();
  }

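  /**
   * Create a record reader that presents each record's key and value as the
   * raw serialized bytes stored in the file, wrapped in {@link BytesWritable}.
   */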
  @Override
  public RecordReader<BytesWritable,BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context)
      throws IOException {
    return new SequenceFileAsBinaryRecordReader();
  }

  /**
   * Read records from a SequenceFile as binary (raw) bytes.
   */
  public static class SequenceFileAsBinaryRecordReader
      extends RecordReader<BytesWritable,BytesWritable> {
    private SequenceFile.Reader in;
    private long start;
    private long end;
    private boolean done = false;
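    // scratch buffer that receives the raw key bytes, and then the raw value
    // bytes, of the record currently being read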
    private DataOutputBuffer buffer = new DataOutputBuffer();
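    // holder for a record's raw value bytes; writeUncompressedBytes() below
    // decompresses them into the scratch buffer when the file is compressed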
    private SequenceFile.ValueBytes vbytes;
    private BytesWritable key = null;
    private BytesWritable value = null;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
      Path path = ((FileSplit)split).getPath();
      Configuration conf = context.getConfiguration();
      FileSystem fs = path.getFileSystem(conf);
      this.in = new SequenceFile.Reader(fs, path, conf);
      this.end = ((FileSplit)split).getStart() + split.getLength();
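      // a split rarely begins exactly on a record boundary; sync() advances
      // the reader to the first sync marker at or after the split's start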
      if (((FileSplit)split).getStart() > in.getPosition()) {
        in.sync(((FileSplit)split).getStart());    // sync to start
      }
      this.start = in.getPosition();
      vbytes = in.createValueBytes();
      done = start >= end;
    }

    @Override
    public BytesWritable getCurrentKey()
        throws IOException, InterruptedException {
      return key;
    }

    @Override
    public BytesWritable getCurrentValue()
        throws IOException, InterruptedException {
      return value;
    }

    /**
     * Retrieve the name of the key class for this SequenceFile.
     * @see org.apache.hadoop.io.SequenceFile.Reader#getKeyClassName
     */
    public String getKeyClassName() {
      return in.getKeyClassName();
    }

    /**
     * Retrieve the name of the value class for this SequenceFile.
     * @see org.apache.hadoop.io.SequenceFile.Reader#getValueClassName
     */
    public String getValueClassName() {
      return in.getValueClassName();
    }

    /**
     * Read raw bytes from a SequenceFile.
     */
    @Override
    public synchronized boolean nextKeyValue()
        throws IOException, InterruptedException {
      if (done) {
        return false;
      }
      long pos = in.getPosition();
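      // read the next raw key into the scratch buffer; nextRawKey returns -1
      // once the end of the file is reached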
      boolean eof = -1 == in.nextRawKey(buffer);
      if (!eof) {
        if (key == null) {
          key = new BytesWritable();
        }
        if (value == null) {
          value = new BytesWritable();
        }
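        // copy the buffered key bytes, then reuse the buffer for the record's
        // raw value bytes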
        key.set(buffer.getData(), 0, buffer.getLength());
        buffer.reset();
        in.nextRawValue(vbytes);
        vbytes.writeUncompressedBytes(buffer);
        value.set(buffer.getData(), 0, buffer.getLength());
        buffer.reset();
      }
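      // stop at end-of-file, or once a record starts at or beyond the split's
      // end and a sync marker has been seen, leaving the remainder of the
      // file for the next split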
      return !(done = (eof || (pos >= end && in.syncSeen())));
    }

    @Override
    public void close() throws IOException {
      in.close();
    }

    /**
     * Return the progress within the input split.
     * @return 0.0 to 1.0 of the input byte range
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
      if (end == start) {
        return 0.0f;
      } else {
        return Math.min(1.0f, (float)((in.getPosition() - start) /
                                      (double)(end - start)));
      }
    }
  }
}