/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * An {@link OutputFormat} that writes keys and values to
 * {@link SequenceFile}s in binary (raw) format.
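 *
 * <p>A minimal usage sketch; the output path and the {@code Text} classes
 * below are illustrative only, not requirements of this format:</p>
 * <pre>{@code
 * JobConf job = new JobConf();
 * job.setOutputFormat(SequenceFileAsBinaryOutputFormat.class);
 * FileOutputFormat.setOutputPath(job, new Path("/tmp/binary-out"));
 *
 * // the records handed to the RecordWriter are raw bytes ...
 * job.setOutputKeyClass(BytesWritable.class);
 * job.setOutputValueClass(BytesWritable.class);
 *
 * // ... while the SequenceFile header may declare different key/value classes
 * SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job, Text.class);
 * SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job, Text.class);
 * }</pre>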
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsBinaryOutputFormat
    extends SequenceFileOutputFormat<BytesWritable, BytesWritable> {

  /**
   * Inner class that wraps a {@link BytesWritable} so its bytes can be
   * written with {@code SequenceFile.Writer#appendRaw}.
   */
  protected static class WritableValueBytes extends org.apache.hadoop.mapreduce
      .lib.output.SequenceFileAsBinaryOutputFormat.WritableValueBytes {
    public WritableValueBytes() {
      super();
    }

    public WritableValueBytes(BytesWritable value) {
      super(value);
    }
  }

  /**
   * Set the key class for the {@link SequenceFile}.
   * <p>This allows the user to specify a key class different from the
   * actual class ({@link BytesWritable}) used for writing.</p>
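   *
   * <p>For example (with a hypothetical {@code conf}; {@code Text} is chosen
   * purely for illustration):</p>
   * <pre>{@code
   * SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(conf, Text.class);
   * }</pre>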
   *
   * @param conf the {@link JobConf} to modify
   * @param theClass the SequenceFile output key class.
   */
  public static void setSequenceFileOutputKeyClass(JobConf conf,
                                                   Class<?> theClass) {
    conf.setClass(org.apache.hadoop.mapreduce.lib.output.
      SequenceFileAsBinaryOutputFormat.KEY_CLASS, theClass, Object.class);
  }

  /**
   * Set the value class for the {@link SequenceFile}.
   * <p>This allows the user to specify a value class different from the
   * actual class ({@link BytesWritable}) used for writing.</p>
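   *
   * <p>Analogously (again with a hypothetical {@code conf}, for illustration):</p>
   * <pre>{@code
   * SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(conf, Text.class);
   * }</pre>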
   *
   * @param conf the {@link JobConf} to modify
   * @param theClass the SequenceFile output value class.
   */
  public static void setSequenceFileOutputValueClass(JobConf conf,
                                                     Class<?> theClass) {
    conf.setClass(org.apache.hadoop.mapreduce.lib.output.
      SequenceFileAsBinaryOutputFormat.VALUE_CLASS, theClass, Object.class);
  }

  /**
   * Get the key class for the {@link SequenceFile}.
   *
   * @param conf the {@link JobConf} to query
   * @return the key class of the {@link SequenceFile}, defaulting to the
   *         job's output key class if it has not been set explicitly
   */
  public static Class<? extends WritableComparable>
      getSequenceFileOutputKeyClass(JobConf conf) {
    return conf.getClass(org.apache.hadoop.mapreduce.lib.output.
      SequenceFileAsBinaryOutputFormat.KEY_CLASS,
      conf.getOutputKeyClass().asSubclass(WritableComparable.class),
      WritableComparable.class);
  }

  /**
   * Get the value class for the {@link SequenceFile}.
   *
   * @param conf the {@link JobConf} to query
   * @return the value class of the {@link SequenceFile}, defaulting to the
   *         job's output value class if it has not been set explicitly
   */
  public static Class<? extends Writable>
      getSequenceFileOutputValueClass(JobConf conf) {
    return conf.getClass(org.apache.hadoop.mapreduce.lib.output.
      SequenceFileAsBinaryOutputFormat.VALUE_CLASS,
      conf.getOutputValueClass().asSubclass(Writable.class), Writable.class);
  }

  @Override
  public RecordWriter<BytesWritable, BytesWritable>
             getRecordWriter(FileSystem ignored, JobConf job,
                             String name, Progressable progress)
    throws IOException {
    // get the path of the temporary output file
    Path file = FileOutputFormat.getTaskOutputPath(job, name);

    FileSystem fs = file.getFileSystem(job);
    CompressionCodec codec = null;
    CompressionType compressionType = CompressionType.NONE;
    if (getCompressOutput(job)) {
      // find the kind of compression to do
      compressionType = getOutputCompressionType(job);

      // find the right codec
      Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
          DefaultCodec.class);
      codec = ReflectionUtils.newInstance(codecClass, job);
    }
    final SequenceFile.Writer out =
      SequenceFile.createWriter(fs, job, file,
                    getSequenceFileOutputKeyClass(job),
                    getSequenceFileOutputValueClass(job),
                    compressionType,
                    codec,
                    progress);

    return new RecordWriter<BytesWritable, BytesWritable>() {

        private WritableValueBytes wvaluebytes = new WritableValueBytes();

        public void write(BytesWritable bkey, BytesWritable bvalue)
          throws IOException {

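          // Wrap the value so its already-serialized bytes are handed to
          // appendRaw() directly, with no further (de)serialization.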
          wvaluebytes.reset(bvalue);
          out.appendRaw(bkey.getBytes(), 0, bkey.getLength(), wvaluebytes);
          wvaluebytes.reset(null);
        }

        public void close(Reporter reporter) throws IOException {
          out.close();
        }

      };

  }

  @Override
  public void checkOutputSpecs(FileSystem ignored, JobConf job)
            throws IOException {
    super.checkOutputSpecs(ignored, job);
    if (getCompressOutput(job) &&
        getOutputCompressionType(job) == CompressionType.RECORD) {
      throw new InvalidJobConfException("SequenceFileAsBinaryOutputFormat "
          + "doesn't support Record Compression");
    }
  }

}