001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred;
020
021 import java.io.IOException;
022 import java.util.Arrays;
023
024 import org.apache.hadoop.classification.InterfaceAudience;
025 import org.apache.hadoop.classification.InterfaceStability;
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.hadoop.fs.FileSystem;
028 import org.apache.hadoop.fs.FileUtil;
029 import org.apache.hadoop.fs.Path;
030 import org.apache.hadoop.io.SequenceFile;
031 import org.apache.hadoop.io.SequenceFile.CompressionType;
032 import org.apache.hadoop.io.compress.CompressionCodec;
033 import org.apache.hadoop.io.compress.DefaultCodec;
034 import org.apache.hadoop.util.Progressable;
035 import org.apache.hadoop.util.ReflectionUtils;
036
037 /**
038 * An {@link OutputFormat} that writes {@link SequenceFile}s.
039 */
040 @InterfaceAudience.Public
041 @InterfaceStability.Stable
042 public class SequenceFileOutputFormat <K,V> extends FileOutputFormat<K, V> {
043
044 public RecordWriter<K, V> getRecordWriter(
045 FileSystem ignored, JobConf job,
046 String name, Progressable progress)
047 throws IOException {
048 // get the path of the temporary output file
049 Path file = FileOutputFormat.getTaskOutputPath(job, name);
050
051 FileSystem fs = file.getFileSystem(job);
052 CompressionCodec codec = null;
053 CompressionType compressionType = CompressionType.NONE;
054 if (getCompressOutput(job)) {
055 // find the kind of compression to do
056 compressionType = getOutputCompressionType(job);
057
058 // find the right codec
059 Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
060 DefaultCodec.class);
061 codec = ReflectionUtils.newInstance(codecClass, job);
062 }
063 final SequenceFile.Writer out =
064 SequenceFile.createWriter(fs, job, file,
065 job.getOutputKeyClass(),
066 job.getOutputValueClass(),
067 compressionType,
068 codec,
069 progress);
070
071 return new RecordWriter<K, V>() {
072
073 public void write(K key, V value)
074 throws IOException {
075
076 out.append(key, value);
077 }
078
079 public void close(Reporter reporter) throws IOException { out.close();}
080 };
081 }
082
083 /** Open the output generated by this format. */
084 public static SequenceFile.Reader[] getReaders(Configuration conf, Path dir)
085 throws IOException {
086 FileSystem fs = dir.getFileSystem(conf);
087 Path[] names = FileUtil.stat2Paths(fs.listStatus(dir));
088
089 // sort names, so that hash partitioning works
090 Arrays.sort(names);
091
092 SequenceFile.Reader[] parts = new SequenceFile.Reader[names.length];
093 for (int i = 0; i < names.length; i++) {
094 parts[i] = new SequenceFile.Reader(fs, names[i], conf);
095 }
096 return parts;
097 }
098
099 /**
100 * Get the {@link CompressionType} for the output {@link SequenceFile}.
101 * @param conf the {@link JobConf}
102 * @return the {@link CompressionType} for the output {@link SequenceFile},
103 * defaulting to {@link CompressionType#RECORD}
104 */
105 public static CompressionType getOutputCompressionType(JobConf conf) {
106 String val = conf.get(org.apache.hadoop.mapreduce.lib.output.
107 FileOutputFormat.COMPRESS_TYPE, CompressionType.RECORD.toString());
108 return CompressionType.valueOf(val);
109 }
110
111 /**
112 * Set the {@link CompressionType} for the output {@link SequenceFile}.
113 * @param conf the {@link JobConf} to modify
114 * @param style the {@link CompressionType} for the output
115 * {@link SequenceFile}
116 */
117 public static void setOutputCompressionType(JobConf conf,
118 CompressionType style) {
119 setCompressOutput(conf, true);
120 conf.set(org.apache.hadoop.mapreduce.lib.output.
121 FileOutputFormat.COMPRESS_TYPE, style.toString());
122 }
123
124 }
125