001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred;
020
021 import java.io.IOException;
022
023 import org.apache.hadoop.classification.InterfaceAudience;
024 import org.apache.hadoop.classification.InterfaceStability;
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.hadoop.fs.FileSystem;
027 import org.apache.hadoop.fs.Path;
028 import org.apache.hadoop.io.BytesWritable;
029 import org.apache.hadoop.io.LongWritable;
030 import org.apache.hadoop.io.compress.CompressionCodec;
031 import org.apache.hadoop.io.compress.CompressionCodecFactory;
032
033 /**
034 * FixedLengthInputFormat is an input format used to read input files
035 * which contain fixed length records. The content of a record need not be
036 * text. It can be arbitrary binary data. Users must configure the record
037 * length property by calling:
038 * FixedLengthInputFormat.setRecordLength(conf, recordLength);<br><br> or
039 * conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength);
040 * <br><br>
041 * @see FixedLengthRecordReader
042 */
043 @InterfaceAudience.Public
044 @InterfaceStability.Stable
045 public class FixedLengthInputFormat
046 extends FileInputFormat<LongWritable, BytesWritable>
047 implements JobConfigurable {
048
049 private CompressionCodecFactory compressionCodecs = null;
050
051 public static final String FIXED_RECORD_LENGTH =
052 "fixedlengthinputformat.record.length";
053
054 /**
055 * Set the length of each record
056 * @param conf configuration
057 * @param recordLength the length of a record
058 */
059 public static void setRecordLength(Configuration conf, int recordLength) {
060 conf.setInt(FIXED_RECORD_LENGTH, recordLength);
061 }
062
063 /**
064 * Get record length value
065 * @param conf configuration
066 * @return the record length, zero means none was set
067 */
068 public static int getRecordLength(Configuration conf) {
069 return conf.getInt(FIXED_RECORD_LENGTH, 0);
070 }
071
072 @Override
073 public void configure(JobConf conf) {
074 compressionCodecs = new CompressionCodecFactory(conf);
075 }
076
077 @Override
078 public RecordReader<LongWritable, BytesWritable>
079 getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter)
080 throws IOException {
081 reporter.setStatus(genericSplit.toString());
082 int recordLength = getRecordLength(job);
083 if (recordLength <= 0) {
084 throw new IOException("Fixed record length " + recordLength
085 + " is invalid. It should be set to a value greater than zero");
086 }
087 return new FixedLengthRecordReader(job, (FileSplit)genericSplit,
088 recordLength);
089 }
090
091 @Override
092 protected boolean isSplitable(FileSystem fs, Path file) {
093 final CompressionCodec codec = compressionCodecs.getCodec(file);
094 return(null == codec);
095 }
096
097 }