001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.mapreduce.lib.output;
019    
020    import java.io.IOException;
021    import java.io.DataOutputStream;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.io.WritableComparable;
026    import org.apache.hadoop.io.Writable;
027    import org.apache.hadoop.io.BytesWritable;
028    import org.apache.hadoop.io.SequenceFile;
029    import org.apache.hadoop.io.SequenceFile.CompressionType;
030    import org.apache.hadoop.io.SequenceFile.ValueBytes;
031    import org.apache.hadoop.mapred.InvalidJobConfException;
032    import org.apache.hadoop.mapreduce.Job;
033    import org.apache.hadoop.mapreduce.JobContext;
034    import org.apache.hadoop.mapreduce.RecordWriter;
035    import org.apache.hadoop.mapreduce.TaskAttemptContext;
036    
037    /** 
038     * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes keys, 
039     * values to {@link SequenceFile}s in binary(raw) format
040     */
041    @InterfaceAudience.Public
042    @InterfaceStability.Stable
043    public class SequenceFileAsBinaryOutputFormat 
044        extends SequenceFileOutputFormat <BytesWritable,BytesWritable> {
045      public static String KEY_CLASS = "mapreduce.output.seqbinaryoutputformat.key.class"; 
046      public static String VALUE_CLASS = "mapreduce.output.seqbinaryoutputformat.value.class"; 
047    
048      /** 
049       * Inner class used for appendRaw
050       */
051      static public class WritableValueBytes implements ValueBytes {
052        private BytesWritable value;
053    
054        public WritableValueBytes() {
055          this.value = null;
056        }
057        
058        public WritableValueBytes(BytesWritable value) {
059          this.value = value;
060        }
061    
062        public void reset(BytesWritable value) {
063          this.value = value;
064        }
065    
066        public void writeUncompressedBytes(DataOutputStream outStream)
067            throws IOException {
068          outStream.write(value.getBytes(), 0, value.getLength());
069        }
070    
071        public void writeCompressedBytes(DataOutputStream outStream)
072            throws IllegalArgumentException, IOException {
073          throw new UnsupportedOperationException(
074            "WritableValueBytes doesn't support RECORD compression"); 
075        }
076        
077        public int getSize(){
078          return value.getLength();
079        }
080      }
081    
082      /**
083       * Set the key class for the {@link SequenceFile}
084       * <p>This allows the user to specify the key class to be different 
085       * from the actual class ({@link BytesWritable}) used for writing </p>
086       * 
087       * @param job the {@link Job} to modify
088       * @param theClass the SequenceFile output key class.
089       */
090      static public void setSequenceFileOutputKeyClass(Job job, 
091          Class<?> theClass) {
092        job.getConfiguration().setClass(KEY_CLASS,
093          theClass, Object.class);
094      }
095    
096      /**
097       * Set the value class for the {@link SequenceFile}
098       * <p>This allows the user to specify the value class to be different 
099       * from the actual class ({@link BytesWritable}) used for writing </p>
100       * 
101       * @param job the {@link Job} to modify
102       * @param theClass the SequenceFile output key class.
103       */
104      static public void setSequenceFileOutputValueClass(Job job, 
105          Class<?> theClass) {
106        job.getConfiguration().setClass(VALUE_CLASS, 
107                      theClass, Object.class);
108      }
109    
110      /**
111       * Get the key class for the {@link SequenceFile}
112       * 
113       * @return the key class of the {@link SequenceFile}
114       */
115      static public Class<? extends WritableComparable> 
116          getSequenceFileOutputKeyClass(JobContext job) { 
117        return job.getConfiguration().getClass(KEY_CLASS,
118          job.getOutputKeyClass().asSubclass(WritableComparable.class), 
119          WritableComparable.class);
120      }
121    
122      /**
123       * Get the value class for the {@link SequenceFile}
124       * 
125       * @return the value class of the {@link SequenceFile}
126       */
127      static public Class<? extends Writable> getSequenceFileOutputValueClass(
128          JobContext job) { 
129        return job.getConfiguration().getClass(VALUE_CLASS, 
130          job.getOutputValueClass().asSubclass(Writable.class), Writable.class);
131      }
132      
133      @Override 
134      public RecordWriter<BytesWritable, BytesWritable> getRecordWriter(
135          TaskAttemptContext context) throws IOException {
136        final SequenceFile.Writer out = getSequenceWriter(context,
137          getSequenceFileOutputKeyClass(context),
138          getSequenceFileOutputValueClass(context)); 
139    
140        return new RecordWriter<BytesWritable, BytesWritable>() {
141          private WritableValueBytes wvaluebytes = new WritableValueBytes();
142    
143          public void write(BytesWritable bkey, BytesWritable bvalue)
144            throws IOException {
145            wvaluebytes.reset(bvalue);
146            out.appendRaw(bkey.getBytes(), 0, bkey.getLength(), wvaluebytes);
147            wvaluebytes.reset(null);
148          }
149    
150          public void close(TaskAttemptContext context) throws IOException { 
151            out.close();
152          }
153        };
154      }
155    
156      @Override 
157      public void checkOutputSpecs(JobContext job) throws IOException {
158        super.checkOutputSpecs(job);
159        if (getCompressOutput(job) && 
160            getOutputCompressionType(job) == CompressionType.RECORD ) {
161          throw new InvalidJobConfException("SequenceFileAsBinaryOutputFormat "
162            + "doesn't support Record Compression" );
163        }
164      }
165    }