/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

/**
 * FixedLengthInputFormat is an input format used to read input files
 * which contain fixed length records.  The content of a record need not be
 * text.  It can be arbitrary binary data.
 * <p>
 * Users must configure the record length property, either by calling
 * <br>
 * {@code FixedLengthInputFormat.setRecordLength(conf, recordLength);}
 * <br>
 * or by calling
 * <br>
 * {@code conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength);}
 * <p>
 * A record length that is missing or not greater than zero is rejected by
 * {@link #getRecordReader(InputSplit, JobConf, Reporter)}.
 *
 * @see FixedLengthRecordReader
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FixedLengthInputFormat
    extends FileInputFormat<LongWritable, BytesWritable>
    implements JobConfigurable {

  /**
   * Codec lookup used by {@link #isSplitable(FileSystem, Path)}.  Assigned in
   * {@link #configure(JobConf)}; stays null until that method is called.
   */
  private CompressionCodecFactory compressionCodecs = null;

  /** Configuration key holding the length of each record. */
  public static final String FIXED_RECORD_LENGTH =
      "fixedlengthinputformat.record.length";

  /**
   * Set the length of each record.
   *
   * @param conf configuration
   * @param recordLength the length of a record
   */
  public static void setRecordLength(Configuration conf, int recordLength) {
    conf.setInt(FIXED_RECORD_LENGTH, recordLength);
  }

  /**
   * Get the record length value.
   *
   * @param conf configuration
   * @return the record length, zero means none was set
   */
  public static int getRecordLength(Configuration conf) {
    return conf.getInt(FIXED_RECORD_LENGTH, 0);
  }

  /**
   * Builds the compression codec factory from the job configuration.
   *
   * @param conf the job configuration
   */
  @Override
  public void configure(JobConf conf) {
    compressionCodecs = new CompressionCodecFactory(conf);
  }

  /**
   * Creates a {@link FixedLengthRecordReader} for the given split after
   * validating the configured record length.
   *
   * @param genericSplit the split to read; it is cast to {@link FileSplit}
   * @param job the job configuration holding the record length
   * @param reporter receives the split's string form as its status
   * @return a record reader over the split
   * @throws IOException if the configured record length is not greater
   *         than zero
   */
  @Override
  public RecordReader<LongWritable, BytesWritable>
      getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter)
      throws IOException {
    reporter.setStatus(genericSplit.toString());
    int recordLength = getRecordLength(job);
    if (recordLength <= 0) {
      throw new IOException("Fixed record length " + recordLength
          + " is invalid. It should be set to a value greater than zero");
    }
    return new FixedLengthRecordReader(job, (FileSplit) genericSplit,
        recordLength);
  }

  /**
   * Reports whether the given file may be split: true only when the codec
   * factory finds no compression codec for the file's path.
   *
   * @param fs the file system the file lives on
   * @param file the file to check
   * @return true if no compression codec matches the file
   */
  @Override
  protected boolean isSplitable(FileSystem fs, Path file) {
    CompressionCodecFactory codecs = compressionCodecs;
    if (codecs == null) {
      // configure(JobConf) was never called on this instance.  Rather than
      // fail with a NullPointerException, fall back to a factory built from
      // the file system's own configuration.
      // NOTE(review): that configuration may differ from the job's when the
      // FileSystem instance is shared -- confirm this fallback is acceptable.
      codecs = new CompressionCodecFactory(fs.getConf());
    }
    final CompressionCodec codec = codecs.getCodec(file);
    return null == codec;
  }

}