001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapred; 020 021 import java.io.IOException; 022 import java.util.Arrays; 023 024 import org.apache.hadoop.classification.InterfaceAudience; 025 import org.apache.hadoop.classification.InterfaceStability; 026 import org.apache.hadoop.conf.Configuration; 027 import org.apache.hadoop.fs.FileSystem; 028 import org.apache.hadoop.fs.FileUtil; 029 import org.apache.hadoop.fs.Path; 030 import org.apache.hadoop.io.SequenceFile; 031 import org.apache.hadoop.io.SequenceFile.CompressionType; 032 import org.apache.hadoop.io.compress.CompressionCodec; 033 import org.apache.hadoop.io.compress.DefaultCodec; 034 import org.apache.hadoop.util.Progressable; 035 import org.apache.hadoop.util.ReflectionUtils; 036 037 /** 038 * An {@link OutputFormat} that writes {@link SequenceFile}s. 039 */ 040 @InterfaceAudience.Public 041 @InterfaceStability.Stable 042 public class SequenceFileOutputFormat <K,V> extends FileOutputFormat<K, V> { 043 044 public RecordWriter<K, V> getRecordWriter( 045 FileSystem ignored, JobConf job, 046 String name, Progressable progress) 047 throws IOException { 048 // get the path of the temporary output file 049 Path file = FileOutputFormat.getTaskOutputPath(job, name); 050 051 FileSystem fs = file.getFileSystem(job); 052 CompressionCodec codec = null; 053 CompressionType compressionType = CompressionType.NONE; 054 if (getCompressOutput(job)) { 055 // find the kind of compression to do 056 compressionType = getOutputCompressionType(job); 057 058 // find the right codec 059 Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, 060 DefaultCodec.class); 061 codec = ReflectionUtils.newInstance(codecClass, job); 062 } 063 final SequenceFile.Writer out = 064 SequenceFile.createWriter(fs, job, file, 065 job.getOutputKeyClass(), 066 job.getOutputValueClass(), 067 compressionType, 068 codec, 069 progress); 070 071 return new RecordWriter<K, V>() { 072 073 public void write(K key, V value) 074 throws IOException { 075 076 out.append(key, value); 077 } 078 079 public void close(Reporter reporter) throws IOException { out.close();} 080 }; 081 } 082 083 /** Open the output generated by this format. */ 084 public static SequenceFile.Reader[] getReaders(Configuration conf, Path dir) 085 throws IOException { 086 FileSystem fs = dir.getFileSystem(conf); 087 Path[] names = FileUtil.stat2Paths(fs.listStatus(dir)); 088 089 // sort names, so that hash partitioning works 090 Arrays.sort(names); 091 092 SequenceFile.Reader[] parts = new SequenceFile.Reader[names.length]; 093 for (int i = 0; i < names.length; i++) { 094 parts[i] = new SequenceFile.Reader(fs, names[i], conf); 095 } 096 return parts; 097 } 098 099 /** 100 * Get the {@link CompressionType} for the output {@link SequenceFile}. 101 * @param conf the {@link JobConf} 102 * @return the {@link CompressionType} for the output {@link SequenceFile}, 103 * defaulting to {@link CompressionType#RECORD} 104 */ 105 public static CompressionType getOutputCompressionType(JobConf conf) { 106 String val = conf.get(org.apache.hadoop.mapreduce.lib.output. 107 FileOutputFormat.COMPRESS_TYPE, CompressionType.RECORD.toString()); 108 return CompressionType.valueOf(val); 109 } 110 111 /** 112 * Set the {@link CompressionType} for the output {@link SequenceFile}. 113 * @param conf the {@link JobConf} to modify 114 * @param style the {@link CompressionType} for the output 115 * {@link SequenceFile} 116 */ 117 public static void setOutputCompressionType(JobConf conf, 118 CompressionType style) { 119 setCompressOutput(conf, true); 120 conf.set(org.apache.hadoop.mapreduce.lib.output. 121 FileOutputFormat.COMPRESS_TYPE, style.toString()); 122 } 123 124 } 125