001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.input;
020    
021    import java.io.*;
022    import java.lang.reflect.*;
023    
024    import org.apache.hadoop.fs.FileSystem;
025    
026    import org.apache.hadoop.mapreduce.*;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceStability;
029    import org.apache.hadoop.conf.Configuration;
030    
031    /**
032     * A generic RecordReader that can hand out different recordReaders
033     * for each chunk in a {@link CombineFileSplit}.
034     * A CombineFileSplit can combine data chunks from multiple files. 
035     * This class allows using different RecordReaders for processing
036     * these data chunks from different files.
037     * @see CombineFileSplit
038     */
039    @InterfaceAudience.Public
040    @InterfaceStability.Stable
041    public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
042    
043      static final Class [] constructorSignature = new Class [] 
044                                             {CombineFileSplit.class,
045                                              TaskAttemptContext.class,
046                                              Integer.class};
047    
048      protected CombineFileSplit split;
049      protected Class<? extends RecordReader<K,V>> rrClass;
050      protected Constructor<? extends RecordReader<K,V>> rrConstructor;
051      protected FileSystem fs;
052      protected TaskAttemptContext context;
053      
054      protected int idx;
055      protected long progress;
056      protected RecordReader<K, V> curReader;
057    
058      public void initialize(InputSplit split,
059          TaskAttemptContext context) throws IOException, InterruptedException {
060        this.split = (CombineFileSplit)split;
061        this.context = context;
062        if (null != this.curReader) {
063          this.curReader.initialize(split, context);
064        }
065      }
066      
067      public boolean nextKeyValue() throws IOException, InterruptedException {
068    
069        while ((curReader == null) || !curReader.nextKeyValue()) {
070          if (!initNextRecordReader()) {
071            return false;
072          }
073        }
074        return true;
075      }
076    
077      public K getCurrentKey() throws IOException, InterruptedException {
078        return curReader.getCurrentKey();
079      }
080      
081      public V getCurrentValue() throws IOException, InterruptedException {
082        return curReader.getCurrentValue();
083      }
084      
085      public void close() throws IOException {
086        if (curReader != null) {
087          curReader.close();
088          curReader = null;
089        }
090      }
091      
092      /**
093       * return progress based on the amount of data processed so far.
094       */
095      public float getProgress() throws IOException, InterruptedException {
096        long subprogress = 0;    // bytes processed in current split
097        if (null != curReader) {
098          // idx is always one past the current subsplit's true index.
099          subprogress = (long)(curReader.getProgress() * split.getLength(idx - 1));
100        }
101        return Math.min(1.0f,  (progress + subprogress)/(float)(split.getLength()));
102      }
103      
104      /**
105       * A generic RecordReader that can hand out different recordReaders
106       * for each chunk in the CombineFileSplit.
107       */
108      public CombineFileRecordReader(CombineFileSplit split,
109                                     TaskAttemptContext context,
110                                     Class<? extends RecordReader<K,V>> rrClass)
111        throws IOException {
112        this.split = split;
113        this.context = context;
114        this.rrClass = rrClass;
115        this.idx = 0;
116        this.curReader = null;
117        this.progress = 0;
118    
119        try {
120          rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
121          rrConstructor.setAccessible(true);
122        } catch (Exception e) {
123          throw new RuntimeException(rrClass.getName() + 
124                                     " does not have valid constructor", e);
125        }
126        initNextRecordReader();
127      }
128      
129      /**
130       * Get the record reader for the next chunk in this CombineFileSplit.
131       */
132      protected boolean initNextRecordReader() throws IOException {
133    
134        if (curReader != null) {
135          curReader.close();
136          curReader = null;
137          if (idx > 0) {
138            progress += split.getLength(idx-1);    // done processing so far
139          }
140        }
141    
142        // if all chunks have been processed, nothing more to do.
143        if (idx == split.getNumPaths()) {
144          return false;
145        }
146    
147        context.progress();
148    
149        // get a record reader for the idx-th chunk
150        try {
151          Configuration conf = context.getConfiguration();
152          // setup some helper config variables.
153          conf.set(MRJobConfig.MAP_INPUT_FILE, split.getPath(idx).toString());
154          conf.setLong(MRJobConfig.MAP_INPUT_START, split.getOffset(idx));
155          conf.setLong(MRJobConfig.MAP_INPUT_PATH, split.getLength(idx));
156    
157          curReader =  rrConstructor.newInstance(new Object [] 
158                                {split, context, Integer.valueOf(idx)});
159    
160          if (idx > 0) {
161            // initialize() for the first RecordReader will be called by MapTask;
162            // we're responsible for initializing subsequent RecordReaders.
163            curReader.initialize(split, context);
164          }
165        } catch (Exception e) {
166          throw new RuntimeException (e);
167        }
168        idx++;
169        return true;
170      }
171    }