/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.output;

import java.io.IOException;
import java.text.NumberFormat;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.security.TokenCache;

/** A base class for {@link OutputFormat}s that write to {@link FileSystem}s. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {

  /** Construct output file names so that, when an output directory listing is
   * sorted lexicographically, positions correspond to output partitions. */
  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
  protected static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
  protected static final String PART = "part";
  static {
    NUMBER_FORMAT.setMinimumIntegerDigits(5);
    NUMBER_FORMAT.setGroupingUsed(false);
  }
  private FileOutputCommitter committer = null;

  public static final String COMPRESS =
    "mapreduce.output.fileoutputformat.compress";
  public static final String COMPRESS_CODEC =
    "mapreduce.output.fileoutputformat.compress.codec";
  public static final String COMPRESS_TYPE =
    "mapreduce.output.fileoutputformat.compress.type";
  public static final String OUTDIR =
    "mapreduce.output.fileoutputformat.outputdir";

  /**
   * Set whether the output of the job is compressed.
   * @param job the job to modify
   * @param compress should the output of the job be compressed?
   */
  public static void setCompressOutput(Job job, boolean compress) {
    job.getConfiguration().setBoolean(FileOutputFormat.COMPRESS, compress);
  }
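  // Example (an illustrative sketch, not part of this class): a driver would
  // typically enable compressed output through the static setters above.
  // GzipCodec ships with Hadoop (org.apache.hadoop.io.compress.GzipCodec);
  // the job name "wordcount" is made up. Note that setOutputCompressorClass()
  // already calls setCompressOutput(job, true), so the explicit call is
  // shown only for clarity.
  //
  //   Job job = Job.getInstance(new Configuration(), "wordcount");
  //   FileOutputFormat.setCompressOutput(job, true);
  //   FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);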
  /**
   * Is the job output compressed?
   * @param job the Job to look in
   * @return <code>true</code> if the job output should be compressed,
   *         <code>false</code> otherwise
   */
  public static boolean getCompressOutput(JobContext job) {
    return job.getConfiguration().getBoolean(
      FileOutputFormat.COMPRESS, false);
  }

  /**
   * Set the {@link CompressionCodec} to be used to compress job outputs.
   * @param job the job to modify
   * @param codecClass the {@link CompressionCodec} to be used to
   *                   compress the job outputs
   */
  public static void
  setOutputCompressorClass(Job job,
                           Class<? extends CompressionCodec> codecClass) {
    setCompressOutput(job, true);
    job.getConfiguration().setClass(FileOutputFormat.COMPRESS_CODEC,
                                    codecClass,
                                    CompressionCodec.class);
  }

  /**
   * Get the {@link CompressionCodec} for compressing the job outputs.
   * @param job the {@link Job} to look in
   * @param defaultValue the {@link CompressionCodec} to return if not set
   * @return the {@link CompressionCodec} to be used to compress the
   *         job outputs
   * @throws IllegalArgumentException if the class was specified, but not found
   */
  public static Class<? extends CompressionCodec>
  getOutputCompressorClass(JobContext job,
                           Class<? extends CompressionCodec> defaultValue) {
    Class<? extends CompressionCodec> codecClass = defaultValue;
    Configuration conf = job.getConfiguration();
    String name = conf.get(FileOutputFormat.COMPRESS_CODEC);
    if (name != null) {
      try {
        codecClass =
          conf.getClassByName(name).asSubclass(CompressionCodec.class);
      } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("Compression codec " + name +
                                           " was not found.", e);
      }
    }
    return codecClass;
  }

  public abstract RecordWriter<K, V>
    getRecordWriter(TaskAttemptContext job
                    ) throws IOException, InterruptedException;

  public void checkOutputSpecs(JobContext job
                               ) throws FileAlreadyExistsException, IOException {
    // Ensure that the output directory is set and not already there
    Path outDir = getOutputPath(job);
    if (outDir == null) {
      throw new InvalidJobConfException("Output directory not set.");
    }

    // get delegation tokens for the output directory's file system
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { outDir }, job.getConfiguration());

    if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
      throw new FileAlreadyExistsException("Output directory " + outDir +
                                           " already exists");
    }
  }

  /**
   * Set the {@link Path} of the output directory for the map-reduce job.
   *
   * @param job the job to modify
   * @param outputDir the {@link Path} of the output directory for
   *                  the map-reduce job.
   * @throws IOException if the output directory's file system cannot be
   *         resolved
   */
  public static void setOutputPath(Job job, Path outputDir) throws IOException {
    outputDir = outputDir.getFileSystem(job.getConfiguration()).makeQualified(
        outputDir);
    job.getConfiguration().set(FileOutputFormat.OUTDIR, outputDir.toString());
  }

  /**
   * Get the {@link Path} to the output directory for the map-reduce job.
   *
   * @param job the {@link JobContext} to look in
   * @return the {@link Path} to the output directory for the map-reduce job.
   * @see FileOutputFormat#getWorkOutputPath(TaskInputOutputContext)
   */
  public static Path getOutputPath(JobContext job) {
    String name = job.getConfiguration().get(FileOutputFormat.OUTDIR);
    return name == null ? null : new Path(name);
  }
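  // Example (an illustrative sketch, not part of this class): because
  // checkOutputSpecs() rejects a pre-existing output directory, a driver
  // often deletes stale output before submitting. The path "/user/out" is
  // made up.
  //
  //   Path out = new Path("/user/out");
  //   FileSystem fs = out.getFileSystem(job.getConfiguration());
  //   if (fs.exists(out)) {
  //     fs.delete(out, true);        // recursive delete of old results
  //   }
  //   FileOutputFormat.setOutputPath(job, out);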
  /**
   * Get the {@link Path} to the task's temporary output directory
   * for the map-reduce job.
   *
   * <h4 id="SideEffectFiles">Tasks' Side-Effect Files</h4>
   *
   * <p>Some applications need to create/write-to side-files, which differ from
   * the actual job-outputs.</p>
   *
   * <p>In such cases there could be issues with two instances of the same TIP
   * (running simultaneously, e.g. speculative tasks) trying to open/write-to
   * the same file (path) on HDFS. Hence the application-writer would have to
   * pick unique names per task-attempt (e.g. using the attempt id, say
   * <tt>attempt_200709221812_0001_m_000000_0</tt>), not just per TIP.</p>
   *
   * <p>To get around this the Map-Reduce framework helps the application-writer
   * out by maintaining a special
   * <tt>${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}</tt>
   * sub-directory for each task-attempt on HDFS where the output of the
   * task-attempt goes. On successful completion of the task-attempt, the files
   * in the <tt>${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}</tt> (only)
   * are <i>promoted</i> to <tt>${mapreduce.output.fileoutputformat.outputdir}</tt>. Of course, the
   * framework discards the sub-directory of unsuccessful task-attempts. This
   * is completely transparent to the application.</p>
   *
   * <p>The application-writer can take advantage of this by creating any
   * side-files required in this work directory during execution of the task,
   * i.e. via {@link #getWorkOutputPath(TaskInputOutputContext)}, and
   * the framework will move them out similarly - thus the application doesn't
   * have to pick unique paths per task-attempt.</p>
   *
   * <p>The entire discussion holds true for maps of jobs with
   * reducer=NONE (i.e. 0 reduces) since output of the map, in that case,
   * goes directly to HDFS.</p>
   *
   * @return the {@link Path} to the task's temporary output directory
   *         for the map-reduce job.
   */
  public static Path getWorkOutputPath(TaskInputOutputContext<?,?,?,?> context
                                       ) throws IOException,
                                                InterruptedException {
    FileOutputCommitter committer = (FileOutputCommitter)
      context.getOutputCommitter();
    return committer.getWorkPath();
  }
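  // Example (an illustrative sketch, not part of this class): writing a
  // side-effect file from inside a task. The file name "histogram.bin" is
  // made up; because the parent is the attempt's work directory, concurrent
  // speculative attempts cannot clobber each other, and the committer
  // promotes the file to the job output directory on success.
  //
  //   Path side = new Path(FileOutputFormat.getWorkOutputPath(context),
  //                        "histogram.bin");
  //   FSDataOutputStream out =
  //       side.getFileSystem(context.getConfiguration()).create(side);
  //   // ... write the side-effect data to out ...
  //   out.close();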
  /**
   * Helper function to generate a {@link Path} for a file that is unique for
   * the task within the job output directory.
   *
   * <p>The path can be used to create custom files from within the map and
   * reduce tasks. The path name will be unique for each task. The path parent
   * will be the job output directory.</p>
   *
   * <p>This method uses the {@link #getUniqueFile} method to make the file name
   * unique for the task.</p>
   *
   * @param context the context for the task.
   * @param name the name for the file.
   * @param extension the extension for the file
   * @return a unique path across all tasks of the job.
   */
  public
  static Path getPathForWorkFile(TaskInputOutputContext<?,?,?,?> context,
                                 String name,
                                 String extension
                                 ) throws IOException, InterruptedException {
    return new Path(getWorkOutputPath(context),
                    getUniqueFile(context, name, extension));
  }

  /**
   * Generate a unique filename, based on the task id, name, and extension.
   * @param context the task that is calling this
   * @param name the base filename
   * @param extension the filename extension
   * @return a string like $name-[mrsct]-$id$extension
   */
  public synchronized static String getUniqueFile(TaskAttemptContext context,
                                                  String name,
                                                  String extension) {
    TaskID taskId = context.getTaskAttemptID().getTaskID();
    int partition = taskId.getId();
    StringBuilder result = new StringBuilder();
    result.append(name);
    result.append('-');
    result.append(
        TaskID.getRepresentingCharacter(taskId.getTaskType()));
    result.append('-');
    result.append(NUMBER_FORMAT.format(partition));
    result.append(extension);
    return result.toString();
  }

  /**
   * Get the default path and filename for the output format.
   * @param context the task context
   * @param extension an extension to add to the filename
   * @return a full path $output/_temporary/$taskid/part-[mr]-$id
   * @throws IOException if the output committer cannot be obtained
   */
  public Path getDefaultWorkFile(TaskAttemptContext context,
                                 String extension) throws IOException {
    FileOutputCommitter committer =
      (FileOutputCommitter) getOutputCommitter(context);
    return new Path(committer.getWorkPath(), getUniqueFile(context,
      getOutputName(context), extension));
  }

  /**
   * Get the base output name for the output file.
   */
  protected static String getOutputName(JobContext job) {
    return job.getConfiguration().get(BASE_OUTPUT_NAME, PART);
  }

  /**
   * Set the base output name for the output file to be created.
   */
  protected static void setOutputName(JobContext job, String name) {
    job.getConfiguration().set(BASE_OUTPUT_NAME, name);
  }

  public synchronized
     OutputCommitter getOutputCommitter(TaskAttemptContext context
                                        ) throws IOException {
    if (committer == null) {
      Path output = getOutputPath(context);
      committer = new FileOutputCommitter(output, context);
    }
    return committer;
  }
}
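// Example (an illustrative sketch, not part of this file): a minimal concrete
// subclass wires getRecordWriter() to getDefaultWorkFile(), producing files
// named like part-r-00000 in the task's work directory. The class name
// PlainTextOutputFormat is hypothetical; it assumes org.apache.hadoop.io.Text
// keys/values, and omits compression and error handling for brevity.
//
//   public class PlainTextOutputFormat extends FileOutputFormat<Text, Text> {
//     @Override
//     public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext ctx)
//         throws IOException {
//       Path file = getDefaultWorkFile(ctx, ".txt");
//       FSDataOutputStream out =
//           file.getFileSystem(ctx.getConfiguration()).create(file, false);
//       return new RecordWriter<Text, Text>() {
//         public void write(Text k, Text v) throws IOException {
//           out.writeBytes(k + "\t" + v + "\n");
//         }
//         public void close(TaskAttemptContext c) throws IOException {
//           out.close();
//         }
//       };
//     }
//   }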