/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * An {@link OutputFormat} that writes keys and values to
 * {@link SequenceFile}s in binary (raw) format.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsBinaryOutputFormat
    extends SequenceFileOutputFormat<BytesWritable, BytesWritable> {

  /**
   * Inner class that wraps a {@link BytesWritable} value so it can be passed
   * to {@link SequenceFile.Writer#appendRaw}.
   */
  protected static class WritableValueBytes extends org.apache.hadoop.mapreduce
      .lib.output.SequenceFileAsBinaryOutputFormat.WritableValueBytes {
    public WritableValueBytes() {
      super();
    }

    public WritableValueBytes(BytesWritable value) {
      super(value);
    }
  }

  /**
   * Set the key class for the {@link SequenceFile}.
   * <p>This allows the user to specify the key class to be different
   * from the actual class ({@link BytesWritable}) used for writing.</p>
   *
   * @param conf the {@link JobConf} to modify
   * @param theClass the SequenceFile output key class.
   */
  public static void setSequenceFileOutputKeyClass(JobConf conf,
      Class<?> theClass) {
    conf.setClass(org.apache.hadoop.mapreduce.lib.output.
        SequenceFileAsBinaryOutputFormat.KEY_CLASS, theClass, Object.class);
  }

  /**
   * Set the value class for the {@link SequenceFile}.
   * <p>This allows the user to specify the value class to be different
   * from the actual class ({@link BytesWritable}) used for writing.</p>
   *
   * @param conf the {@link JobConf} to modify
   * @param theClass the SequenceFile output value class.
   */
  public static void setSequenceFileOutputValueClass(JobConf conf,
      Class<?> theClass) {
    conf.setClass(org.apache.hadoop.mapreduce.lib.output.
        SequenceFileAsBinaryOutputFormat.VALUE_CLASS, theClass, Object.class);
  }

  /**
   * Get the key class for the {@link SequenceFile}.
   *
   * @param conf the {@link JobConf} to query
   * @return the key class of the {@link SequenceFile}
   */
  public static Class<? extends WritableComparable>
      getSequenceFileOutputKeyClass(JobConf conf) {
    return conf.getClass(org.apache.hadoop.mapreduce.lib.output.
        SequenceFileAsBinaryOutputFormat.KEY_CLASS,
        conf.getOutputKeyClass().asSubclass(WritableComparable.class),
        WritableComparable.class);
  }

  /**
   * Get the value class for the {@link SequenceFile}.
   *
   * @param conf the {@link JobConf} to query
   * @return the value class of the {@link SequenceFile}
   */
  public static Class<? extends Writable>
      getSequenceFileOutputValueClass(JobConf conf) {
    return conf.getClass(org.apache.hadoop.mapreduce.lib.output.
        SequenceFileAsBinaryOutputFormat.VALUE_CLASS,
        conf.getOutputValueClass().asSubclass(Writable.class), Writable.class);
  }

  @Override
  public RecordWriter<BytesWritable, BytesWritable>
      getRecordWriter(FileSystem ignored, JobConf job,
                      String name, Progressable progress)
      throws IOException {
    // get the path of the temporary output file
    Path file = FileOutputFormat.getTaskOutputPath(job, name);

    FileSystem fs = file.getFileSystem(job);
    CompressionCodec codec = null;
    CompressionType compressionType = CompressionType.NONE;
    if (getCompressOutput(job)) {
      // find the kind of compression to do
      compressionType = getOutputCompressionType(job);

      // find the right codec
      Class<? extends CompressionCodec> codecClass =
          getOutputCompressorClass(job, DefaultCodec.class);
      codec = ReflectionUtils.newInstance(codecClass, job);
    }
    final SequenceFile.Writer out =
        SequenceFile.createWriter(fs, job, file,
            getSequenceFileOutputKeyClass(job),
            getSequenceFileOutputValueClass(job),
            compressionType,
            codec,
            progress);

    return new RecordWriter<BytesWritable, BytesWritable>() {

      private WritableValueBytes wvaluebytes = new WritableValueBytes();

      public void write(BytesWritable bkey, BytesWritable bvalue)
          throws IOException {
        // append the raw key and value bytes without deserializing them
        wvaluebytes.reset(bvalue);
        out.appendRaw(bkey.getBytes(), 0, bkey.getLength(), wvaluebytes);
        wvaluebytes.reset(null);
      }

      public void close(Reporter reporter) throws IOException {
        out.close();
      }

    };
  }

  @Override
  public void checkOutputSpecs(FileSystem ignored, JobConf job)
      throws IOException {
    super.checkOutputSpecs(ignored, job);
    if (getCompressOutput(job) &&
        getOutputCompressionType(job) == CompressionType.RECORD) {
      throw new InvalidJobConfException("SequenceFileAsBinaryOutputFormat "
          + "doesn't support Record Compression");
    }
  }

}
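/*
 * Usage sketch (an editor's addition, not part of the Apache source above):
 * a minimal old-API (org.apache.hadoop.mapred) driver showing how
 * SequenceFileAsBinaryOutputFormat is typically wired into a job. The class
 * name, the input/output paths taken from args, and the choice of Text and
 * IntWritable as the "logical" key/value classes are hypothetical
 * illustrations; they assume the input SequenceFile's records were originally
 * serialized as Text keys and IntWritable values.
 */
class SequenceFileAsBinaryOutputFormatExample {
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(SequenceFileAsBinaryOutputFormatExample.class);
    conf.setJobName("binary-sequence-file-copy");

    // Read raw (key, value) bytes and pass them through unchanged: no
    // reducers, identity mapper, BytesWritable in and out.
    conf.setInputFormat(SequenceFileAsBinaryInputFormat.class);
    conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
    conf.setNumReduceTasks(0);

    // The job emits BytesWritable pairs; the output format appends them
    // verbatim via SequenceFile.Writer#appendRaw.
    conf.setOutputFormat(SequenceFileAsBinaryOutputFormat.class);
    conf.setOutputKeyClass(BytesWritable.class);
    conf.setOutputValueClass(BytesWritable.class);

    // Record the logical key/value classes in the SequenceFile header so
    // readers can interpret the raw bytes (assumed Text/IntWritable here).
    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(
        conf, org.apache.hadoop.io.Text.class);
    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(
        conf, org.apache.hadoop.io.IntWritable.class);

    // BLOCK (or NONE) compression is fine; RECORD compression would be
    // rejected by checkOutputSpecs above.
    FileOutputFormat.setCompressOutput(conf, true);
    SequenceFileOutputFormat.setOutputCompressionType(conf,
        CompressionType.BLOCK);

    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
  }
}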