001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred;
020    
021    import java.io.IOException;
022    import java.util.Arrays;
023    
024    import org.apache.hadoop.classification.InterfaceAudience;
025    import org.apache.hadoop.classification.InterfaceStability;
026    import org.apache.hadoop.conf.Configuration;
027    import org.apache.hadoop.fs.FileSystem;
028    import org.apache.hadoop.fs.FileUtil;
029    import org.apache.hadoop.fs.Path;
030    import org.apache.hadoop.io.SequenceFile;
031    import org.apache.hadoop.io.SequenceFile.CompressionType;
032    import org.apache.hadoop.io.compress.CompressionCodec;
033    import org.apache.hadoop.io.compress.DefaultCodec;
034    import org.apache.hadoop.util.Progressable;
035    import org.apache.hadoop.util.ReflectionUtils;
036    
037    /** 
038     * An {@link OutputFormat} that writes {@link SequenceFile}s. 
039     */
040    @InterfaceAudience.Public
041    @InterfaceStability.Stable
042    public class SequenceFileOutputFormat <K,V> extends FileOutputFormat<K, V> {
043    
044      public RecordWriter<K, V> getRecordWriter(
045                                              FileSystem ignored, JobConf job,
046                                              String name, Progressable progress)
047        throws IOException {
048        // get the path of the temporary output file 
049        Path file = FileOutputFormat.getTaskOutputPath(job, name);
050        
051        FileSystem fs = file.getFileSystem(job);
052        CompressionCodec codec = null;
053        CompressionType compressionType = CompressionType.NONE;
054        if (getCompressOutput(job)) {
055          // find the kind of compression to do
056          compressionType = getOutputCompressionType(job);
057    
058          // find the right codec
059          Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
060              DefaultCodec.class);
061          codec = ReflectionUtils.newInstance(codecClass, job);
062        }
063        final SequenceFile.Writer out = 
064          SequenceFile.createWriter(fs, job, file,
065                                    job.getOutputKeyClass(),
066                                    job.getOutputValueClass(),
067                                    compressionType,
068                                    codec,
069                                    progress);
070    
071        return new RecordWriter<K, V>() {
072    
073            public void write(K key, V value)
074              throws IOException {
075    
076              out.append(key, value);
077            }
078    
079            public void close(Reporter reporter) throws IOException { out.close();}
080          };
081      }
082    
083      /** Open the output generated by this format. */
084      public static SequenceFile.Reader[] getReaders(Configuration conf, Path dir)
085        throws IOException {
086        FileSystem fs = dir.getFileSystem(conf);
087        Path[] names = FileUtil.stat2Paths(fs.listStatus(dir));
088        
089        // sort names, so that hash partitioning works
090        Arrays.sort(names);
091        
092        SequenceFile.Reader[] parts = new SequenceFile.Reader[names.length];
093        for (int i = 0; i < names.length; i++) {
094          parts[i] = new SequenceFile.Reader(fs, names[i], conf);
095        }
096        return parts;
097      }
098    
099      /**
100       * Get the {@link CompressionType} for the output {@link SequenceFile}.
101       * @param conf the {@link JobConf}
102       * @return the {@link CompressionType} for the output {@link SequenceFile}, 
103       *         defaulting to {@link CompressionType#RECORD}
104       */
105      public static CompressionType getOutputCompressionType(JobConf conf) {
106        String val = conf.get(org.apache.hadoop.mapreduce.lib.output.
107          FileOutputFormat.COMPRESS_TYPE, CompressionType.RECORD.toString());
108        return CompressionType.valueOf(val);
109      }
110      
111      /**
112       * Set the {@link CompressionType} for the output {@link SequenceFile}.
113       * @param conf the {@link JobConf} to modify
114       * @param style the {@link CompressionType} for the output
115       *              {@link SequenceFile} 
116       */
117      public static void setOutputCompressionType(JobConf conf, 
118                                                              CompressionType style) {
119        setCompressOutput(conf, true);
120        conf.set(org.apache.hadoop.mapreduce.lib.output.
121          FileOutputFormat.COMPRESS_TYPE, style.toString());
122      }
123    
124    }
125