/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * An {@link OutputFormat} that writes {@link BytesWritable} keys and values
 * to {@link SequenceFile}s in binary (raw) form.
 *
 * <p>Each record's key bytes are handed directly to
 * {@code SequenceFile.Writer#appendRaw}, and the value is wrapped in a
 * {@link WritableValueBytes}. The key and value classes recorded in the
 * SequenceFile can be declared independently of {@link BytesWritable} via
 * {@link #setSequenceFileOutputKeyClass(JobConf, Class)} and
 * {@link #setSequenceFileOutputValueClass(JobConf, Class)}.</p>
 *
 * <p>RECORD compression is rejected by
 * {@link #checkOutputSpecs(FileSystem, JobConf)}.</p>
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsBinaryOutputFormat
    extends SequenceFileOutputFormat<BytesWritable, BytesWritable> {

  /**
   * Value-bytes adapter passed to {@code appendRaw}. It re-exposes the
   * new-API (mapreduce) implementation under the old (mapred) API.
   */
  protected static class WritableValueBytes extends
      org.apache.hadoop.mapreduce.lib.output
          .SequenceFileAsBinaryOutputFormat.WritableValueBytes {
  }

  /**
   * Set the key class for the {@link SequenceFile}.
   * <p>This allows the user to specify the key class to be different
   * from the actual class ({@link BytesWritable}) used for writing.</p>
   *
   * @param conf the {@link JobConf} to modify
   * @param theClass the SequenceFile output key class.
   */
  public static void setSequenceFileOutputKeyClass(JobConf conf,
      Class<?> theClass) {
    conf.setClass(org.apache.hadoop.mapreduce.lib.output.
        SequenceFileAsBinaryOutputFormat.KEY_CLASS, theClass, Object.class);
  }

  /**
   * Set the value class for the {@link SequenceFile}.
   * <p>This allows the user to specify the value class to be different
   * from the actual class ({@link BytesWritable}) used for writing.</p>
   *
   * @param conf the {@link JobConf} to modify
   * @param theClass the SequenceFile output value class.
   */
  public static void setSequenceFileOutputValueClass(JobConf conf,
      Class<?> theClass) {
    conf.setClass(org.apache.hadoop.mapreduce.lib.output.
        SequenceFileAsBinaryOutputFormat.VALUE_CLASS, theClass, Object.class);
  }

  /**
   * Get the key class for the {@link SequenceFile}.
   *
   * <p>Falls back to the job's output key class
   * ({@link JobConf#getOutputKeyClass()}) when no explicit SequenceFile key
   * class has been configured. Note that the fallback is computed eagerly, so
   * the job's output key class must implement {@link WritableComparable}
   * even when an explicit key class is set.</p>
   *
   * @param conf the {@link JobConf} to read from
   * @return the key class of the {@link SequenceFile}
   */
  public static Class<? extends WritableComparable> getSequenceFileOutputKeyClass(
      JobConf conf) {
    return conf.getClass(org.apache.hadoop.mapreduce.lib.output.
        SequenceFileAsBinaryOutputFormat.KEY_CLASS,
        conf.getOutputKeyClass().asSubclass(WritableComparable.class),
        WritableComparable.class);
  }

  /**
   * Get the value class for the {@link SequenceFile}.
   *
   * <p>Falls back to the job's output value class
   * ({@link JobConf#getOutputValueClass()}) when no explicit SequenceFile
   * value class has been configured. Note that the fallback is computed
   * eagerly, so the job's output value class must implement
   * {@link Writable} even when an explicit value class is set.</p>
   *
   * @param conf the {@link JobConf} to read from
   * @return the value class of the {@link SequenceFile}
   */
  public static Class<? extends Writable> getSequenceFileOutputValueClass(
      JobConf conf) {
    return conf.getClass(org.apache.hadoop.mapreduce.lib.output.
        SequenceFileAsBinaryOutputFormat.VALUE_CLASS,
        conf.getOutputValueClass().asSubclass(Writable.class), Writable.class);
  }

  /**
   * Create a writer that appends raw key/value bytes to a SequenceFile at
   * the task's output path.
   *
   * @param ignored unused; the file system is derived from the output path
   * @param job the job configuration
   * @param name the name of the output file within the task output directory
   * @param progress progress reporter passed through to the SequenceFile writer
   * @return a record writer that appends raw bytes
   * @throws IOException if the output file cannot be created
   */
  @Override
  public RecordWriter<BytesWritable, BytesWritable>
      getRecordWriter(FileSystem ignored, JobConf job,
                      String name, Progressable progress)
      throws IOException {
    // get the path of the temporary output file
    Path file = FileOutputFormat.getTaskOutputPath(job, name);

    FileSystem fs = file.getFileSystem(job);
    CompressionCodec codec = null;
    CompressionType compressionType = CompressionType.NONE;
    if (getCompressOutput(job)) {
      // find the kind of compression to do
      compressionType = getOutputCompressionType(job);

      // find the right codec (DefaultCodec unless the job configures one)
      Class<? extends CompressionCodec> codecClass =
          getOutputCompressorClass(job, DefaultCodec.class);
      codec = ReflectionUtils.newInstance(codecClass, job);
    }
    final SequenceFile.Writer out =
        SequenceFile.createWriter(fs, job, file,
            getSequenceFileOutputKeyClass(job),
            getSequenceFileOutputValueClass(job),
            compressionType,
            codec,
            progress);

    return new RecordWriter<BytesWritable, BytesWritable>() {

      // Reused across records; wraps the current value for appendRaw.
      private final WritableValueBytes wvaluebytes = new WritableValueBytes();

      @Override
      public void write(BytesWritable bkey, BytesWritable bvalue)
          throws IOException {
        wvaluebytes.reset(bvalue);
        try {
          // Only the first getLength() bytes of the backing array are valid.
          out.appendRaw(bkey.getBytes(), 0, bkey.getLength(), wvaluebytes);
        } finally {
          // Drop the reference to the caller's buffer even if the append fails.
          wvaluebytes.reset(null);
        }
      }

      @Override
      public void close(Reporter reporter) throws IOException {
        out.close();
      }

    };
  }

  /**
   * Validate the output specification, additionally rejecting RECORD
   * compression.
   *
   * <p>NOTE(review): presumably RECORD compression is unsupported because
   * values are appended as pre-serialized raw bytes through
   * {@link WritableValueBytes}; confirm against the mapreduce implementation.</p>
   *
   * @param ignored passed through to the superclass check
   * @param job the job configuration
   * @throws InvalidJobConfException if output compression is enabled with
   *     {@link CompressionType#RECORD}
   * @throws IOException if the superclass check fails
   */
  @Override
  public void checkOutputSpecs(FileSystem ignored, JobConf job)
      throws IOException {
    super.checkOutputSpecs(ignored, job);
    if (getCompressOutput(job)
        && getOutputCompressionType(job) == CompressionType.RECORD) {
      throw new InvalidJobConfException("SequenceFileAsBinaryOutputFormat "
          + "doesn't support Record Compression" );
    }
  }

}