/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.output;

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.ValueBytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes keys and
 * values to {@link SequenceFile}s in binary (raw) format.
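 *
 * <p>A minimal driver sketch (the {@code Text} logical key/value type and the
 * output path below are illustrative assumptions, not requirements):</p>
 * <pre>{@code
 * Configuration conf = new Configuration();
 * Job job = Job.getInstance(conf, "binary output");
 * job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
 * // The job's actual output types must be raw BytesWritable pairs...
 * job.setOutputKeyClass(BytesWritable.class);
 * job.setOutputValueClass(BytesWritable.class);
 * // ...while the logical types recorded in the SequenceFile header may differ:
 * SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job, Text.class);
 * SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job, Text.class);
 * FileOutputFormat.setOutputPath(job, new Path("/tmp/binary-out"));
 * }</pre>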
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsBinaryOutputFormat
    extends SequenceFileOutputFormat<BytesWritable, BytesWritable> {

  /** Configuration key naming the logical key class recorded in the file header. */
  public static String KEY_CLASS =
      "mapreduce.output.seqbinaryoutputformat.key.class";
  /** Configuration key naming the logical value class recorded in the file header. */
  public static String VALUE_CLASS =
      "mapreduce.output.seqbinaryoutputformat.value.class";

  /**
   * Inner class that wraps a {@link BytesWritable} as a
   * {@link SequenceFile.ValueBytes}, so raw value bytes can be passed to
   * {@link SequenceFile.Writer#appendRaw}.
   */
  public static class WritableValueBytes implements ValueBytes {
    private BytesWritable value;

    public WritableValueBytes() {
      this.value = null;
    }

    public WritableValueBytes(BytesWritable value) {
      this.value = value;
    }

    public void reset(BytesWritable value) {
      this.value = value;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
        throws IOException {
      outStream.write(value.getBytes(), 0, value.getLength());
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
        throws IllegalArgumentException, IOException {
      throw new UnsupportedOperationException(
          "WritableValueBytes doesn't support RECORD compression");
    }

    @Override
    public int getSize() {
      return value.getLength();
    }
  }

  /**
   * Set the key class for the {@link SequenceFile}.
   * <p>This allows the user to specify the key class to be different
   * from the actual class ({@link BytesWritable}) used for writing.</p>
   *
   * @param job the {@link Job} to modify
   * @param theClass the SequenceFile output key class.
   */
  public static void setSequenceFileOutputKeyClass(Job job,
      Class<?> theClass) {
    job.getConfiguration().setClass(KEY_CLASS, theClass, Object.class);
  }

  /**
   * Set the value class for the {@link SequenceFile}.
   * <p>This allows the user to specify the value class to be different
   * from the actual class ({@link BytesWritable}) used for writing.</p>
   *
   * @param job the {@link Job} to modify
   * @param theClass the SequenceFile output value class.
   */
  public static void setSequenceFileOutputValueClass(Job job,
      Class<?> theClass) {
    job.getConfiguration().setClass(VALUE_CLASS, theClass, Object.class);
  }

  /**
   * Get the key class for the {@link SequenceFile}.
   *
   * @return the key class of the {@link SequenceFile}
   */
  public static Class<? extends WritableComparable>
      getSequenceFileOutputKeyClass(JobContext job) {
    return job.getConfiguration().getClass(KEY_CLASS,
        job.getOutputKeyClass().asSubclass(WritableComparable.class),
        WritableComparable.class);
  }

  /**
   * Get the value class for the {@link SequenceFile}.
   *
   * @return the value class of the {@link SequenceFile}
   */
  public static Class<? extends Writable> getSequenceFileOutputValueClass(
      JobContext job) {
    return job.getConfiguration().getClass(VALUE_CLASS,
        job.getOutputValueClass().asSubclass(Writable.class), Writable.class);
  }

  @Override
  public RecordWriter<BytesWritable, BytesWritable> getRecordWriter(
      TaskAttemptContext context) throws IOException {
    final SequenceFile.Writer out = getSequenceWriter(context,
        getSequenceFileOutputKeyClass(context),
        getSequenceFileOutputValueClass(context));

    return new RecordWriter<BytesWritable, BytesWritable>() {
      private WritableValueBytes wvaluebytes = new WritableValueBytes();

      @Override
      public void write(BytesWritable bkey, BytesWritable bvalue)
          throws IOException {
        // Forward the raw key bytes and the wrapped value bytes straight
        // through to the underlying SequenceFile writer.
        wvaluebytes.reset(bvalue);
        out.appendRaw(bkey.getBytes(), 0, bkey.getLength(), wvaluebytes);
        // Release the reference between records.
        wvaluebytes.reset(null);
      }

      @Override
      public void close(TaskAttemptContext context) throws IOException {
        out.close();
      }
    };
  }

  @Override
  public void checkOutputSpecs(JobContext job) throws IOException {
    super.checkOutputSpecs(job);
    // RECORD compression would require writeCompressedBytes(), which
    // WritableValueBytes does not support.
    if (getCompressOutput(job) &&
        getOutputCompressionType(job) == CompressionType.RECORD) {
      throw new InvalidJobConfException("SequenceFileAsBinaryOutputFormat "
          + "doesn't support Record Compression");
    }
  }
}
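
/*
 * Note on compression (a sketch; BLOCK compression here is one illustrative
 * choice, not the only valid one): checkOutputSpecs() above rejects RECORD
 * compression because appendRaw() copies value bytes verbatim, so per-record
 * compression can never be applied. NONE and BLOCK compression remain usable:
 *
 *   FileOutputFormat.setCompressOutput(job, true);
 *   SequenceFileOutputFormat.setOutputCompressionType(job,
 *       SequenceFile.CompressionType.BLOCK);
 */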