001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.mapreduce.lib.chain;
019
020 import java.io.IOException;
021
022 import org.apache.hadoop.classification.InterfaceAudience;
023 import org.apache.hadoop.classification.InterfaceStability;
024 import org.apache.hadoop.conf.Configuration;
025 import org.apache.hadoop.mapreduce.Job;
026 import org.apache.hadoop.mapreduce.Mapper;
027 import org.apache.hadoop.mapreduce.lib.chain.Chain.ChainBlockingQueue;
028
029 /**
030 * The ChainMapper class allows to use multiple Mapper classes within a single
031 * Map task.
032 *
033 * <p>
034 * The Mapper classes are invoked in a chained (or piped) fashion, the output of
035 * the first becomes the input of the second, and so on until the last Mapper,
036 * the output of the last Mapper will be written to the task's output.
037 * </p>
038 * <p>
039 * The key functionality of this feature is that the Mappers in the chain do not
040 * need to be aware that they are executed in a chain. This enables having
041 * reusable specialized Mappers that can be combined to perform composite
042 * operations within a single task.
043 * </p>
044 * <p>
045 * Special care has to be taken when creating chains that the key/values output
046 * by a Mapper are valid for the following Mapper in the chain. It is assumed
047 * all Mappers and the Reduce in the chain use matching output and input key and
048 * value classes as no conversion is done by the chaining code.
049 * </p>
050 * <p>
051 * Using the ChainMapper and the ChainReducer classes is possible to compose
052 * Map/Reduce jobs that look like <code>[MAP+ / REDUCE MAP*]</code>. And
053 * immediate benefit of this pattern is a dramatic reduction in disk IO.
054 * </p>
055 * <p>
056 * IMPORTANT: There is no need to specify the output key/value classes for the
057 * ChainMapper, this is done by the addMapper for the last mapper in the chain.
058 * </p>
059 * ChainMapper usage pattern:
060 * <p/>
061 *
062 * <pre>
063 * ...
064 * Job = new Job(conf);
065 * <p/>
066 * Configuration mapAConf = new Configuration(false);
067 * ...
068 * ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
069 * Text.class, Text.class, true, mapAConf);
070 * <p/>
071 * Configuration mapBConf = new Configuration(false);
072 * ...
073 * ChainMapper.addMapper(job, BMap.class, Text.class, Text.class,
074 * LongWritable.class, Text.class, false, mapBConf);
075 * <p/>
076 * ...
077 * <p/>
078 * job.waitForComplettion(true);
079 * ...
080 * </pre>
081 */
082 @InterfaceAudience.Public
083 @InterfaceStability.Stable
084 public class ChainMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
085 Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
086
087 /**
088 * Adds a {@link Mapper} class to the chain mapper.
089 *
090 * <p>
091 * The key and values are passed from one element of the chain to the next, by
092 * value. For the added Mapper the configuration given for it,
093 * <code>mapperConf</code>, have precedence over the job's Configuration. This
094 * precedence is in effect when the task is running.
095 * </p>
096 * <p>
097 * IMPORTANT: There is no need to specify the output key/value classes for the
098 * ChainMapper, this is done by the addMapper for the last mapper in the chain
099 * </p>
100 *
101 * @param job
102 * The job.
103 * @param klass
104 * the Mapper class to add.
105 * @param inputKeyClass
106 * mapper input key class.
107 * @param inputValueClass
108 * mapper input value class.
109 * @param outputKeyClass
110 * mapper output key class.
111 * @param outputValueClass
112 * mapper output value class.
113 * @param mapperConf
114 * a configuration for the Mapper class. It is recommended to use a
115 * Configuration without default values using the
116 * <code>Configuration(boolean loadDefaults)</code> constructor with
117 * FALSE.
118 */
119 public static void addMapper(Job job, Class<? extends Mapper> klass,
120 Class<?> inputKeyClass, Class<?> inputValueClass,
121 Class<?> outputKeyClass, Class<?> outputValueClass,
122 Configuration mapperConf) throws IOException {
123 job.setMapperClass(ChainMapper.class);
124 job.setMapOutputKeyClass(outputKeyClass);
125 job.setMapOutputValueClass(outputValueClass);
126 Chain.addMapper(true, job, klass, inputKeyClass, inputValueClass,
127 outputKeyClass, outputValueClass, mapperConf);
128 }
129
130 private Chain chain;
131
132 protected void setup(Context context) {
133 chain = new Chain(true);
134 chain.setup(context.getConfiguration());
135 }
136
137 public void run(Context context) throws IOException, InterruptedException {
138
139 setup(context);
140
141 int numMappers = chain.getAllMappers().size();
142 if (numMappers == 0) {
143 return;
144 }
145
146 ChainBlockingQueue<Chain.KeyValuePair<?, ?>> inputqueue;
147 ChainBlockingQueue<Chain.KeyValuePair<?, ?>> outputqueue;
148 if (numMappers == 1) {
149 chain.runMapper(context, 0);
150 } else {
151 // add all the mappers with proper context
152 // add first mapper
153 outputqueue = chain.createBlockingQueue();
154 chain.addMapper(context, outputqueue, 0);
155 // add other mappers
156 for (int i = 1; i < numMappers - 1; i++) {
157 inputqueue = outputqueue;
158 outputqueue = chain.createBlockingQueue();
159 chain.addMapper(inputqueue, outputqueue, context, i);
160 }
161 // add last mapper
162 chain.addMapper(outputqueue, context, numMappers - 1);
163 }
164
165 // start all threads
166 chain.startAllThreads();
167
168 // wait for all threads
169 chain.joinAllThreads();
170 }
171 }