001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred;
020    
021    import java.io.IOException;
022    
023    import org.apache.hadoop.fs.FileSystem;
024    import org.apache.hadoop.fs.Path;
025    
026    import org.apache.hadoop.io.MapFile;
027    import org.apache.hadoop.io.WritableComparable;
028    import org.apache.hadoop.io.Writable;
029    import org.apache.hadoop.io.SequenceFile.CompressionType;
030    import org.apache.hadoop.io.compress.CompressionCodec;
031    import org.apache.hadoop.io.compress.DefaultCodec;
032    import org.apache.hadoop.classification.InterfaceAudience;
033    import org.apache.hadoop.classification.InterfaceStability;
034    import org.apache.hadoop.conf.Configuration;
035    import org.apache.hadoop.util.Progressable;
036    import org.apache.hadoop.util.ReflectionUtils;
037    
038    /** An {@link OutputFormat} that writes {@link MapFile}s.
039     */
040    @InterfaceAudience.Public
041    @InterfaceStability.Stable
042    public class MapFileOutputFormat 
043    extends FileOutputFormat<WritableComparable, Writable> {
044    
045      public RecordWriter<WritableComparable, Writable> getRecordWriter(FileSystem ignored, JobConf job,
046                                          String name, Progressable progress)
047        throws IOException {
048        // get the path of the temporary output file 
049        Path file = FileOutputFormat.getTaskOutputPath(job, name);
050        
051        FileSystem fs = file.getFileSystem(job);
052        CompressionCodec codec = null;
053        CompressionType compressionType = CompressionType.NONE;
054        if (getCompressOutput(job)) {
055          // find the kind of compression to do
056          compressionType = SequenceFileOutputFormat.getOutputCompressionType(job);
057    
058          // find the right codec
059          Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
060              DefaultCodec.class);
061          codec = ReflectionUtils.newInstance(codecClass, job);
062        }
063        
064        // ignore the progress parameter, since MapFile is local
065        final MapFile.Writer out =
066          new MapFile.Writer(job, fs, file.toString(),
067                             job.getOutputKeyClass().asSubclass(WritableComparable.class),
068                             job.getOutputValueClass().asSubclass(Writable.class),
069                             compressionType, codec,
070                             progress);
071    
072        return new RecordWriter<WritableComparable, Writable>() {
073    
074            public void write(WritableComparable key, Writable value)
075              throws IOException {
076    
077              out.append(key, value);
078            }
079    
080            public void close(Reporter reporter) throws IOException { out.close();}
081          };
082      }
083    
084      /** Open the output generated by this format. */
085      public static MapFile.Reader[] getReaders(FileSystem ignored, Path dir,
086                                                Configuration conf)
087          throws IOException {
088        return org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat.
089          getReaders(dir, conf);
090      }
091        
092      /** Get an entry from output generated by this class. */
093      public static <K extends WritableComparable, V extends Writable>
094      Writable getEntry(MapFile.Reader[] readers,
095                                      Partitioner<K, V> partitioner,
096                                      K key,
097                                      V value) throws IOException {
098        int part = partitioner.getPartition(key, value, readers.length);
099        return readers[part].get(key, value);
100      }
101    
102    }
103