/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib;

import java.io.*;
import java.lang.reflect.*;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.mapred.*;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;

/**
 * A generic RecordReader that can hand out different recordReaders
 * for each chunk in a {@link CombineFileSplit}.
 * A CombineFileSplit can combine data chunks from multiple files.
 * This class allows using different RecordReaders for processing
 * these data chunks from different files.
 * @see CombineFileSplit
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CombineFileRecordReader<K, V> implements RecordReader<K, V> {

  /**
   * The constructor signature that every chunk-level reader class passed to
   * {@link #CombineFileRecordReader(JobConf, CombineFileSplit, Reporter, Class)}
   * must declare: (split, conf, reporter, index of the chunk within the split).
   */
  static final Class<?>[] constructorSignature = new Class<?>[]
                                         {CombineFileSplit.class,
                                          Configuration.class,
                                          Reporter.class,
                                          Integer.class};

  protected CombineFileSplit split;
  protected JobConf jc;
  protected Reporter reporter;
  protected Class<RecordReader<K, V>> rrClass;
  protected Constructor<RecordReader<K, V>> rrConstructor;
  protected FileSystem fs;

  /** Index of the next chunk to open (0-based). */
  protected int idx;
  /** Bytes accounted for by fully-consumed chunks so far. */
  protected long progress;
  /** Reader for the current chunk; null before the first/after the last chunk. */
  protected RecordReader<K, V> curReader;

  /**
   * Read the next key/value pair, transparently advancing to the reader for
   * the next chunk whenever the current one is exhausted.
   *
   * @return true if a record was read, false once every chunk is consumed
   */
  @Override
  public boolean next(K key, V value) throws IOException {

    while ((curReader == null) || !curReader.next(key, value)) {
      if (!initNextRecordReader()) {
        return false;
      }
    }
    return true;
  }

  // NOTE(review): createKey/createValue throw NullPointerException if the
  // split contained zero chunks (curReader is never initialized) — confirm
  // callers never construct this reader over an empty CombineFileSplit.
  @Override
  public K createKey() {
    return curReader.createKey();
  }

  @Override
  public V createValue() {
    return curReader.createValue();
  }

  /**
   * Return the amount of data processed so far, in bytes.
   */
  @Override
  public long getPos() throws IOException {
    return progress;
  }

  @Override
  public void close() throws IOException {
    if (curReader != null) {
      curReader.close();
      curReader = null;
    }
  }

  /**
   * Return progress based on the amount of data processed so far,
   * as a fraction in [0, 1].
   */
  @Override
  public float getProgress() throws IOException {
    long totalLength = split.getLength();
    // Guard against a zero-length split: 0f/0f would be NaN, and
    // Math.min(1.0f, NaN) propagates the NaN to the framework.
    if (totalLength == 0) {
      return 1.0f;
    }
    return Math.min(1.0f, progress / (float) totalLength);
  }

  /**
   * A generic RecordReader that can hand out different recordReaders
   * for each chunk in the CombineFileSplit.
   *
   * @param job      job configuration; also receives per-chunk input metadata
   * @param split    the combined split whose chunks will be read in order
   * @param reporter progress reporter, also handed to each chunk reader
   * @param rrClass  chunk reader class; must declare a constructor matching
   *                 {@link #constructorSignature}
   * @throws RuntimeException if rrClass lacks the required constructor
   */
  public CombineFileRecordReader(JobConf job, CombineFileSplit split,
                                 Reporter reporter,
                                 Class<RecordReader<K, V>> rrClass)
    throws IOException {
    this.split = split;
    this.jc = job;
    this.rrClass = rrClass;
    this.reporter = reporter;
    this.idx = 0;
    this.curReader = null;
    this.progress = 0;

    try {
      rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
      rrConstructor.setAccessible(true);
    } catch (Exception e) {
      throw new RuntimeException(rrClass.getName() +
                                 " does not have valid constructor", e);
    }
    initNextRecordReader();
  }

  /**
   * Get the record reader for the next chunk in this CombineFileSplit.
   *
   * @return true if a reader for the next chunk was created,
   *         false when all chunks have been processed
   */
  protected boolean initNextRecordReader() throws IOException {

    if (curReader != null) {
      curReader.close();
      curReader = null;
      if (idx > 0) {
        progress += split.getLength(idx-1);    // done processing so far
      }
    }

    // if all chunks have been processed, nothing more to do.
    if (idx == split.getNumPaths()) {
      return false;
    }

    reporter.progress();

    // get a record reader for the idx-th chunk
    try {
      curReader =  rrConstructor.newInstance(new Object []
                            {split, jc, reporter, Integer.valueOf(idx)});

      // setup some helper config variables.
      jc.set(JobContext.MAP_INPUT_FILE, split.getPath(idx).toString());
      jc.setLong(JobContext.MAP_INPUT_START, split.getOffset(idx));
      // NOTE(review): MAP_INPUT_PATH is assigned the chunk *length* here —
      // verify against MRJobConfig that this key is intended to carry the
      // input length rather than a path.
      jc.setLong(JobContext.MAP_INPUT_PATH, split.getLength(idx));
    } catch (Exception e) {
      throw new RuntimeException (e);
    }
    idx++;
    return true;
  }
}