001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.mapreduce.lib.input;
019
020 import java.util.Collections;
021 import java.util.HashMap;
022 import java.util.Map;
023
024 import org.apache.hadoop.classification.InterfaceAudience;
025 import org.apache.hadoop.classification.InterfaceStability;
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.hadoop.fs.Path;
028 import org.apache.hadoop.mapreduce.InputFormat;
029 import org.apache.hadoop.mapreduce.Job;
030 import org.apache.hadoop.mapreduce.JobContext;
031 import org.apache.hadoop.mapreduce.Mapper;
032 import org.apache.hadoop.util.ReflectionUtils;
033
034 /**
035 * This class supports MapReduce jobs that have multiple input paths with
036 * a different {@link InputFormat} and {@link Mapper} for each path
037 */
038 @InterfaceAudience.Public
039 @InterfaceStability.Stable
040 public class MultipleInputs {
041 public static final String DIR_FORMATS =
042 "mapreduce.input.multipleinputs.dir.formats";
043 public static final String DIR_MAPPERS =
044 "mapreduce.input.multipleinputs.dir.mappers";
045
046 /**
047 * Add a {@link Path} with a custom {@link InputFormat} to the list of
048 * inputs for the map-reduce job.
049 *
050 * @param job The {@link Job}
051 * @param path {@link Path} to be added to the list of inputs for the job
052 * @param inputFormatClass {@link InputFormat} class to use for this path
053 */
054 @SuppressWarnings("unchecked")
055 public static void addInputPath(Job job, Path path,
056 Class<? extends InputFormat> inputFormatClass) {
057 String inputFormatMapping = path.toString() + ";"
058 + inputFormatClass.getName();
059 Configuration conf = job.getConfiguration();
060 String inputFormats = conf.get(DIR_FORMATS);
061 conf.set(DIR_FORMATS,
062 inputFormats == null ? inputFormatMapping : inputFormats + ","
063 + inputFormatMapping);
064
065 job.setInputFormatClass(DelegatingInputFormat.class);
066 }
067
068 /**
069 * Add a {@link Path} with a custom {@link InputFormat} and
070 * {@link Mapper} to the list of inputs for the map-reduce job.
071 *
072 * @param job The {@link Job}
073 * @param path {@link Path} to be added to the list of inputs for the job
074 * @param inputFormatClass {@link InputFormat} class to use for this path
075 * @param mapperClass {@link Mapper} class to use for this path
076 */
077 @SuppressWarnings("unchecked")
078 public static void addInputPath(Job job, Path path,
079 Class<? extends InputFormat> inputFormatClass,
080 Class<? extends Mapper> mapperClass) {
081
082 addInputPath(job, path, inputFormatClass);
083 Configuration conf = job.getConfiguration();
084 String mapperMapping = path.toString() + ";" + mapperClass.getName();
085 String mappers = conf.get(DIR_MAPPERS);
086 conf.set(DIR_MAPPERS, mappers == null ? mapperMapping
087 : mappers + "," + mapperMapping);
088
089 job.setMapperClass(DelegatingMapper.class);
090 }
091
092 /**
093 * Retrieves a map of {@link Path}s to the {@link InputFormat} class
094 * that should be used for them.
095 *
096 * @param job The {@link JobContext}
097 * @see #addInputPath(JobConf, Path, Class)
098 * @return A map of paths to inputformats for the job
099 */
100 @SuppressWarnings("unchecked")
101 static Map<Path, InputFormat> getInputFormatMap(JobContext job) {
102 Map<Path, InputFormat> m = new HashMap<Path, InputFormat>();
103 Configuration conf = job.getConfiguration();
104 String[] pathMappings = conf.get(DIR_FORMATS).split(",");
105 for (String pathMapping : pathMappings) {
106 String[] split = pathMapping.split(";");
107 InputFormat inputFormat;
108 try {
109 inputFormat = (InputFormat) ReflectionUtils.newInstance(conf
110 .getClassByName(split[1]), conf);
111 } catch (ClassNotFoundException e) {
112 throw new RuntimeException(e);
113 }
114 m.put(new Path(split[0]), inputFormat);
115 }
116 return m;
117 }
118
119 /**
120 * Retrieves a map of {@link Path}s to the {@link Mapper} class that
121 * should be used for them.
122 *
123 * @param job The {@link JobContext}
124 * @see #addInputPath(JobConf, Path, Class, Class)
125 * @return A map of paths to mappers for the job
126 */
127 @SuppressWarnings("unchecked")
128 static Map<Path, Class<? extends Mapper>>
129 getMapperTypeMap(JobContext job) {
130 Configuration conf = job.getConfiguration();
131 if (conf.get(DIR_MAPPERS) == null) {
132 return Collections.emptyMap();
133 }
134 Map<Path, Class<? extends Mapper>> m =
135 new HashMap<Path, Class<? extends Mapper>>();
136 String[] pathMappings = conf.get(DIR_MAPPERS).split(",");
137 for (String pathMapping : pathMappings) {
138 String[] split = pathMapping.split(";");
139 Class<? extends Mapper> mapClass;
140 try {
141 mapClass =
142 (Class<? extends Mapper>) conf.getClassByName(split[1]);
143 } catch (ClassNotFoundException e) {
144 throw new RuntimeException(e);
145 }
146 m.put(new Path(split[0]), mapClass);
147 }
148 return m;
149 }
150 }