001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred;
020    
021    import org.apache.hadoop.classification.InterfaceAudience;
022    import org.apache.hadoop.classification.InterfaceStability;
023    import org.apache.hadoop.conf.Configuration;
024    import org.apache.hadoop.fs.Path;
025    
026    /**
027     * Utility class for skip bad records functionality. It contains various 
028     * settings related to skipping of bad records.
029     * 
030     * <p>Hadoop provides an optional mode of execution in which the bad records
031     * are detected and skipped in further attempts.
032     * 
033     * <p>This feature can be used when map/reduce tasks crashes deterministically on 
034     * certain input. This happens due to bugs in the map/reduce function. The usual
035     * course would be to fix these bugs. But sometimes this is not possible; 
036     * perhaps the bug is in third party libraries for which the source code is 
037     * not available. Due to this, the task never reaches to completion even with 
038     * multiple attempts and complete data for that task is lost.</p>
039     *  
040     * <p>With this feature, only a small portion of data is lost surrounding 
041     * the bad record, which may be acceptable for some user applications.
042     * see {@link SkipBadRecords#setMapperMaxSkipRecords(Configuration, long)}</p>
043     * 
044     * <p>The skipping mode gets kicked off after certain no of failures 
045     * see {@link SkipBadRecords#setAttemptsToStartSkipping(Configuration, int)}</p>
046     *  
047     * <p>In the skipping mode, the map/reduce task maintains the record range which 
048     * is getting processed at all times. Before giving the input to the
049     * map/reduce function, it sends this record range to the Task tracker.
050     * If task crashes, the Task tracker knows which one was the last reported
051     * range. On further attempts that range get skipped.</p>
052     */
053    @InterfaceAudience.Public
054    @InterfaceStability.Stable
055    public class SkipBadRecords {
056      
057      /**
058       * Special counters which are written by the application and are 
059       * used by the framework for detecting bad records. For detecting bad records 
060       * these counters must be incremented by the application.
061       */
062      public static final String COUNTER_GROUP = "SkippingTaskCounters";
063      
064      /**
065       * Number of processed map records.
066       * @see SkipBadRecords#getAutoIncrMapperProcCount(Configuration)
067       */
068      public static final String COUNTER_MAP_PROCESSED_RECORDS = 
069        "MapProcessedRecords";
070      
071      /**
072       * Number of processed reduce groups.
073       * @see SkipBadRecords#getAutoIncrReducerProcCount(Configuration)
074       */
075      public static final String COUNTER_REDUCE_PROCESSED_GROUPS = 
076        "ReduceProcessedGroups";
077      
078      private static final String ATTEMPTS_TO_START_SKIPPING = 
079        JobContext.SKIP_START_ATTEMPTS;
080      private static final String AUTO_INCR_MAP_PROC_COUNT = 
081        JobContext.MAP_SKIP_INCR_PROC_COUNT;
082      private static final String AUTO_INCR_REDUCE_PROC_COUNT = 
083        JobContext.REDUCE_SKIP_INCR_PROC_COUNT;
084      private static final String OUT_PATH = JobContext.SKIP_OUTDIR;
085      private static final String MAPPER_MAX_SKIP_RECORDS = 
086        JobContext.MAP_SKIP_MAX_RECORDS;
087      private static final String REDUCER_MAX_SKIP_GROUPS = 
088        JobContext.REDUCE_SKIP_MAXGROUPS;
089      
090      /**
091       * Get the number of Task attempts AFTER which skip mode 
092       * will be kicked off. When skip mode is kicked off, the 
093       * tasks reports the range of records which it will process 
094       * next to the TaskTracker. So that on failures, TT knows which 
095       * ones are possibly the bad records. On further executions, 
096       * those are skipped.
097       * Default value is 2.
098       * 
099       * @param conf the configuration
100       * @return attemptsToStartSkipping no of task attempts
101       */
102      public static int getAttemptsToStartSkipping(Configuration conf) {
103        return conf.getInt(ATTEMPTS_TO_START_SKIPPING, 2);
104      }
105    
106      /**
107       * Set the number of Task attempts AFTER which skip mode 
108       * will be kicked off. When skip mode is kicked off, the 
109       * tasks reports the range of records which it will process 
110       * next to the TaskTracker. So that on failures, TT knows which 
111       * ones are possibly the bad records. On further executions, 
112       * those are skipped.
113       * Default value is 2.
114       * 
115       * @param conf the configuration
116       * @param attemptsToStartSkipping no of task attempts
117       */
118      public static void setAttemptsToStartSkipping(Configuration conf, 
119          int attemptsToStartSkipping) {
120        conf.setInt(ATTEMPTS_TO_START_SKIPPING, attemptsToStartSkipping);
121      }
122    
123      /**
124       * Get the flag which if set to true, 
125       * {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS} is incremented 
126       * by MapRunner after invoking the map function. This value must be set to 
127       * false for applications which process the records asynchronously 
128       * or buffer the input records. For example streaming. 
129       * In such cases applications should increment this counter on their own.
130       * Default value is true.
131       * 
132       * @param conf the configuration
133       * @return <code>true</code> if auto increment 
134       *                       {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS}.
135       *         <code>false</code> otherwise.
136       */
137      public static boolean getAutoIncrMapperProcCount(Configuration conf) {
138        return conf.getBoolean(AUTO_INCR_MAP_PROC_COUNT, true);
139      }
140      
141      /**
142       * Set the flag which if set to true, 
143       * {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS} is incremented 
144       * by MapRunner after invoking the map function. This value must be set to 
145       * false for applications which process the records asynchronously 
146       * or buffer the input records. For example streaming. 
147       * In such cases applications should increment this counter on their own.
148       * Default value is true.
149       * 
150       * @param conf the configuration
151       * @param autoIncr whether to auto increment 
152       *        {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS}.
153       */
154      public static void setAutoIncrMapperProcCount(Configuration conf, 
155          boolean autoIncr) {
156        conf.setBoolean(AUTO_INCR_MAP_PROC_COUNT, autoIncr);
157      }
158      
159      /**
160       * Get the flag which if set to true, 
161       * {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS} is incremented 
162       * by framework after invoking the reduce function. This value must be set to 
163       * false for applications which process the records asynchronously 
164       * or buffer the input records. For example streaming. 
165       * In such cases applications should increment this counter on their own.
166       * Default value is true.
167       * 
168       * @param conf the configuration
169       * @return <code>true</code> if auto increment 
170       *                    {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS}.
171       *         <code>false</code> otherwise.
172       */
173      public static boolean getAutoIncrReducerProcCount(Configuration conf) {
174        return conf.getBoolean(AUTO_INCR_REDUCE_PROC_COUNT, true);
175      }
176      
177      /**
178       * Set the flag which if set to true, 
179       * {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS} is incremented 
180       * by framework after invoking the reduce function. This value must be set to 
181       * false for applications which process the records asynchronously 
182       * or buffer the input records. For example streaming. 
183       * In such cases applications should increment this counter on their own.
184       * Default value is true.
185       * 
186       * @param conf the configuration
187       * @param autoIncr whether to auto increment 
188       *        {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS}.
189       */
190      public static void setAutoIncrReducerProcCount(Configuration conf, 
191          boolean autoIncr) {
192        conf.setBoolean(AUTO_INCR_REDUCE_PROC_COUNT, autoIncr);
193      }
194      
195      /**
196       * Get the directory to which skipped records are written. By default it is 
197       * the sub directory of the output _logs directory.
198       * User can stop writing skipped records by setting the value null.
199       * 
200       * @param conf the configuration.
201       * @return path skip output directory. Null is returned if this is not set 
202       * and output directory is also not set.
203       */
204      public static Path getSkipOutputPath(Configuration conf) {
205        String name =  conf.get(OUT_PATH);
206        if(name!=null) {
207          if("none".equals(name)) {
208            return null;
209          }
210          return new Path(name);
211        }
212        Path outPath = FileOutputFormat.getOutputPath(new JobConf(conf));
213        return outPath==null ? null : new Path(outPath, 
214            "_logs"+Path.SEPARATOR+"skip");
215      }
216      
217      /**
218       * Set the directory to which skipped records are written. By default it is 
219       * the sub directory of the output _logs directory.
220       * User can stop writing skipped records by setting the value null.
221       * 
222       * @param conf the configuration.
223       * @param path skip output directory path
224       */
225      public static void setSkipOutputPath(JobConf conf, Path path) {
226        String pathStr = null;
227        if(path==null) {
228          pathStr = "none";
229        } else {
230          pathStr = path.toString();
231        }
232        conf.set(OUT_PATH, pathStr);
233      }
234      
235      /**
236       * Get the number of acceptable skip records surrounding the bad record PER 
237       * bad record in mapper. The number includes the bad record as well.
238       * To turn the feature of detection/skipping of bad records off, set the 
239       * value to 0.
240       * The framework tries to narrow down the skipped range by retrying  
241       * until this threshold is met OR all attempts get exhausted for this task. 
242       * Set the value to Long.MAX_VALUE to indicate that framework need not try to 
243       * narrow down. Whatever records(depends on application) get skipped are 
244       * acceptable.
245       * Default value is 0.
246       * 
247       * @param conf the configuration
248       * @return maxSkipRecs acceptable skip records.
249       */
250      public static long getMapperMaxSkipRecords(Configuration conf) {
251        return conf.getLong(MAPPER_MAX_SKIP_RECORDS, 0);
252      }
253      
254      /**
255       * Set the number of acceptable skip records surrounding the bad record PER 
256       * bad record in mapper. The number includes the bad record as well.
257       * To turn the feature of detection/skipping of bad records off, set the 
258       * value to 0.
259       * The framework tries to narrow down the skipped range by retrying  
260       * until this threshold is met OR all attempts get exhausted for this task. 
261       * Set the value to Long.MAX_VALUE to indicate that framework need not try to 
262       * narrow down. Whatever records(depends on application) get skipped are 
263       * acceptable.
264       * Default value is 0.
265       * 
266       * @param conf the configuration
267       * @param maxSkipRecs acceptable skip records.
268       */
269      public static void setMapperMaxSkipRecords(Configuration conf, 
270          long maxSkipRecs) {
271        conf.setLong(MAPPER_MAX_SKIP_RECORDS, maxSkipRecs);
272      }
273      
274      /**
275       * Get the number of acceptable skip groups surrounding the bad group PER 
276       * bad group in reducer. The number includes the bad group as well.
277       * To turn the feature of detection/skipping of bad groups off, set the 
278       * value to 0.
279       * The framework tries to narrow down the skipped range by retrying  
280       * until this threshold is met OR all attempts get exhausted for this task. 
281       * Set the value to Long.MAX_VALUE to indicate that framework need not try to 
282       * narrow down. Whatever groups(depends on application) get skipped are 
283       * acceptable.
284       * Default value is 0.
285       * 
286       * @param conf the configuration
287       * @return maxSkipGrps acceptable skip groups.
288       */
289      public static long getReducerMaxSkipGroups(Configuration conf) {
290        return conf.getLong(REDUCER_MAX_SKIP_GROUPS, 0);
291      }
292      
293      /**
294       * Set the number of acceptable skip groups surrounding the bad group PER 
295       * bad group in reducer. The number includes the bad group as well.
296       * To turn the feature of detection/skipping of bad groups off, set the 
297       * value to 0.
298       * The framework tries to narrow down the skipped range by retrying  
299       * until this threshold is met OR all attempts get exhausted for this task. 
300       * Set the value to Long.MAX_VALUE to indicate that framework need not try to 
301       * narrow down. Whatever groups(depends on application) get skipped are 
302       * acceptable.
303       * Default value is 0.
304       * 
305       * @param conf the configuration
306       * @param maxSkipGrps acceptable skip groups.
307       */
308      public static void setReducerMaxSkipGroups(Configuration conf, 
309          long maxSkipGrps) {
310        conf.setLong(REDUCER_MAX_SKIP_GROUPS, maxSkipGrps);
311      }
312    }