/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.input;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * This class supports MapReduce jobs that have multiple input paths with
 * a different {@link InputFormat} and {@link Mapper} for each path.
 *
 * <p>The per-path mappings are kept in the job {@link Configuration} under
 * {@link #DIR_FORMATS} and {@link #DIR_MAPPERS}. Each property holds a
 * comma-separated list of <code>path;className</code> entries. Because the
 * comma is the entry separator, a path whose string form contains a comma
 * cannot be represented in this format.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultipleInputs {
  public static final String DIR_FORMATS =
    "mapreduce.input.multipleinputs.dir.formats";
  public static final String DIR_MAPPERS =
    "mapreduce.input.multipleinputs.dir.mappers";

  /**
   * Add a {@link Path} with a custom {@link InputFormat} to the list of
   * inputs for the map-reduce job.
   *
   * <p>The mapping is appended to {@link #DIR_FORMATS} and the job's input
   * format is set to {@link DelegatingInputFormat}.
   *
   * @param job The {@link Job}
   * @param path {@link Path} to be added to the list of inputs for the job
   * @param inputFormatClass {@link InputFormat} class to use for this path
   */
  @SuppressWarnings("unchecked")
  public static void addInputPath(Job job, Path path,
      Class<? extends InputFormat> inputFormatClass) {
    String inputFormatMapping = path.toString() + ";"
       + inputFormatClass.getName();
    appendMapping(job.getConfiguration(), DIR_FORMATS, inputFormatMapping);

    job.setInputFormatClass(DelegatingInputFormat.class);
  }

  /**
   * Add a {@link Path} with a custom {@link InputFormat} and
   * {@link Mapper} to the list of inputs for the map-reduce job.
   *
   * <p>In addition to what {@link #addInputPath(Job, Path, Class)} does,
   * the mapper mapping is appended to {@link #DIR_MAPPERS} and the job's
   * mapper is set to {@link DelegatingMapper}.
   *
   * @param job The {@link Job}
   * @param path {@link Path} to be added to the list of inputs for the job
   * @param inputFormatClass {@link InputFormat} class to use for this path
   * @param mapperClass {@link Mapper} class to use for this path
   */
  @SuppressWarnings("unchecked")
  public static void addInputPath(Job job, Path path,
      Class<? extends InputFormat> inputFormatClass,
      Class<? extends Mapper> mapperClass) {

    addInputPath(job, path, inputFormatClass);
    String mapperMapping = path.toString() + ";" + mapperClass.getName();
    appendMapping(job.getConfiguration(), DIR_MAPPERS, mapperMapping);

    job.setMapperClass(DelegatingMapper.class);
  }

  /**
   * Retrieves a map of {@link Path}s to the {@link InputFormat} class
   * that should be used for them.
   *
   * @param job The {@link JobContext}
   * @see #addInputPath(Job, Path, Class)
   * @return A map of paths to inputformats for the job; an empty map when
   *         {@link #DIR_FORMATS} is not set in the job configuration
   * @throws IllegalArgumentException if an entry of {@link #DIR_FORMATS}
   *         is not of the form <code>path;className</code>
   * @throws RuntimeException wrapping the {@link ClassNotFoundException}
   *         if a configured input format class cannot be loaded
   */
  @SuppressWarnings("unchecked")
  static Map<Path, InputFormat> getInputFormatMap(JobContext job) {
    Configuration conf = job.getConfiguration();
    String inputFormats = conf.get(DIR_FORMATS);
    if (inputFormats == null) {
      // Same contract as getMapperTypeMap: nothing configured, nothing mapped.
      return Collections.emptyMap();
    }
    Map<Path, InputFormat> m = new HashMap<Path, InputFormat>();
    for (String pathMapping : inputFormats.split(",")) {
      String[] split = splitMapping(pathMapping, DIR_FORMATS);
      InputFormat inputFormat;
      try {
        inputFormat = (InputFormat) ReflectionUtils.newInstance(conf
            .getClassByName(split[1]), conf);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException(e);
      }
      m.put(new Path(split[0]), inputFormat);
    }
    return m;
  }

  /**
   * Retrieves a map of {@link Path}s to the {@link Mapper} class that
   * should be used for them.
   *
   * @param job The {@link JobContext}
   * @see #addInputPath(Job, Path, Class, Class)
   * @return A map of paths to mappers for the job; an empty map when
   *         {@link #DIR_MAPPERS} is not set in the job configuration
   * @throws IllegalArgumentException if an entry of {@link #DIR_MAPPERS}
   *         is not of the form <code>path;className</code>
   * @throws RuntimeException wrapping the {@link ClassNotFoundException}
   *         if a configured mapper class cannot be loaded
   */
  @SuppressWarnings("unchecked")
  static Map<Path, Class<? extends Mapper>>
      getMapperTypeMap(JobContext job) {
    Configuration conf = job.getConfiguration();
    String mappers = conf.get(DIR_MAPPERS);
    if (mappers == null) {
      return Collections.emptyMap();
    }
    Map<Path, Class<? extends Mapper>> m =
      new HashMap<Path, Class<? extends Mapper>>();
    for (String pathMapping : mappers.split(",")) {
      String[] split = splitMapping(pathMapping, DIR_MAPPERS);
      Class<? extends Mapper> mapClass;
      try {
        mapClass =
          (Class<? extends Mapper>) conf.getClassByName(split[1]);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException(e);
      }
      m.put(new Path(split[0]), mapClass);
    }
    return m;
  }

  /**
   * Appends one <code>path;className</code> entry to the comma-separated
   * list stored under <code>key</code>, creating the list if the property
   * is not set yet.
   */
  private static void appendMapping(Configuration conf, String key,
      String mapping) {
    String existing = conf.get(key);
    conf.set(key, existing == null ? mapping : existing + "," + mapping);
  }

  /**
   * Splits one <code>path;className</code> entry into its two parts.
   *
   * <p>The entry is cut at the <em>last</em> semicolon: entries are written
   * as <code>path + ";" + class.getName()</code>, so everything after the
   * last semicolon is the class name even when the path itself contains
   * a semicolon.
   *
   * @param mapping a single entry taken from the configuration value
   * @param key the configuration key the entry came from, used only in the
   *        error message
   * @return a two-element array: the path string and the class name
   * @throws IllegalArgumentException if the entry has no semicolon, or the
   *         path or class name part is empty
   */
  private static String[] splitMapping(String mapping, String key) {
    int separator = mapping.lastIndexOf(';');
    if (separator <= 0 || separator == mapping.length() - 1) {
      throw new IllegalArgumentException("Malformed entry '" + mapping
          + "' in " + key + ": expected <path>;<class name>");
    }
    return new String[] {
      mapping.substring(0, separator), mapping.substring(separator + 1)
    };
  }
}