001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred;
020
021import java.io.IOException;
022
023import org.apache.hadoop.classification.InterfaceAudience;
024import org.apache.hadoop.classification.InterfaceStability;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.io.BytesWritable;
029import org.apache.hadoop.io.LongWritable;
030import org.apache.hadoop.io.compress.CompressionCodec;
031import org.apache.hadoop.io.compress.CompressionCodecFactory;
032
033/**
034 * FixedLengthInputFormat is an input format used to read input files
035 * which contain fixed length records.  The content of a record need not be
036 * text.  It can be arbitrary binary data.  Users must configure the record
037 * length property by calling:
038 * FixedLengthInputFormat.setRecordLength(conf, recordLength);<br><br> or
039 * conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength);
040 * <br><br>
041 * @see FixedLengthRecordReader
042 */
043@InterfaceAudience.Public
044@InterfaceStability.Stable
045public class FixedLengthInputFormat
046    extends FileInputFormat<LongWritable, BytesWritable>
047    implements JobConfigurable {
048
049  private CompressionCodecFactory compressionCodecs = null;
050  
051  public static final String FIXED_RECORD_LENGTH =
052      "fixedlengthinputformat.record.length"; 
053
054  /**
055   * Set the length of each record
056   * @param conf configuration
057   * @param recordLength the length of a record
058   */
059  public static void setRecordLength(Configuration conf, int recordLength) {
060    conf.setInt(FIXED_RECORD_LENGTH, recordLength);
061  }
062
063  /**
064   * Get record length value
065   * @param conf configuration
066   * @return the record length, zero means none was set
067   */
068  public static int getRecordLength(Configuration conf) {
069    return conf.getInt(FIXED_RECORD_LENGTH, 0);
070  }
071
072  @Override
073  public void configure(JobConf conf) {
074    compressionCodecs = new CompressionCodecFactory(conf);
075  }
076
077  @Override
078  public RecordReader<LongWritable, BytesWritable>
079      getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter)
080      throws IOException {
081    reporter.setStatus(genericSplit.toString());
082    int recordLength = getRecordLength(job);
083    if (recordLength <= 0) {
084      throw new IOException("Fixed record length " + recordLength
085          + " is invalid.  It should be set to a value greater than zero");
086    }
087    return new FixedLengthRecordReader(job, (FileSplit)genericSplit,
088                                       recordLength);
089  }
090
091  @Override
092  protected boolean isSplitable(FileSystem fs, Path file) {
093    final CompressionCodec codec = compressionCodecs.getCodec(file);
094    return(null == codec);
095  }
096
097}