001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.output;
020    
021    import java.io.IOException;
022    
023    import org.apache.commons.logging.Log;
024    import org.apache.commons.logging.LogFactory;
025    import org.apache.hadoop.classification.InterfaceAudience;
026    import org.apache.hadoop.classification.InterfaceAudience.Private;
027    import org.apache.hadoop.classification.InterfaceStability;
028    import org.apache.hadoop.fs.FileStatus;
029    import org.apache.hadoop.fs.FileSystem;
030    import org.apache.hadoop.fs.Path;
031    import org.apache.hadoop.fs.PathFilter;
032    import org.apache.hadoop.mapreduce.JobContext;
033    import org.apache.hadoop.mapreduce.JobStatus;
034    import org.apache.hadoop.mapreduce.MRJobConfig;
035    import org.apache.hadoop.mapreduce.OutputCommitter;
036    import org.apache.hadoop.mapreduce.TaskAttemptContext;
037    import org.apache.hadoop.mapreduce.TaskAttemptID;
038    
039    /** An {@link OutputCommitter} that commits files specified 
040     * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
041     **/
042    @InterfaceAudience.Public
043    @InterfaceStability.Stable
044    public class FileOutputCommitter extends OutputCommitter {
045      private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
046    
047      /** 
048       * Name of directory where pending data is placed.  Data that has not been
049       * committed yet.
050       */
051      public static final String PENDING_DIR_NAME = "_temporary";
052      /**
053       * Temporary directory name 
054       *
055       * The static variable to be compatible with M/R 1.x
056       */
057      @Deprecated
058      protected static final String TEMP_DIR_NAME = PENDING_DIR_NAME;
059      public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
060      public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = 
061        "mapreduce.fileoutputcommitter.marksuccessfuljobs";
062      private Path outputPath = null;
063      private Path workPath = null;
064    
065      /**
066       * Create a file output committer
067       * @param outputPath the job's output path, or null if you want the output
068       * committer to act as a noop.
069       * @param context the task's context
070       * @throws IOException
071       */
072      public FileOutputCommitter(Path outputPath, 
073                                 TaskAttemptContext context) throws IOException {
074        this(outputPath, (JobContext)context);
075        if (outputPath != null) {
076          workPath = getTaskAttemptPath(context, outputPath);
077        }
078      }
079      
080      /**
081       * Create a file output committer
082       * @param outputPath the job's output path, or null if you want the output
083       * committer to act as a noop.
084       * @param context the task's context
085       * @throws IOException
086       */
087      @Private
088      public FileOutputCommitter(Path outputPath, 
089                                 JobContext context) throws IOException {
090        if (outputPath != null) {
091          FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
092          this.outputPath = fs.makeQualified(outputPath);
093        }
094      }
095      
096      /**
097       * @return the path where final output of the job should be placed.  This
098       * could also be considered the committed application attempt path.
099       */
100      private Path getOutputPath() {
101        return this.outputPath;
102      }
103      
104      /**
105       * @return true if we have an output path set, else false.
106       */
107      private boolean hasOutputPath() {
108        return this.outputPath != null;
109      }
110      
111      /**
112       * @return the path where the output of pending job attempts are
113       * stored.
114       */
115      private Path getPendingJobAttemptsPath() {
116        return getPendingJobAttemptsPath(getOutputPath());
117      }
118      
119      /**
120       * Get the location of pending job attempts.
121       * @param out the base output directory.
122       * @return the location of pending job attempts.
123       */
124      private static Path getPendingJobAttemptsPath(Path out) {
125        return new Path(out, PENDING_DIR_NAME);
126      }
127      
128      /**
129       * Get the Application Attempt Id for this job
130       * @param context the context to look in
131       * @return the Application Attempt Id for a given job.
132       */
133      private static int getAppAttemptId(JobContext context) {
134        return context.getConfiguration().getInt(
135            MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
136      }
137      
138      /**
139       * Compute the path where the output of a given job attempt will be placed. 
140       * @param context the context of the job.  This is used to get the
141       * application attempt id.
142       * @return the path to store job attempt data.
143       */
144      public Path getJobAttemptPath(JobContext context) {
145        return getJobAttemptPath(context, getOutputPath());
146      }
147      
148      /**
149       * Compute the path where the output of a given job attempt will be placed. 
150       * @param context the context of the job.  This is used to get the
151       * application attempt id.
152       * @param out the output path to place these in.
153       * @return the path to store job attempt data.
154       */
155      public static Path getJobAttemptPath(JobContext context, Path out) {
156        return getJobAttemptPath(getAppAttemptId(context), out);
157      }
158      
159      /**
160       * Compute the path where the output of a given job attempt will be placed. 
161       * @param appAttemptId the ID of the application attempt for this job.
162       * @return the path to store job attempt data.
163       */
164      protected Path getJobAttemptPath(int appAttemptId) {
165        return getJobAttemptPath(appAttemptId, getOutputPath());
166      }
167      
168      /**
169       * Compute the path where the output of a given job attempt will be placed. 
170       * @param appAttemptId the ID of the application attempt for this job.
171       * @return the path to store job attempt data.
172       */
173      private static Path getJobAttemptPath(int appAttemptId, Path out) {
174        return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
175      }
176      
177      /**
178       * Compute the path where the output of pending task attempts are stored.
179       * @param context the context of the job with pending tasks. 
180       * @return the path where the output of pending task attempts are stored.
181       */
182      private Path getPendingTaskAttemptsPath(JobContext context) {
183        return getPendingTaskAttemptsPath(context, getOutputPath());
184      }
185      
186      /**
187       * Compute the path where the output of pending task attempts are stored.
188       * @param context the context of the job with pending tasks. 
189       * @return the path where the output of pending task attempts are stored.
190       */
191      private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
192        return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
193      }
194      
195      /**
196       * Compute the path where the output of a task attempt is stored until
197       * that task is committed.
198       * 
199       * @param context the context of the task attempt.
200       * @return the path where a task attempt should be stored.
201       */
202      public Path getTaskAttemptPath(TaskAttemptContext context) {
203        return new Path(getPendingTaskAttemptsPath(context), 
204            String.valueOf(context.getTaskAttemptID()));
205      }
206      
207      /**
208       * Compute the path where the output of a task attempt is stored until
209       * that task is committed.
210       * 
211       * @param context the context of the task attempt.
212       * @param out The output path to put things in.
213       * @return the path where a task attempt should be stored.
214       */
215      public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
216        return new Path(getPendingTaskAttemptsPath(context, out), 
217            String.valueOf(context.getTaskAttemptID()));
218      }
219      
220      /**
221       * Compute the path where the output of a committed task is stored until
222       * the entire job is committed.
223       * @param context the context of the task attempt
224       * @return the path where the output of a committed task is stored until
225       * the entire job is committed.
226       */
227      public Path getCommittedTaskPath(TaskAttemptContext context) {
228        return getCommittedTaskPath(getAppAttemptId(context), context);
229      }
230      
231      public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
232        return getCommittedTaskPath(getAppAttemptId(context), context, out);
233      }
234      
235      /**
236       * Compute the path where the output of a committed task is stored until the
237       * entire job is committed for a specific application attempt.
238       * @param appAttemptId the id of the application attempt to use
239       * @param context the context of any task.
240       * @return the path where the output of a committed task is stored.
241       */
242      protected Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
243        return new Path(getJobAttemptPath(appAttemptId),
244            String.valueOf(context.getTaskAttemptID().getTaskID()));
245      }
246      
247      private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
248        return new Path(getJobAttemptPath(appAttemptId, out),
249            String.valueOf(context.getTaskAttemptID().getTaskID()));
250      }
251      
252      private static class CommittedTaskFilter implements PathFilter {
253        @Override
254        public boolean accept(Path path) {
255          return !PENDING_DIR_NAME.equals(path.getName());
256        }
257      }
258      
259      /**
260       * Get a list of all paths where output from committed tasks are stored.
261       * @param context the context of the current job
262       * @return the list of these Paths/FileStatuses. 
263       * @throws IOException
264       */
265      private FileStatus[] getAllCommittedTaskPaths(JobContext context) 
266        throws IOException {
267        Path jobAttemptPath = getJobAttemptPath(context);
268        FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
269        return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
270      }
271      
272      /**
273       * Get the directory that the task should write results into.
274       * @return the work directory
275       * @throws IOException
276       */
277      public Path getWorkPath() throws IOException {
278        return workPath;
279      }
280    
281      /**
282       * Create the temporary directory that is the root of all of the task 
283       * work directories.
284       * @param context the job's context
285       */
286      public void setupJob(JobContext context) throws IOException {
287        if (hasOutputPath()) {
288          Path jobAttemptPath = getJobAttemptPath(context);
289          FileSystem fs = jobAttemptPath.getFileSystem(
290              context.getConfiguration());
291          if (!fs.mkdirs(jobAttemptPath)) {
292            LOG.error("Mkdirs failed to create " + jobAttemptPath);
293          }
294        } else {
295          LOG.warn("Output Path is null in setupJob()");
296        }
297      }
298      
299      /**
300       * The job has completed so move all committed tasks to the final output dir.
301       * Delete the temporary directory, including all of the work directories.
302       * Create a _SUCCESS file to make it as successful.
303       * @param context the job's context
304       */
305      public void commitJob(JobContext context) throws IOException {
306        if (hasOutputPath()) {
307          Path finalOutput = getOutputPath();
308          FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
309          for(FileStatus stat: getAllCommittedTaskPaths(context)) {
310            mergePaths(fs, stat, finalOutput);
311          }
312    
313          // delete the _temporary folder and create a _done file in the o/p folder
314          cleanupJob(context);
315          // True if the job requires output.dir marked on successful job.
316          // Note that by default it is set to true.
317          if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
318            Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
319            fs.create(markerPath).close();
320          }
321        } else {
322          LOG.warn("Output Path is null in commitJob()");
323        }
324      }
325    
326      /**
327       * Merge two paths together.  Anything in from will be moved into to, if there
328       * are any name conflicts while merging the files or directories in from win.
329       * @param fs the File System to use
330       * @param from the path data is coming from.
331       * @param to the path data is going to.
332       * @throws IOException on any error
333       */
334      private static void mergePaths(FileSystem fs, final FileStatus from,
335          final Path to)
336        throws IOException {
337         LOG.debug("Merging data from "+from+" to "+to);
338         if(from.isFile()) {
339           if(fs.exists(to)) {
340             if(!fs.delete(to, true)) {
341               throw new IOException("Failed to delete "+to);
342             }
343           }
344    
345           if(!fs.rename(from.getPath(), to)) {
346             throw new IOException("Failed to rename "+from+" to "+to);
347           }
348         } else if(from.isDirectory()) {
349           if(fs.exists(to)) {
350             FileStatus toStat = fs.getFileStatus(to);
351             if(!toStat.isDirectory()) {
352               if(!fs.delete(to, true)) {
353                 throw new IOException("Failed to delete "+to);
354               }
355               if(!fs.rename(from.getPath(), to)) {
356                 throw new IOException("Failed to rename "+from+" to "+to);
357               }
358             } else {
359               //It is a directory so merge everything in the directories
360               for(FileStatus subFrom: fs.listStatus(from.getPath())) {
361                 Path subTo = new Path(to, subFrom.getPath().getName());
362                 mergePaths(fs, subFrom, subTo);
363               }
364             }
365           } else {
366             //it does not exist just rename
367             if(!fs.rename(from.getPath(), to)) {
368               throw new IOException("Failed to rename "+from+" to "+to);
369             }
370           }
371         }
372      }
373    
374      @Override
375      @Deprecated
376      public void cleanupJob(JobContext context) throws IOException {
377        if (hasOutputPath()) {
378          Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
379          FileSystem fs = pendingJobAttemptsPath
380              .getFileSystem(context.getConfiguration());
381          fs.delete(pendingJobAttemptsPath, true);
382        } else {
383          LOG.warn("Output Path is null in cleanupJob()");
384        }
385      }
386    
387      /**
388       * Delete the temporary directory, including all of the work directories.
389       * @param context the job's context
390       */
391      @Override
392      public void abortJob(JobContext context, JobStatus.State state) 
393      throws IOException {
394        // delete the _temporary folder
395        cleanupJob(context);
396      }
397      
398      /**
399       * No task setup required.
400       */
401      @Override
402      public void setupTask(TaskAttemptContext context) throws IOException {
403        // FileOutputCommitter's setupTask doesn't do anything. Because the
404        // temporary task directory is created on demand when the 
405        // task is writing.
406      }
407    
408      /**
409       * Move the files from the work directory to the job output directory
410       * @param context the task context
411       */
412      @Override
413      public void commitTask(TaskAttemptContext context) 
414      throws IOException {
415        commitTask(context, null);
416      }
417    
418      @Private
419      public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 
420      throws IOException {
421        TaskAttemptID attemptId = context.getTaskAttemptID();
422        if (hasOutputPath()) {
423          context.progress();
424          if(taskAttemptPath == null) {
425            taskAttemptPath = getTaskAttemptPath(context);
426          }
427          Path committedTaskPath = getCommittedTaskPath(context);
428          FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
429          if (fs.exists(taskAttemptPath)) {
430            if(fs.exists(committedTaskPath)) {
431              if(!fs.delete(committedTaskPath, true)) {
432                throw new IOException("Could not delete " + committedTaskPath);
433              }
434            }
435            if(!fs.rename(taskAttemptPath, committedTaskPath)) {
436              throw new IOException("Could not rename " + taskAttemptPath + " to "
437                  + committedTaskPath);
438            }
439            LOG.info("Saved output of task '" + attemptId + "' to " + 
440                committedTaskPath);
441          } else {
442            LOG.warn("No Output found for " + attemptId);
443          }
444        } else {
445          LOG.warn("Output Path is null in commitTask()");
446        }
447      }
448    
449      /**
450       * Delete the work directory
451       * @throws IOException 
452       */
453      @Override
454      public void abortTask(TaskAttemptContext context) throws IOException {
455        abortTask(context, null);
456      }
457    
458      @Private
459      public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
460        if (hasOutputPath()) { 
461          context.progress();
462          if(taskAttemptPath == null) {
463            taskAttemptPath = getTaskAttemptPath(context);
464          }
465          FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
466          if(!fs.delete(taskAttemptPath, true)) {
467            LOG.warn("Could not delete "+taskAttemptPath);
468          }
469        } else {
470          LOG.warn("Output Path is null in abortTask()");
471        }
472      }
473    
474      /**
475       * Did this task write any files in the work directory?
476       * @param context the task's context
477       */
478      @Override
479      public boolean needsTaskCommit(TaskAttemptContext context
480                                     ) throws IOException {
481        return needsTaskCommit(context, null);
482      }
483    
484      @Private
485      public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
486        ) throws IOException {
487        if(hasOutputPath()) {
488          if(taskAttemptPath == null) {
489            taskAttemptPath = getTaskAttemptPath(context);
490          }
491          FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
492          return fs.exists(taskAttemptPath);
493        }
494        return false;
495      }
496    
497      @Override
498      @Deprecated
499      public boolean isRecoverySupported() {
500        return true;
501      }
502      
503      @Override
504      public void recoverTask(TaskAttemptContext context)
505          throws IOException {
506        if(hasOutputPath()) {
507          context.progress();
508          TaskAttemptID attemptId = context.getTaskAttemptID();
509          int previousAttempt = getAppAttemptId(context) - 1;
510          if (previousAttempt < 0) {
511            throw new IOException ("Cannot recover task output for first attempt...");
512          }
513    
514          Path committedTaskPath = getCommittedTaskPath(context);
515          Path previousCommittedTaskPath = getCommittedTaskPath(
516              previousAttempt, context);
517          FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
518    
519          LOG.debug("Trying to recover task from " + previousCommittedTaskPath 
520              + " into " + committedTaskPath);
521          if (fs.exists(previousCommittedTaskPath)) {
522            if(fs.exists(committedTaskPath)) {
523              if(!fs.delete(committedTaskPath, true)) {
524                throw new IOException("Could not delete "+committedTaskPath);
525              }
526            }
527            //Rename can fail if the parent directory does not yet exist.
528            Path committedParent = committedTaskPath.getParent();
529            fs.mkdirs(committedParent);
530            if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
531              throw new IOException("Could not rename " + previousCommittedTaskPath +
532                  " to " + committedTaskPath);
533            }
534            LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
535          } else {
536            LOG.warn(attemptId+" had no output to recover.");
537          }
538        } else {
539          LOG.warn("Output Path is null in recoverTask()");
540        }
541      }
542    }