001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred;
020
021import org.apache.hadoop.classification.InterfaceAudience;
022import org.apache.hadoop.classification.InterfaceStability;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.fs.Path;
025
026/**
027 * Utility class for skip bad records functionality. It contains various 
028 * settings related to skipping of bad records.
029 * 
030 * <p>Hadoop provides an optional mode of execution in which the bad records
031 * are detected and skipped in further attempts.
032 * 
033 * <p>This feature can be used when map/reduce tasks crashes deterministically on 
034 * certain input. This happens due to bugs in the map/reduce function. The usual
035 * course would be to fix these bugs. But sometimes this is not possible; 
036 * perhaps the bug is in third party libraries for which the source code is 
037 * not available. Due to this, the task never reaches to completion even with 
038 * multiple attempts and complete data for that task is lost.</p>
039 *  
040 * <p>With this feature, only a small portion of data is lost surrounding 
041 * the bad record, which may be acceptable for some user applications.
042 * see {@link SkipBadRecords#setMapperMaxSkipRecords(Configuration, long)}</p>
043 * 
044 * <p>The skipping mode gets kicked off after certain no of failures 
045 * see {@link SkipBadRecords#setAttemptsToStartSkipping(Configuration, int)}</p>
046 *  
047 * <p>In the skipping mode, the map/reduce task maintains the record range which 
048 * is getting processed at all times. Before giving the input to the
049 * map/reduce function, it sends this record range to the Task tracker.
050 * If task crashes, the Task tracker knows which one was the last reported
051 * range. On further attempts that range get skipped.</p>
052 */
053@InterfaceAudience.Public
054@InterfaceStability.Stable
055public class SkipBadRecords {
056  
057  /**
058   * Special counters which are written by the application and are 
059   * used by the framework for detecting bad records. For detecting bad records 
060   * these counters must be incremented by the application.
061   */
062  public static final String COUNTER_GROUP = "SkippingTaskCounters";
063  
064  /**
065   * Number of processed map records.
066   * @see SkipBadRecords#getAutoIncrMapperProcCount(Configuration)
067   */
068  public static final String COUNTER_MAP_PROCESSED_RECORDS = 
069    "MapProcessedRecords";
070  
071  /**
072   * Number of processed reduce groups.
073   * @see SkipBadRecords#getAutoIncrReducerProcCount(Configuration)
074   */
075  public static final String COUNTER_REDUCE_PROCESSED_GROUPS = 
076    "ReduceProcessedGroups";
077  
078  private static final String ATTEMPTS_TO_START_SKIPPING = 
079    JobContext.SKIP_START_ATTEMPTS;
080  private static final String AUTO_INCR_MAP_PROC_COUNT = 
081    JobContext.MAP_SKIP_INCR_PROC_COUNT;
082  private static final String AUTO_INCR_REDUCE_PROC_COUNT = 
083    JobContext.REDUCE_SKIP_INCR_PROC_COUNT;
084  private static final String OUT_PATH = JobContext.SKIP_OUTDIR;
085  private static final String MAPPER_MAX_SKIP_RECORDS = 
086    JobContext.MAP_SKIP_MAX_RECORDS;
087  private static final String REDUCER_MAX_SKIP_GROUPS = 
088    JobContext.REDUCE_SKIP_MAXGROUPS;
089  
090  /**
091   * Get the number of Task attempts AFTER which skip mode 
092   * will be kicked off. When skip mode is kicked off, the 
093   * tasks reports the range of records which it will process 
094   * next to the TaskTracker. So that on failures, TT knows which 
095   * ones are possibly the bad records. On further executions, 
096   * those are skipped.
097   * Default value is 2.
098   * 
099   * @param conf the configuration
100   * @return attemptsToStartSkipping no of task attempts
101   */
102  public static int getAttemptsToStartSkipping(Configuration conf) {
103    return conf.getInt(ATTEMPTS_TO_START_SKIPPING, 2);
104  }
105
106  /**
107   * Set the number of Task attempts AFTER which skip mode 
108   * will be kicked off. When skip mode is kicked off, the 
109   * tasks reports the range of records which it will process 
110   * next to the TaskTracker. So that on failures, TT knows which 
111   * ones are possibly the bad records. On further executions, 
112   * those are skipped.
113   * Default value is 2.
114   * 
115   * @param conf the configuration
116   * @param attemptsToStartSkipping no of task attempts
117   */
118  public static void setAttemptsToStartSkipping(Configuration conf, 
119      int attemptsToStartSkipping) {
120    conf.setInt(ATTEMPTS_TO_START_SKIPPING, attemptsToStartSkipping);
121  }
122
123  /**
124   * Get the flag which if set to true, 
125   * {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS} is incremented 
126   * by MapRunner after invoking the map function. This value must be set to 
127   * false for applications which process the records asynchronously 
128   * or buffer the input records. For example streaming. 
129   * In such cases applications should increment this counter on their own.
130   * Default value is true.
131   * 
132   * @param conf the configuration
133   * @return <code>true</code> if auto increment 
134   *                       {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS}.
135   *         <code>false</code> otherwise.
136   */
137  public static boolean getAutoIncrMapperProcCount(Configuration conf) {
138    return conf.getBoolean(AUTO_INCR_MAP_PROC_COUNT, true);
139  }
140  
141  /**
142   * Set the flag which if set to true, 
143   * {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS} is incremented 
144   * by MapRunner after invoking the map function. This value must be set to 
145   * false for applications which process the records asynchronously 
146   * or buffer the input records. For example streaming. 
147   * In such cases applications should increment this counter on their own.
148   * Default value is true.
149   * 
150   * @param conf the configuration
151   * @param autoIncr whether to auto increment 
152   *        {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS}.
153   */
154  public static void setAutoIncrMapperProcCount(Configuration conf, 
155      boolean autoIncr) {
156    conf.setBoolean(AUTO_INCR_MAP_PROC_COUNT, autoIncr);
157  }
158  
159  /**
160   * Get the flag which if set to true, 
161   * {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS} is incremented 
162   * by framework after invoking the reduce function. This value must be set to 
163   * false for applications which process the records asynchronously 
164   * or buffer the input records. For example streaming. 
165   * In such cases applications should increment this counter on their own.
166   * Default value is true.
167   * 
168   * @param conf the configuration
169   * @return <code>true</code> if auto increment 
170   *                    {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS}.
171   *         <code>false</code> otherwise.
172   */
173  public static boolean getAutoIncrReducerProcCount(Configuration conf) {
174    return conf.getBoolean(AUTO_INCR_REDUCE_PROC_COUNT, true);
175  }
176  
177  /**
178   * Set the flag which if set to true, 
179   * {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS} is incremented 
180   * by framework after invoking the reduce function. This value must be set to 
181   * false for applications which process the records asynchronously 
182   * or buffer the input records. For example streaming. 
183   * In such cases applications should increment this counter on their own.
184   * Default value is true.
185   * 
186   * @param conf the configuration
187   * @param autoIncr whether to auto increment 
188   *        {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS}.
189   */
190  public static void setAutoIncrReducerProcCount(Configuration conf, 
191      boolean autoIncr) {
192    conf.setBoolean(AUTO_INCR_REDUCE_PROC_COUNT, autoIncr);
193  }
194  
195  /**
196   * Get the directory to which skipped records are written. By default it is 
197   * the sub directory of the output _logs directory.
198   * User can stop writing skipped records by setting the value null.
199   * 
200   * @param conf the configuration.
201   * @return path skip output directory. Null is returned if this is not set 
202   * and output directory is also not set.
203   */
204  public static Path getSkipOutputPath(Configuration conf) {
205    String name =  conf.get(OUT_PATH);
206    if(name!=null) {
207      if("none".equals(name)) {
208        return null;
209      }
210      return new Path(name);
211    }
212    Path outPath = FileOutputFormat.getOutputPath(new JobConf(conf));
213    return outPath==null ? null : new Path(outPath, 
214        "_logs"+Path.SEPARATOR+"skip");
215  }
216  
217  /**
218   * Set the directory to which skipped records are written. By default it is 
219   * the sub directory of the output _logs directory.
220   * User can stop writing skipped records by setting the value null.
221   * 
222   * @param conf the configuration.
223   * @param path skip output directory path
224   */
225  public static void setSkipOutputPath(JobConf conf, Path path) {
226    String pathStr = null;
227    if(path==null) {
228      pathStr = "none";
229    } else {
230      pathStr = path.toString();
231    }
232    conf.set(OUT_PATH, pathStr);
233  }
234  
235  /**
236   * Get the number of acceptable skip records surrounding the bad record PER 
237   * bad record in mapper. The number includes the bad record as well.
238   * To turn the feature of detection/skipping of bad records off, set the 
239   * value to 0.
240   * The framework tries to narrow down the skipped range by retrying  
241   * until this threshold is met OR all attempts get exhausted for this task. 
242   * Set the value to Long.MAX_VALUE to indicate that framework need not try to 
243   * narrow down. Whatever records(depends on application) get skipped are 
244   * acceptable.
245   * Default value is 0.
246   * 
247   * @param conf the configuration
248   * @return maxSkipRecs acceptable skip records.
249   */
250  public static long getMapperMaxSkipRecords(Configuration conf) {
251    return conf.getLong(MAPPER_MAX_SKIP_RECORDS, 0);
252  }
253  
254  /**
255   * Set the number of acceptable skip records surrounding the bad record PER 
256   * bad record in mapper. The number includes the bad record as well.
257   * To turn the feature of detection/skipping of bad records off, set the 
258   * value to 0.
259   * The framework tries to narrow down the skipped range by retrying  
260   * until this threshold is met OR all attempts get exhausted for this task. 
261   * Set the value to Long.MAX_VALUE to indicate that framework need not try to 
262   * narrow down. Whatever records(depends on application) get skipped are 
263   * acceptable.
264   * Default value is 0.
265   * 
266   * @param conf the configuration
267   * @param maxSkipRecs acceptable skip records.
268   */
269  public static void setMapperMaxSkipRecords(Configuration conf, 
270      long maxSkipRecs) {
271    conf.setLong(MAPPER_MAX_SKIP_RECORDS, maxSkipRecs);
272  }
273  
274  /**
275   * Get the number of acceptable skip groups surrounding the bad group PER 
276   * bad group in reducer. The number includes the bad group as well.
277   * To turn the feature of detection/skipping of bad groups off, set the 
278   * value to 0.
279   * The framework tries to narrow down the skipped range by retrying  
280   * until this threshold is met OR all attempts get exhausted for this task. 
281   * Set the value to Long.MAX_VALUE to indicate that framework need not try to 
282   * narrow down. Whatever groups(depends on application) get skipped are 
283   * acceptable.
284   * Default value is 0.
285   * 
286   * @param conf the configuration
287   * @return maxSkipGrps acceptable skip groups.
288   */
289  public static long getReducerMaxSkipGroups(Configuration conf) {
290    return conf.getLong(REDUCER_MAX_SKIP_GROUPS, 0);
291  }
292  
293  /**
294   * Set the number of acceptable skip groups surrounding the bad group PER 
295   * bad group in reducer. The number includes the bad group as well.
296   * To turn the feature of detection/skipping of bad groups off, set the 
297   * value to 0.
298   * The framework tries to narrow down the skipped range by retrying  
299   * until this threshold is met OR all attempts get exhausted for this task. 
300   * Set the value to Long.MAX_VALUE to indicate that framework need not try to 
301   * narrow down. Whatever groups(depends on application) get skipped are 
302   * acceptable.
303   * Default value is 0.
304   * 
305   * @param conf the configuration
306   * @param maxSkipGrps acceptable skip groups.
307   */
308  public static void setReducerMaxSkipGroups(Configuration conf, 
309      long maxSkipGrps) {
310    conf.setLong(REDUCER_MAX_SKIP_GROUPS, maxSkipGrps);
311  }
312}