001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.output;
020
import java.io.IOException;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
039
040/** An {@link OutputFormat} that writes {@link SequenceFile}s. */
041@InterfaceAudience.Public
042@InterfaceStability.Stable
043public class SequenceFileOutputFormat <K,V> extends FileOutputFormat<K, V> {
044
045  protected SequenceFile.Writer getSequenceWriter(TaskAttemptContext context,
046      Class<?> keyClass, Class<?> valueClass) 
047      throws IOException {
048    Configuration conf = context.getConfiguration();
049            
050    CompressionCodec codec = null;
051    CompressionType compressionType = CompressionType.NONE;
052    if (getCompressOutput(context)) {
053      // find the kind of compression to do
054      compressionType = getOutputCompressionType(context);
055      // find the right codec
056      Class<?> codecClass = getOutputCompressorClass(context, 
057                                                     DefaultCodec.class);
058      codec = (CompressionCodec) 
059        ReflectionUtils.newInstance(codecClass, conf);
060    }
061    // get the path of the temporary output file 
062    Path file = getDefaultWorkFile(context, "");
063    FileSystem fs = file.getFileSystem(conf);
064    return SequenceFile.createWriter(fs, conf, file,
065             keyClass,
066             valueClass,
067             compressionType,
068             codec,
069             context);
070  }
071  
072  public RecordWriter<K, V> 
073         getRecordWriter(TaskAttemptContext context
074                         ) throws IOException, InterruptedException {
075    final SequenceFile.Writer out = getSequenceWriter(context,
076      context.getOutputKeyClass(), context.getOutputValueClass());
077
078    return new RecordWriter<K, V>() {
079
080        public void write(K key, V value)
081          throws IOException {
082
083          out.append(key, value);
084        }
085
086        public void close(TaskAttemptContext context) throws IOException { 
087          out.close();
088        }
089      };
090  }
091
092  /**
093   * Get the {@link CompressionType} for the output {@link SequenceFile}.
094   * @param job the {@link Job}
095   * @return the {@link CompressionType} for the output {@link SequenceFile}, 
096   *         defaulting to {@link CompressionType#RECORD}
097   */
098  public static CompressionType getOutputCompressionType(JobContext job) {
099    String val = job.getConfiguration().get(FileOutputFormat.COMPRESS_TYPE, 
100                                            CompressionType.RECORD.toString());
101    return CompressionType.valueOf(val);
102  }
103  
104  /**
105   * Set the {@link CompressionType} for the output {@link SequenceFile}.
106   * @param job the {@link Job} to modify
107   * @param style the {@link CompressionType} for the output
108   *              {@link SequenceFile} 
109   */
110  public static void setOutputCompressionType(Job job, 
111                                                          CompressionType style) {
112    setCompressOutput(job, true);
113    job.getConfiguration().set(FileOutputFormat.COMPRESS_TYPE, 
114                               style.toString());
115  }
116
117}
118