/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.output;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileUtil;

import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;

/**
 * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes
 * {@link MapFile}s.
 *
 * <p>Besides producing the output, this class offers two static helpers for
 * reading it back: {@link #getReaders(Path, Configuration)} opens one reader
 * per output entry and
 * {@link #getEntry(MapFile.Reader[], Partitioner, WritableComparable, Writable)}
 * looks a key up in the reader selected by a partitioner.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MapFileOutputFormat
    extends FileOutputFormat<WritableComparable<?>, Writable> {

  /**
   * Creates a record writer that appends every key/value pair to a single
   * {@link MapFile} located at this task's default work file.
   *
   * <p>When output compression is enabled for the job, the compression type
   * is taken from {@link SequenceFileOutputFormat} and the codec class from
   * the job configuration, falling back to {@link DefaultCodec}. Otherwise
   * the MapFile is written with {@link CompressionType#NONE} and no codec.
   *
   * @param context the task attempt whose configuration, output key/value
   *                classes and work file are used
   * @return a writer whose {@code write} appends to the MapFile and whose
   *         {@code close} closes it
   * @throws IOException if the file system or the MapFile writer cannot be
   *                     created
   */
  @Override
  public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(
      TaskAttemptContext context) throws IOException {
    Configuration conf = context.getConfiguration();
    CompressionCodec codec = null;
    CompressionType compressionType = CompressionType.NONE;
    if (getCompressOutput(context)) {
      // find the kind of compression to do
      compressionType = SequenceFileOutputFormat.getOutputCompressionType(context);

      // find the right codec
      Class<?> codecClass = getOutputCompressorClass(context,
          DefaultCodec.class);
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
    }

    Path file = getDefaultWorkFile(context, "");
    FileSystem fs = file.getFileSystem(conf);
    // The task context is handed to the writer as its last constructor
    // argument (the progress callback slot of MapFile.Writer).
    final MapFile.Writer out =
      new MapFile.Writer(conf, fs, file.toString(),
        context.getOutputKeyClass().asSubclass(WritableComparable.class),
        context.getOutputValueClass().asSubclass(Writable.class),
        compressionType, codec, context);

    return new RecordWriter<WritableComparable<?>, Writable>() {
        @Override
        public void write(WritableComparable<?> key, Writable value)
            throws IOException {
          out.append(key, value);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException {
          out.close();
        }
      };
  }

  /**
   * Open the output generated by this format.
   *
   * <p>Entries of {@code dir} whose name starts with {@code '_'} or
   * {@code '.'} (for example a {@code _SUCCESS} marker or a {@code _logs}
   * directory) are skipped: they are not MapFiles, and counting them would
   * also shift the reader indices that
   * {@link #getEntry(MapFile.Reader[], Partitioner, WritableComparable, Writable)}
   * depends on. The remaining paths are sorted so that reader {@code i}
   * corresponds to partition {@code i}.
   *
   * <p>If opening any reader fails, the readers opened so far are closed
   * before the exception is propagated.
   *
   * @param dir  the directory holding the MapFiles written by a job
   * @param conf the configuration used to resolve the file system and to
   *             open the readers
   * @return one open reader per non-hidden entry of {@code dir}, in sorted
   *         path order; the caller is responsible for closing them
   * @throws IOException if the directory cannot be listed or a reader cannot
   *                     be opened
   */
  public static MapFile.Reader[] getReaders(Path dir,
      Configuration conf) throws IOException {
    FileSystem fs = dir.getFileSystem(conf);
    Path[] listed = FileUtil.stat2Paths(fs.listStatus(dir));

    // Keep only real output entries; bookkeeping files are not MapFiles.
    List<Path> visible = new ArrayList<Path>(listed.length);
    for (int i = 0; i < listed.length; i++) {
      if (!isHidden(listed[i])) {
        visible.add(listed[i]);
      }
    }
    Path[] names = visible.toArray(new Path[visible.size()]);

    // sort names, so that hash partitioning works
    Arrays.sort(names);

    MapFile.Reader[] parts = new MapFile.Reader[names.length];
    boolean allOpened = false;
    try {
      for (int i = 0; i < names.length; i++) {
        parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
      }
      allOpened = true;
    } finally {
      if (!allOpened) {
        // Do not leak the readers that were opened before the failure.
        closeQuietly(parts);
      }
    }
    return parts;
  }

  /**
   * Get an entry from output generated by this class.
   *
   * <p>The partitioner is asked which of the {@code readers.length}
   * partitions {@code key} belongs to, and the lookup is delegated to
   * {@link MapFile.Reader#get(WritableComparable, Writable)} on the reader
   * at that index.
   *
   * @param readers     the readers returned by
   *                    {@link #getReaders(Path, Configuration)}
   * @param partitioner the partitioner used to pick the reader; it should be
   *                    the one the job used when writing the output
   * @param key         the key to look up
   * @param value       the object handed to the reader to receive the value
   * @return whatever the selected reader's {@code get} returns for
   *         {@code key}
   * @throws IOException if the underlying reader fails
   */
  public static <K extends WritableComparable<?>, V extends Writable>
      Writable getEntry(MapFile.Reader[] readers,
      Partitioner<K, V> partitioner, K key, V value) throws IOException {
    int part = partitioner.getPartition(key, value, readers.length);
    return readers[part].get(key, value);
  }

  /** Tells whether a path's final component starts with '_' or '.'. */
  private static boolean isHidden(Path path) {
    String name = path.getName();
    return name.startsWith("_") || name.startsWith(".");
  }

  /** Closes every non-null reader, ignoring failures while closing. */
  private static void closeQuietly(MapFile.Reader[] readers) {
    for (int i = 0; i < readers.length; i++) {
      if (readers[i] != null) {
        try {
          readers[i].close();
        } catch (IOException ignored) {
          // The original failure is already propagating; a secondary close
          // error must not mask it.
        }
      }
    }
  }
}