/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.output;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.FileUtil;

import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;

/**
 * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes
 * {@link MapFile}s.
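 *
 * <p>One way to wire this format into a job might look like the sketch
 * below; the job name, output path, and key/value classes are illustrative,
 * not required by this class.
 * <pre>{@code
 * Job job = Job.getInstance(conf, "write-mapfiles");
 * job.setOutputFormatClass(MapFileOutputFormat.class);
 * job.setOutputKeyClass(Text.class);
 * job.setOutputValueClass(Text.class);
 * FileOutputFormat.setOutputPath(job, new Path("mapfile-out"));
 *
 * // Optionally block-compress the generated MapFiles.
 * FileOutputFormat.setCompressOutput(job, true);
 * SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
 * }</pre>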
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MapFileOutputFormat
    extends FileOutputFormat<WritableComparable<?>, Writable> {

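  /**
   * Create a {@link RecordWriter} that writes the task's output as a single
   * {@link MapFile}. Keys must be written in sorted order, since
   * {@link MapFile.Writer} rejects out-of-order keys.
   */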
  public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(
      TaskAttemptContext context) throws IOException {
    Configuration conf = context.getConfiguration();
    CompressionCodec codec = null;
    CompressionType compressionType = CompressionType.NONE;
    if (getCompressOutput(context)) {
      // find the kind of compression to do
      compressionType = SequenceFileOutputFormat.getOutputCompressionType(context);

      // find the right codec
      Class<?> codecClass = getOutputCompressorClass(context,
                                  DefaultCodec.class);
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
    }

    Path file = getDefaultWorkFile(context, "");
    FileSystem fs = file.getFileSystem(conf);
    // ignore the progress parameter, since MapFile is local
    final MapFile.Writer out =
      new MapFile.Writer(conf, fs, file.toString(),
        context.getOutputKeyClass().asSubclass(WritableComparable.class),
        context.getOutputValueClass().asSubclass(Writable.class),
        compressionType, codec, context);

    return new RecordWriter<WritableComparable<?>, Writable>() {
        public void write(WritableComparable<?> key, Writable value)
            throws IOException {
          out.append(key, value);
        }

        public void close(TaskAttemptContext context) throws IOException {
          out.close();
        }
      };
  }

  /**
   * Open the output generated by this format: one {@link MapFile.Reader} per
   * partition, ordered so that {@code readers[i]} serves partition {@code i}.
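   *
   * <p>A minimal read-back sketch (assuming {@code outputDir} is the
   * committed output directory of a finished job):
   * <pre>{@code
   * MapFile.Reader[] readers = MapFileOutputFormat.getReaders(outputDir, conf);
   * try {
   *   // look up keys, e.g. via getEntry(readers, partitioner, key, value)
   * } finally {
   *   for (MapFile.Reader reader : readers) {
   *     reader.close();
   *   }
   * }
   * }</pre>
   */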
  public static MapFile.Reader[] getReaders(Path dir,
      Configuration conf) throws IOException {
    FileSystem fs = dir.getFileSystem(conf);
    // skip hidden files such as _SUCCESS or _logs, which are not MapFiles
    Path[] names = FileUtil.stat2Paths(fs.listStatus(dir, new PathFilter() {
      public boolean accept(Path path) {
        String name = path.getName();
        return !name.startsWith("_") && !name.startsWith(".");
      }
    }));

    // sort names, so that hash partitioning works
    Arrays.sort(names);

    MapFile.Reader[] parts = new MapFile.Reader[names.length];
    for (int i = 0; i < names.length; i++) {
      parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
    }
    return parts;
  }

  /**
   * Get an entry from the output generated by this class, using the given
   * {@link Partitioner} to pick the partition that holds the key. Returns
   * {@code null} if the key is not found.
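   *
   * <p>A minimal lookup sketch (assuming {@code readers} came from
   * {@link #getReaders(Path, Configuration)} and the job used the default
   * {@code HashPartitioner}; the {@code Text} key/value types are
   * illustrative):
   * <pre>{@code
   * Partitioner<Text, Text> partitioner = new HashPartitioner<Text, Text>();
   * Text value = new Text();
   * Writable found =
   *     MapFileOutputFormat.getEntry(readers, partitioner, new Text("key"), value);
   * }</pre>
   */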
  public static <K extends WritableComparable<?>, V extends Writable>
      Writable getEntry(MapFile.Reader[] readers,
      Partitioner<K, V> partitioner, K key, V value) throws IOException {
    int part = partitioner.getPartition(key, value, readers.length);
    return readers[part].get(key, value);
  }
}