001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapred; 020 021import java.io.IOException; 022 023import org.apache.hadoop.fs.FileSystem; 024import org.apache.hadoop.fs.Path; 025 026import org.apache.hadoop.io.MapFile; 027import org.apache.hadoop.io.WritableComparable; 028import org.apache.hadoop.io.Writable; 029import org.apache.hadoop.io.SequenceFile.CompressionType; 030import org.apache.hadoop.io.compress.CompressionCodec; 031import org.apache.hadoop.io.compress.DefaultCodec; 032import org.apache.hadoop.classification.InterfaceAudience; 033import org.apache.hadoop.classification.InterfaceStability; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.util.Progressable; 036import org.apache.hadoop.util.ReflectionUtils; 037 038/** An {@link OutputFormat} that writes {@link MapFile}s. 039 */ 040@InterfaceAudience.Public 041@InterfaceStability.Stable 042public class MapFileOutputFormat 043extends FileOutputFormat<WritableComparable, Writable> { 044 045 public RecordWriter<WritableComparable, Writable> getRecordWriter(FileSystem ignored, JobConf job, 046 String name, Progressable progress) 047 throws IOException { 048 // get the path of the temporary output file 049 Path file = FileOutputFormat.getTaskOutputPath(job, name); 050 051 FileSystem fs = file.getFileSystem(job); 052 CompressionCodec codec = null; 053 CompressionType compressionType = CompressionType.NONE; 054 if (getCompressOutput(job)) { 055 // find the kind of compression to do 056 compressionType = SequenceFileOutputFormat.getOutputCompressionType(job); 057 058 // find the right codec 059 Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, 060 DefaultCodec.class); 061 codec = ReflectionUtils.newInstance(codecClass, job); 062 } 063 064 // ignore the progress parameter, since MapFile is local 065 final MapFile.Writer out = 066 new MapFile.Writer(job, fs, file.toString(), 067 job.getOutputKeyClass().asSubclass(WritableComparable.class), 068 job.getOutputValueClass().asSubclass(Writable.class), 069 compressionType, codec, 070 progress); 071 072 return new RecordWriter<WritableComparable, Writable>() { 073 074 public void write(WritableComparable key, Writable value) 075 throws IOException { 076 077 out.append(key, value); 078 } 079 080 public void close(Reporter reporter) throws IOException { out.close();} 081 }; 082 } 083 084 /** Open the output generated by this format. */ 085 public static MapFile.Reader[] getReaders(FileSystem ignored, Path dir, 086 Configuration conf) 087 throws IOException { 088 return org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat. 089 getReaders(dir, conf); 090 } 091 092 /** Get an entry from output generated by this class. */ 093 public static <K extends WritableComparable, V extends Writable> 094 Writable getEntry(MapFile.Reader[] readers, 095 Partitioner<K, V> partitioner, 096 K key, 097 V value) throws IOException { 098 int part = partitioner.getPartition(key, value, readers.length); 099 return readers[part].get(key, value); 100 } 101 102} 103