001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred;
020
021import java.io.IOException;
022import java.util.Arrays;
023
024import org.apache.hadoop.classification.InterfaceAudience;
025import org.apache.hadoop.classification.InterfaceStability;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.FileUtil;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.io.SequenceFile;
031import org.apache.hadoop.io.SequenceFile.CompressionType;
032import org.apache.hadoop.io.compress.CompressionCodec;
033import org.apache.hadoop.io.compress.DefaultCodec;
034import org.apache.hadoop.util.Progressable;
035import org.apache.hadoop.util.ReflectionUtils;
036
037/** 
038 * An {@link OutputFormat} that writes {@link SequenceFile}s. 
039 */
040@InterfaceAudience.Public
041@InterfaceStability.Stable
042public class SequenceFileOutputFormat <K,V> extends FileOutputFormat<K, V> {
043
044  public RecordWriter<K, V> getRecordWriter(
045                                          FileSystem ignored, JobConf job,
046                                          String name, Progressable progress)
047    throws IOException {
048    // get the path of the temporary output file 
049    Path file = FileOutputFormat.getTaskOutputPath(job, name);
050    
051    FileSystem fs = file.getFileSystem(job);
052    CompressionCodec codec = null;
053    CompressionType compressionType = CompressionType.NONE;
054    if (getCompressOutput(job)) {
055      // find the kind of compression to do
056      compressionType = getOutputCompressionType(job);
057
058      // find the right codec
059      Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
060          DefaultCodec.class);
061      codec = ReflectionUtils.newInstance(codecClass, job);
062    }
063    final SequenceFile.Writer out = 
064      SequenceFile.createWriter(fs, job, file,
065                                job.getOutputKeyClass(),
066                                job.getOutputValueClass(),
067                                compressionType,
068                                codec,
069                                progress);
070
071    return new RecordWriter<K, V>() {
072
073        public void write(K key, V value)
074          throws IOException {
075
076          out.append(key, value);
077        }
078
079        public void close(Reporter reporter) throws IOException { out.close();}
080      };
081  }
082
083  /** Open the output generated by this format. */
084  public static SequenceFile.Reader[] getReaders(Configuration conf, Path dir)
085    throws IOException {
086    FileSystem fs = dir.getFileSystem(conf);
087    Path[] names = FileUtil.stat2Paths(fs.listStatus(dir));
088    
089    // sort names, so that hash partitioning works
090    Arrays.sort(names);
091    
092    SequenceFile.Reader[] parts = new SequenceFile.Reader[names.length];
093    for (int i = 0; i < names.length; i++) {
094      parts[i] = new SequenceFile.Reader(fs, names[i], conf);
095    }
096    return parts;
097  }
098
099  /**
100   * Get the {@link CompressionType} for the output {@link SequenceFile}.
101   * @param conf the {@link JobConf}
102   * @return the {@link CompressionType} for the output {@link SequenceFile}, 
103   *         defaulting to {@link CompressionType#RECORD}
104   */
105  public static CompressionType getOutputCompressionType(JobConf conf) {
106    String val = conf.get(org.apache.hadoop.mapreduce.lib.output.
107      FileOutputFormat.COMPRESS_TYPE, CompressionType.RECORD.toString());
108    return CompressionType.valueOf(val);
109  }
110  
111  /**
112   * Set the {@link CompressionType} for the output {@link SequenceFile}.
113   * @param conf the {@link JobConf} to modify
114   * @param style the {@link CompressionType} for the output
115   *              {@link SequenceFile} 
116   */
117  public static void setOutputCompressionType(JobConf conf, 
118                                                          CompressionType style) {
119    setCompressOutput(conf, true);
120    conf.set(org.apache.hadoop.mapreduce.lib.output.
121      FileOutputFormat.COMPRESS_TYPE, style.toString());
122  }
123
124}
125