/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred.lib;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * This class supports MapReduce jobs that have multiple input paths with
 * a different {@link InputFormat} and {@link Mapper} for each path.
 *
 * <p>Each registration is stored in the job configuration as a
 * <code>path;className</code> entry, and entries are joined with commas.
 * Because the same characters are used to split the value when it is read
 * back, a path whose string form contains <code>,</code> or <code>;</code>
 * will not be parsed back correctly.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultipleInputs {

  /** Configuration key holding the path-to-input-format mappings. */
  private static final String DIR_FORMATS =
      "mapreduce.input.multipleinputs.dir.formats";

  /** Configuration key holding the path-to-mapper mappings. */
  private static final String DIR_MAPPERS =
      "mapreduce.input.multipleinputs.dir.mappers";

  /** Separator between individual mapping entries. */
  private static final String ENTRY_SEPARATOR = ",";

  /** Separator between the path and the class name inside one entry. */
  private static final String PATH_CLASS_SEPARATOR = ";";

  /**
   * Add a {@link Path} with a custom {@link InputFormat} to the list of
   * inputs for the map-reduce job.
   *
   * <p>Also sets the job's input format to {@link DelegatingInputFormat}.
   *
   * @param conf The configuration of the job
   * @param path {@link Path} to be added to the list of inputs for the job
   * @param inputFormatClass {@link InputFormat} class to use for this path
   */
  public static void addInputPath(JobConf conf, Path path,
      Class<? extends InputFormat> inputFormatClass) {

    appendMapping(conf, DIR_FORMATS, path, inputFormatClass);

    conf.setInputFormat(DelegatingInputFormat.class);
  }

  /**
   * Add a {@link Path} with a custom {@link InputFormat} and
   * {@link Mapper} to the list of inputs for the map-reduce job.
   *
   * <p>Also sets the job's input format to {@link DelegatingInputFormat}
   * and its mapper class to {@link DelegatingMapper}.
   *
   * @param conf The configuration of the job
   * @param path {@link Path} to be added to the list of inputs for the job
   * @param inputFormatClass {@link InputFormat} class to use for this path
   * @param mapperClass {@link Mapper} class to use for this path
   */
  public static void addInputPath(JobConf conf, Path path,
      Class<? extends InputFormat> inputFormatClass,
      Class<? extends Mapper> mapperClass) {

    // Register the input format first; the mapper mapping is recorded after.
    addInputPath(conf, path, inputFormatClass);

    appendMapping(conf, DIR_MAPPERS, path, mapperClass);

    conf.setMapperClass(DelegatingMapper.class);
  }

  /**
   * Retrieves a map of {@link Path}s to the {@link InputFormat} class
   * that should be used for them.
   *
   * <p>A new {@link InputFormat} instance is created for every mapping
   * entry. If no input path has been registered, an empty map is returned.
   *
   * @param conf The configuration of the job
   * @see #addInputPath(JobConf, Path, Class)
   * @return A map of paths to inputformats for the job
   * @throws RuntimeException if a registered input format class cannot be
   *         found
   */
  static Map<Path, InputFormat> getInputFormatMap(JobConf conf) {
    String formats = conf.get(DIR_FORMATS);
    if (formats == null) {
      // Nothing registered: mirror getMapperTypeMap instead of failing with
      // a NullPointerException on split().
      return Collections.emptyMap();
    }
    Map<Path, InputFormat> m = new HashMap<Path, InputFormat>();
    for (String pathMapping : formats.split(ENTRY_SEPARATOR)) {
      // split[0] is the path, split[1] the input format class name.
      String[] split = pathMapping.split(PATH_CLASS_SEPARATOR);
      InputFormat inputFormat;
      try {
        inputFormat = (InputFormat) ReflectionUtils.newInstance(
            conf.getClassByName(split[1]), conf);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException(e);
      }
      m.put(new Path(split[0]), inputFormat);
    }
    return m;
  }

  /**
   * Retrieves a map of {@link Path}s to the {@link Mapper} class that
   * should be used for them.
   *
   * <p>If no mapper has been registered, an empty map is returned.
   *
   * @param conf The configuration of the job
   * @see #addInputPath(JobConf, Path, Class, Class)
   * @return A map of paths to mappers for the job
   * @throws RuntimeException if a registered mapper class cannot be found
   */
  @SuppressWarnings("unchecked")
  static Map<Path, Class<? extends Mapper>> getMapperTypeMap(JobConf conf) {
    String mappers = conf.get(DIR_MAPPERS);
    if (mappers == null) {
      return Collections.emptyMap();
    }
    Map<Path, Class<? extends Mapper>> m =
        new HashMap<Path, Class<? extends Mapper>>();
    for (String pathMapping : mappers.split(ENTRY_SEPARATOR)) {
      // split[0] is the path, split[1] the mapper class name.
      String[] split = pathMapping.split(PATH_CLASS_SEPARATOR);
      Class<? extends Mapper> mapClass;
      try {
        mapClass = (Class<? extends Mapper>) conf.getClassByName(split[1]);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException(e);
      }
      m.put(new Path(split[0]), mapClass);
    }
    return m;
  }

  /**
   * Appends a <code>path;className</code> entry to the comma-separated list
   * stored under <code>key</code>, creating the list if the key is unset.
   */
  private static void appendMapping(JobConf conf, String key, Path path,
      Class<?> clazz) {
    String mapping = path.toString() + PATH_CLASS_SEPARATOR + clazz.getName();
    String existing = conf.get(key);
    conf.set(key,
        existing == null ? mapping : existing + ENTRY_SEPARATOR + mapping);
  }
}