001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.mapred.lib;
019
020 import java.util.Collections;
021 import java.util.HashMap;
022 import java.util.Map;
023
024 import org.apache.hadoop.classification.InterfaceAudience;
025 import org.apache.hadoop.classification.InterfaceStability;
026 import org.apache.hadoop.fs.Path;
027 import org.apache.hadoop.mapred.InputFormat;
028 import org.apache.hadoop.mapred.JobConf;
029 import org.apache.hadoop.mapred.Mapper;
030 import org.apache.hadoop.util.ReflectionUtils;
031
032 /**
033 * This class supports MapReduce jobs that have multiple input paths with
034 * a different {@link InputFormat} and {@link Mapper} for each path
035 */
036 @InterfaceAudience.Public
037 @InterfaceStability.Stable
038 public class MultipleInputs {
039 /**
040 * Add a {@link Path} with a custom {@link InputFormat} to the list of
041 * inputs for the map-reduce job.
042 *
043 * @param conf The configuration of the job
044 * @param path {@link Path} to be added to the list of inputs for the job
045 * @param inputFormatClass {@link InputFormat} class to use for this path
046 */
047 public static void addInputPath(JobConf conf, Path path,
048 Class<? extends InputFormat> inputFormatClass) {
049
050 String inputFormatMapping = path.toString() + ";"
051 + inputFormatClass.getName();
052 String inputFormats = conf.get("mapreduce.input.multipleinputs.dir.formats");
053 conf.set("mapreduce.input.multipleinputs.dir.formats",
054 inputFormats == null ? inputFormatMapping : inputFormats + ","
055 + inputFormatMapping);
056
057 conf.setInputFormat(DelegatingInputFormat.class);
058 }
059
060 /**
061 * Add a {@link Path} with a custom {@link InputFormat} and
062 * {@link Mapper} to the list of inputs for the map-reduce job.
063 *
064 * @param conf The configuration of the job
065 * @param path {@link Path} to be added to the list of inputs for the job
066 * @param inputFormatClass {@link InputFormat} class to use for this path
067 * @param mapperClass {@link Mapper} class to use for this path
068 */
069 public static void addInputPath(JobConf conf, Path path,
070 Class<? extends InputFormat> inputFormatClass,
071 Class<? extends Mapper> mapperClass) {
072
073 addInputPath(conf, path, inputFormatClass);
074
075 String mapperMapping = path.toString() + ";" + mapperClass.getName();
076 String mappers = conf.get("mapreduce.input.multipleinputs.dir.mappers");
077 conf.set("mapreduce.input.multipleinputs.dir.mappers", mappers == null ? mapperMapping
078 : mappers + "," + mapperMapping);
079
080 conf.setMapperClass(DelegatingMapper.class);
081 }
082
083 /**
084 * Retrieves a map of {@link Path}s to the {@link InputFormat} class
085 * that should be used for them.
086 *
087 * @param conf The confuration of the job
088 * @see #addInputPath(JobConf, Path, Class)
089 * @return A map of paths to inputformats for the job
090 */
091 static Map<Path, InputFormat> getInputFormatMap(JobConf conf) {
092 Map<Path, InputFormat> m = new HashMap<Path, InputFormat>();
093 String[] pathMappings = conf.get("mapreduce.input.multipleinputs.dir.formats").split(",");
094 for (String pathMapping : pathMappings) {
095 String[] split = pathMapping.split(";");
096 InputFormat inputFormat;
097 try {
098 inputFormat = (InputFormat) ReflectionUtils.newInstance(conf
099 .getClassByName(split[1]), conf);
100 } catch (ClassNotFoundException e) {
101 throw new RuntimeException(e);
102 }
103 m.put(new Path(split[0]), inputFormat);
104 }
105 return m;
106 }
107
108 /**
109 * Retrieves a map of {@link Path}s to the {@link Mapper} class that
110 * should be used for them.
111 *
112 * @param conf The confuration of the job
113 * @see #addInputPath(JobConf, Path, Class, Class)
114 * @return A map of paths to mappers for the job
115 */
116 @SuppressWarnings("unchecked")
117 static Map<Path, Class<? extends Mapper>> getMapperTypeMap(JobConf conf) {
118 if (conf.get("mapreduce.input.multipleinputs.dir.mappers") == null) {
119 return Collections.emptyMap();
120 }
121 Map<Path, Class<? extends Mapper>> m = new HashMap<Path, Class<? extends Mapper>>();
122 String[] pathMappings = conf.get("mapreduce.input.multipleinputs.dir.mappers").split(",");
123 for (String pathMapping : pathMappings) {
124 String[] split = pathMapping.split(";");
125 Class<? extends Mapper> mapClass;
126 try {
127 mapClass = (Class<? extends Mapper>) conf.getClassByName(split[1]);
128 } catch (ClassNotFoundException e) {
129 throw new RuntimeException(e);
130 }
131 m.put(new Path(split[0]), mapClass);
132 }
133 return m;
134 }
135 }