/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.output;

import java.io.IOException;
import java.io.DataOutputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.ValueBytes;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes keys and
 * values to {@link SequenceFile}s in binary (raw) format.
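 *
 * <p>A minimal driver sketch (illustrative only; the configuration, job name,
 * and output path below are assumptions, not part of this class):</p>
 * <pre>
 * Job job = Job.getInstance(new Configuration(), "binary sequence file output");
 * job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
 * FileOutputFormat.setOutputPath(job, new Path("output"));
 * // BLOCK or no compression only; RECORD compression is rejected by
 * // checkOutputSpecs below.
 * SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
 * </pre>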
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsBinaryOutputFormat 
    extends SequenceFileOutputFormat<BytesWritable, BytesWritable> {
  public static String KEY_CLASS = "mapreduce.output.seqbinaryoutputformat.key.class";
  public static String VALUE_CLASS = "mapreduce.output.seqbinaryoutputformat.value.class";

  /**
   * Inner class that wraps a {@link BytesWritable} as a {@link ValueBytes},
   * so values can be passed to {@link SequenceFile.Writer#appendRaw}.
   */
  public static class WritableValueBytes implements ValueBytes {
    private BytesWritable value;

    public WritableValueBytes() {
      this.value = null;
    }

    public WritableValueBytes(BytesWritable value) {
      this.value = value;
    }

    public void reset(BytesWritable value) {
      this.value = value;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
        throws IOException {
      outStream.write(value.getBytes(), 0, value.getLength());
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
        throws IllegalArgumentException, IOException {
      throw new UnsupportedOperationException(
        "WritableValueBytes doesn't support RECORD compression");
    }

    @Override
    public int getSize() {
      return value.getLength();
    }
  }

  /**
   * Set the key class for the {@link SequenceFile}.
   * <p>This allows the user to specify a key class different from the
   * actual class ({@link BytesWritable}) used for writing.</p>
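   *
   * <p>For example (a sketch; {@code Text} is only an illustrative choice
   * of key class):</p>
   * <pre>
   * SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job, Text.class);
   * </pre>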
   *
   * @param job the {@link Job} to modify
   * @param theClass the SequenceFile output key class.
   */
  public static void setSequenceFileOutputKeyClass(Job job, 
      Class<?> theClass) {
    job.getConfiguration().setClass(KEY_CLASS,
      theClass, Object.class);
  }

  /**
   * Set the value class for the {@link SequenceFile}.
   * <p>This allows the user to specify a value class different from the
   * actual class ({@link BytesWritable}) used for writing.</p>
   *
   * @param job the {@link Job} to modify
   * @param theClass the SequenceFile output value class.
   */
  public static void setSequenceFileOutputValueClass(Job job, 
      Class<?> theClass) {
    job.getConfiguration().setClass(VALUE_CLASS, 
      theClass, Object.class);
  }

  /**
   * Get the key class for the {@link SequenceFile}.
   *
   * @param job the {@link JobContext} to query
   * @return the key class of the {@link SequenceFile}
   */
  public static Class<? extends WritableComparable> 
      getSequenceFileOutputKeyClass(JobContext job) { 
    return job.getConfiguration().getClass(KEY_CLASS,
      job.getOutputKeyClass().asSubclass(WritableComparable.class), 
      WritableComparable.class);
  }

  /**
   * Get the value class for the {@link SequenceFile}.
   *
   * @param job the {@link JobContext} to query
   * @return the value class of the {@link SequenceFile}
   */
  public static Class<? extends Writable> getSequenceFileOutputValueClass(
      JobContext job) { 
    return job.getConfiguration().getClass(VALUE_CLASS, 
      job.getOutputValueClass().asSubclass(Writable.class), Writable.class);
  }

  @Override 
  public RecordWriter<BytesWritable, BytesWritable> getRecordWriter(
      TaskAttemptContext context) throws IOException {
    final SequenceFile.Writer out = getSequenceWriter(context,
      getSequenceFileOutputKeyClass(context),
      getSequenceFileOutputValueClass(context)); 

    return new RecordWriter<BytesWritable, BytesWritable>() {
      private WritableValueBytes wvaluebytes = new WritableValueBytes();

      @Override
      public void write(BytesWritable bkey, BytesWritable bvalue)
        throws IOException {
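        // Wrap the value so its raw bytes can be appended without
        // re-serialization; the reference is dropped once written.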
        wvaluebytes.reset(bvalue);
        out.appendRaw(bkey.getBytes(), 0, bkey.getLength(), wvaluebytes);
        wvaluebytes.reset(null);
      }

      @Override
      public void close(TaskAttemptContext context) throws IOException { 
        out.close();
      }
    };
  }

  @Override 
  public void checkOutputSpecs(JobContext job) throws IOException {
    super.checkOutputSpecs(job);
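    // RECORD compression would require WritableValueBytes.writeCompressedBytes,
    // which is unsupported, so reject it up front.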
    if (getCompressOutput(job) && 
        getOutputCompressionType(job) == CompressionType.RECORD) {
      throw new InvalidJobConfException("SequenceFileAsBinaryOutputFormat "
        + "doesn't support Record Compression");
    }
  }
}