001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred;
020
021 import org.apache.hadoop.classification.InterfaceAudience;
022 import org.apache.hadoop.classification.InterfaceStability;
023 import org.apache.hadoop.conf.Configuration;
024 import org.apache.hadoop.fs.Path;
025
026 /**
027 * Utility class for skip bad records functionality. It contains various
028 * settings related to skipping of bad records.
029 *
030 * <p>Hadoop provides an optional mode of execution in which the bad records
031 * are detected and skipped in further attempts.
032 *
033 * <p>This feature can be used when map/reduce tasks crashes deterministically on
034 * certain input. This happens due to bugs in the map/reduce function. The usual
035 * course would be to fix these bugs. But sometimes this is not possible;
036 * perhaps the bug is in third party libraries for which the source code is
037 * not available. Due to this, the task never reaches to completion even with
038 * multiple attempts and complete data for that task is lost.</p>
039 *
040 * <p>With this feature, only a small portion of data is lost surrounding
041 * the bad record, which may be acceptable for some user applications.
042 * see {@link SkipBadRecords#setMapperMaxSkipRecords(Configuration, long)}</p>
043 *
044 * <p>The skipping mode gets kicked off after certain no of failures
045 * see {@link SkipBadRecords#setAttemptsToStartSkipping(Configuration, int)}</p>
046 *
047 * <p>In the skipping mode, the map/reduce task maintains the record range which
048 * is getting processed at all times. Before giving the input to the
049 * map/reduce function, it sends this record range to the Task tracker.
050 * If task crashes, the Task tracker knows which one was the last reported
051 * range. On further attempts that range get skipped.</p>
052 */
053 @InterfaceAudience.Public
054 @InterfaceStability.Stable
055 public class SkipBadRecords {
056
057 /**
058 * Special counters which are written by the application and are
059 * used by the framework for detecting bad records. For detecting bad records
060 * these counters must be incremented by the application.
061 */
062 public static final String COUNTER_GROUP = "SkippingTaskCounters";
063
064 /**
065 * Number of processed map records.
066 * @see SkipBadRecords#getAutoIncrMapperProcCount(Configuration)
067 */
068 public static final String COUNTER_MAP_PROCESSED_RECORDS =
069 "MapProcessedRecords";
070
071 /**
072 * Number of processed reduce groups.
073 * @see SkipBadRecords#getAutoIncrReducerProcCount(Configuration)
074 */
075 public static final String COUNTER_REDUCE_PROCESSED_GROUPS =
076 "ReduceProcessedGroups";
077
078 private static final String ATTEMPTS_TO_START_SKIPPING =
079 JobContext.SKIP_START_ATTEMPTS;
080 private static final String AUTO_INCR_MAP_PROC_COUNT =
081 JobContext.MAP_SKIP_INCR_PROC_COUNT;
082 private static final String AUTO_INCR_REDUCE_PROC_COUNT =
083 JobContext.REDUCE_SKIP_INCR_PROC_COUNT;
084 private static final String OUT_PATH = JobContext.SKIP_OUTDIR;
085 private static final String MAPPER_MAX_SKIP_RECORDS =
086 JobContext.MAP_SKIP_MAX_RECORDS;
087 private static final String REDUCER_MAX_SKIP_GROUPS =
088 JobContext.REDUCE_SKIP_MAXGROUPS;
089
090 /**
091 * Get the number of Task attempts AFTER which skip mode
092 * will be kicked off. When skip mode is kicked off, the
093 * tasks reports the range of records which it will process
094 * next to the TaskTracker. So that on failures, TT knows which
095 * ones are possibly the bad records. On further executions,
096 * those are skipped.
097 * Default value is 2.
098 *
099 * @param conf the configuration
100 * @return attemptsToStartSkipping no of task attempts
101 */
102 public static int getAttemptsToStartSkipping(Configuration conf) {
103 return conf.getInt(ATTEMPTS_TO_START_SKIPPING, 2);
104 }
105
106 /**
107 * Set the number of Task attempts AFTER which skip mode
108 * will be kicked off. When skip mode is kicked off, the
109 * tasks reports the range of records which it will process
110 * next to the TaskTracker. So that on failures, TT knows which
111 * ones are possibly the bad records. On further executions,
112 * those are skipped.
113 * Default value is 2.
114 *
115 * @param conf the configuration
116 * @param attemptsToStartSkipping no of task attempts
117 */
118 public static void setAttemptsToStartSkipping(Configuration conf,
119 int attemptsToStartSkipping) {
120 conf.setInt(ATTEMPTS_TO_START_SKIPPING, attemptsToStartSkipping);
121 }
122
123 /**
124 * Get the flag which if set to true,
125 * {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS} is incremented
126 * by MapRunner after invoking the map function. This value must be set to
127 * false for applications which process the records asynchronously
128 * or buffer the input records. For example streaming.
129 * In such cases applications should increment this counter on their own.
130 * Default value is true.
131 *
132 * @param conf the configuration
133 * @return <code>true</code> if auto increment
134 * {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS}.
135 * <code>false</code> otherwise.
136 */
137 public static boolean getAutoIncrMapperProcCount(Configuration conf) {
138 return conf.getBoolean(AUTO_INCR_MAP_PROC_COUNT, true);
139 }
140
141 /**
142 * Set the flag which if set to true,
143 * {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS} is incremented
144 * by MapRunner after invoking the map function. This value must be set to
145 * false for applications which process the records asynchronously
146 * or buffer the input records. For example streaming.
147 * In such cases applications should increment this counter on their own.
148 * Default value is true.
149 *
150 * @param conf the configuration
151 * @param autoIncr whether to auto increment
152 * {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS}.
153 */
154 public static void setAutoIncrMapperProcCount(Configuration conf,
155 boolean autoIncr) {
156 conf.setBoolean(AUTO_INCR_MAP_PROC_COUNT, autoIncr);
157 }
158
159 /**
160 * Get the flag which if set to true,
161 * {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS} is incremented
162 * by framework after invoking the reduce function. This value must be set to
163 * false for applications which process the records asynchronously
164 * or buffer the input records. For example streaming.
165 * In such cases applications should increment this counter on their own.
166 * Default value is true.
167 *
168 * @param conf the configuration
169 * @return <code>true</code> if auto increment
170 * {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS}.
171 * <code>false</code> otherwise.
172 */
173 public static boolean getAutoIncrReducerProcCount(Configuration conf) {
174 return conf.getBoolean(AUTO_INCR_REDUCE_PROC_COUNT, true);
175 }
176
177 /**
178 * Set the flag which if set to true,
179 * {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS} is incremented
180 * by framework after invoking the reduce function. This value must be set to
181 * false for applications which process the records asynchronously
182 * or buffer the input records. For example streaming.
183 * In such cases applications should increment this counter on their own.
184 * Default value is true.
185 *
186 * @param conf the configuration
187 * @param autoIncr whether to auto increment
188 * {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS}.
189 */
190 public static void setAutoIncrReducerProcCount(Configuration conf,
191 boolean autoIncr) {
192 conf.setBoolean(AUTO_INCR_REDUCE_PROC_COUNT, autoIncr);
193 }
194
195 /**
196 * Get the directory to which skipped records are written. By default it is
197 * the sub directory of the output _logs directory.
198 * User can stop writing skipped records by setting the value null.
199 *
200 * @param conf the configuration.
201 * @return path skip output directory. Null is returned if this is not set
202 * and output directory is also not set.
203 */
204 public static Path getSkipOutputPath(Configuration conf) {
205 String name = conf.get(OUT_PATH);
206 if(name!=null) {
207 if("none".equals(name)) {
208 return null;
209 }
210 return new Path(name);
211 }
212 Path outPath = FileOutputFormat.getOutputPath(new JobConf(conf));
213 return outPath==null ? null : new Path(outPath,
214 "_logs"+Path.SEPARATOR+"skip");
215 }
216
217 /**
218 * Set the directory to which skipped records are written. By default it is
219 * the sub directory of the output _logs directory.
220 * User can stop writing skipped records by setting the value null.
221 *
222 * @param conf the configuration.
223 * @param path skip output directory path
224 */
225 public static void setSkipOutputPath(JobConf conf, Path path) {
226 String pathStr = null;
227 if(path==null) {
228 pathStr = "none";
229 } else {
230 pathStr = path.toString();
231 }
232 conf.set(OUT_PATH, pathStr);
233 }
234
235 /**
236 * Get the number of acceptable skip records surrounding the bad record PER
237 * bad record in mapper. The number includes the bad record as well.
238 * To turn the feature of detection/skipping of bad records off, set the
239 * value to 0.
240 * The framework tries to narrow down the skipped range by retrying
241 * until this threshold is met OR all attempts get exhausted for this task.
242 * Set the value to Long.MAX_VALUE to indicate that framework need not try to
243 * narrow down. Whatever records(depends on application) get skipped are
244 * acceptable.
245 * Default value is 0.
246 *
247 * @param conf the configuration
248 * @return maxSkipRecs acceptable skip records.
249 */
250 public static long getMapperMaxSkipRecords(Configuration conf) {
251 return conf.getLong(MAPPER_MAX_SKIP_RECORDS, 0);
252 }
253
254 /**
255 * Set the number of acceptable skip records surrounding the bad record PER
256 * bad record in mapper. The number includes the bad record as well.
257 * To turn the feature of detection/skipping of bad records off, set the
258 * value to 0.
259 * The framework tries to narrow down the skipped range by retrying
260 * until this threshold is met OR all attempts get exhausted for this task.
261 * Set the value to Long.MAX_VALUE to indicate that framework need not try to
262 * narrow down. Whatever records(depends on application) get skipped are
263 * acceptable.
264 * Default value is 0.
265 *
266 * @param conf the configuration
267 * @param maxSkipRecs acceptable skip records.
268 */
269 public static void setMapperMaxSkipRecords(Configuration conf,
270 long maxSkipRecs) {
271 conf.setLong(MAPPER_MAX_SKIP_RECORDS, maxSkipRecs);
272 }
273
274 /**
275 * Get the number of acceptable skip groups surrounding the bad group PER
276 * bad group in reducer. The number includes the bad group as well.
277 * To turn the feature of detection/skipping of bad groups off, set the
278 * value to 0.
279 * The framework tries to narrow down the skipped range by retrying
280 * until this threshold is met OR all attempts get exhausted for this task.
281 * Set the value to Long.MAX_VALUE to indicate that framework need not try to
282 * narrow down. Whatever groups(depends on application) get skipped are
283 * acceptable.
284 * Default value is 0.
285 *
286 * @param conf the configuration
287 * @return maxSkipGrps acceptable skip groups.
288 */
289 public static long getReducerMaxSkipGroups(Configuration conf) {
290 return conf.getLong(REDUCER_MAX_SKIP_GROUPS, 0);
291 }
292
293 /**
294 * Set the number of acceptable skip groups surrounding the bad group PER
295 * bad group in reducer. The number includes the bad group as well.
296 * To turn the feature of detection/skipping of bad groups off, set the
297 * value to 0.
298 * The framework tries to narrow down the skipped range by retrying
299 * until this threshold is met OR all attempts get exhausted for this task.
300 * Set the value to Long.MAX_VALUE to indicate that framework need not try to
301 * narrow down. Whatever groups(depends on application) get skipped are
302 * acceptable.
303 * Default value is 0.
304 *
305 * @param conf the configuration
306 * @param maxSkipGrps acceptable skip groups.
307 */
308 public static void setReducerMaxSkipGroups(Configuration conf,
309 long maxSkipGrps) {
310 conf.setLong(REDUCER_MAX_SKIP_GROUPS, maxSkipGrps);
311 }
312 }