001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapred.lib; 020 021import java.io.*; 022import java.lang.reflect.*; 023 024import org.apache.hadoop.mapred.*; 025import org.apache.hadoop.classification.InterfaceAudience; 026import org.apache.hadoop.classification.InterfaceStability; 027import org.apache.hadoop.conf.Configuration; 028 029/** 030 * A generic RecordReader that can hand out different recordReaders 031 * for each chunk in a {@link CombineFileSplit}. 032 * A CombineFileSplit can combine data chunks from multiple files. 033 * This class allows using different RecordReaders for processing 034 * these data chunks from different files. 035 * @see CombineFileSplit 036 */ 037@InterfaceAudience.Public 038@InterfaceStability.Stable 039public class CombineFileRecordReader<K, V> implements RecordReader<K, V> { 040 041 static final Class [] constructorSignature = new Class [] 042 {CombineFileSplit.class, 043 Configuration.class, 044 Reporter.class, 045 Integer.class}; 046 047 protected CombineFileSplit split; 048 protected JobConf jc; 049 protected Reporter reporter; 050 protected Constructor<RecordReader<K, V>> rrConstructor; 051 052 protected int idx; 053 protected long progress; 054 protected RecordReader<K, V> curReader; 055 056 public boolean next(K key, V value) throws IOException { 057 058 while ((curReader == null) || !curReader.next(key, value)) { 059 if (!initNextRecordReader()) { 060 return false; 061 } 062 } 063 return true; 064 } 065 066 public K createKey() { 067 return curReader.createKey(); 068 } 069 070 public V createValue() { 071 return curReader.createValue(); 072 } 073 074 /** 075 * return the amount of data processed 076 */ 077 public long getPos() throws IOException { 078 return progress; 079 } 080 081 public void close() throws IOException { 082 if (curReader != null) { 083 curReader.close(); 084 curReader = null; 085 } 086 } 087 088 /** 089 * return progress based on the amount of data processed so far. 090 */ 091 public float getProgress() throws IOException { 092 return Math.min(1.0f, progress/(float)(split.getLength())); 093 } 094 095 /** 096 * A generic RecordReader that can hand out different recordReaders 097 * for each chunk in the CombineFileSplit. 098 */ 099 public CombineFileRecordReader(JobConf job, CombineFileSplit split, 100 Reporter reporter, 101 Class<RecordReader<K, V>> rrClass) 102 throws IOException { 103 this.split = split; 104 this.jc = job; 105 this.reporter = reporter; 106 this.idx = 0; 107 this.curReader = null; 108 this.progress = 0; 109 110 try { 111 rrConstructor = rrClass.getDeclaredConstructor(constructorSignature); 112 rrConstructor.setAccessible(true); 113 } catch (Exception e) { 114 throw new RuntimeException(rrClass.getName() + 115 " does not have valid constructor", e); 116 } 117 initNextRecordReader(); 118 } 119 120 /** 121 * Get the record reader for the next chunk in this CombineFileSplit. 122 */ 123 protected boolean initNextRecordReader() throws IOException { 124 125 if (curReader != null) { 126 curReader.close(); 127 curReader = null; 128 if (idx > 0) { 129 progress += split.getLength(idx-1); // done processing so far 130 } 131 } 132 133 // if all chunks have been processed, nothing more to do. 134 if (idx == split.getNumPaths()) { 135 return false; 136 } 137 138 reporter.progress(); 139 140 // get a record reader for the idx-th chunk 141 try { 142 curReader = rrConstructor.newInstance(new Object [] 143 {split, jc, reporter, Integer.valueOf(idx)}); 144 145 // setup some helper config variables. 146 jc.set(JobContext.MAP_INPUT_FILE, split.getPath(idx).toString()); 147 jc.setLong(JobContext.MAP_INPUT_START, split.getOffset(idx)); 148 jc.setLong(JobContext.MAP_INPUT_PATH, split.getLength(idx)); 149 } catch (Exception e) { 150 throw new RuntimeException (e); 151 } 152 idx++; 153 return true; 154 } 155}