001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.input;
020    
021    import java.io.IOException;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.mapreduce.InputSplit;
027    import org.apache.hadoop.mapreduce.MRJobConfig;
028    import org.apache.hadoop.mapreduce.RecordReader;
029    import org.apache.hadoop.mapreduce.TaskAttemptContext;
030    
031    /**
032     * A wrapper class for a record reader that handles a single file split. It
033     * delegates most of the methods to the wrapped instance. A concrete subclass
034     * needs to provide a constructor that calls this parent constructor with the
035     * appropriate input format. The subclass constructor must satisfy the specific
036     * constructor signature that is required by
037     * <code>CombineFileRecordReader</code>.
038     *
039     * Subclassing is needed to get a concrete record reader wrapper because of the
040     * constructor requirement.
041     *
042     * @see CombineFileRecordReader
043     * @see CombineFileInputFormat
044     */
045    @InterfaceAudience.Public
046    @InterfaceStability.Stable
047    public abstract class CombineFileRecordReaderWrapper<K,V>
048      extends RecordReader<K,V> {
049      private final FileSplit fileSplit;
050      private final RecordReader<K,V> delegate;
051    
052      protected CombineFileRecordReaderWrapper(FileInputFormat<K,V> inputFormat,
053        CombineFileSplit split, TaskAttemptContext context, Integer idx)
054        throws IOException, InterruptedException {
055        fileSplit = new FileSplit(split.getPath(idx),
056          split.getOffset(idx),
057          split.getLength(idx),
058          split.getLocations());
059    
060        delegate = inputFormat.createRecordReader(fileSplit, context);
061      }
062    
063      public void initialize(InputSplit split, TaskAttemptContext context)
064        throws IOException, InterruptedException {
065        // it really should be the same file split at the time the wrapper instance
066        // was created
067        assert fileSplitIsValid(context);
068    
069        delegate.initialize(fileSplit, context);
070      }
071    
072      private boolean fileSplitIsValid(TaskAttemptContext context) {
073        Configuration conf = context.getConfiguration();
074        long offset = conf.getLong(MRJobConfig.MAP_INPUT_START, 0L);
075        if (fileSplit.getStart() != offset) {
076          return false;
077        }
078        long length = conf.getLong(MRJobConfig.MAP_INPUT_PATH, 0L);
079        if (fileSplit.getLength() != length) {
080          return false;
081        }
082        String path = conf.get(MRJobConfig.MAP_INPUT_FILE);
083        if (!fileSplit.getPath().toString().equals(path)) {
084          return false;
085        }
086        return true;
087      }
088    
089      public boolean nextKeyValue() throws IOException, InterruptedException {
090        return delegate.nextKeyValue();
091      }
092    
093      public K getCurrentKey() throws IOException, InterruptedException {
094        return delegate.getCurrentKey();
095      }
096    
097      public V getCurrentValue() throws IOException, InterruptedException {
098        return delegate.getCurrentValue();
099      }
100    
101      public float getProgress() throws IOException, InterruptedException {
102        return delegate.getProgress();
103      }
104    
105      public void close() throws IOException {
106        delegate.close();
107      }
108    }