001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.mapred; 019 020import java.io.IOException; 021 022import org.apache.hadoop.classification.InterfaceAudience; 023import org.apache.hadoop.classification.InterfaceStability; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.io.BytesWritable; 028import org.apache.hadoop.io.DataOutputBuffer; 029import org.apache.hadoop.io.SequenceFile; 030 031/** 032 * InputFormat reading keys, values from SequenceFiles in binary (raw) 033 * format. 034 */ 035@InterfaceAudience.Public 036@InterfaceStability.Stable 037public class SequenceFileAsBinaryInputFormat 038 extends SequenceFileInputFormat<BytesWritable,BytesWritable> { 039 040 public SequenceFileAsBinaryInputFormat() { 041 super(); 042 } 043 044 public RecordReader<BytesWritable,BytesWritable> getRecordReader( 045 InputSplit split, JobConf job, Reporter reporter) 046 throws IOException { 047 return new SequenceFileAsBinaryRecordReader(job, (FileSplit)split); 048 } 049 050 /** 051 * Read records from a SequenceFile as binary (raw) bytes. 052 */ 053 public static class SequenceFileAsBinaryRecordReader 054 implements RecordReader<BytesWritable,BytesWritable> { 055 private SequenceFile.Reader in; 056 private long start; 057 private long end; 058 private boolean done = false; 059 private DataOutputBuffer buffer = new DataOutputBuffer(); 060 private SequenceFile.ValueBytes vbytes; 061 062 public SequenceFileAsBinaryRecordReader(Configuration conf, FileSplit split) 063 throws IOException { 064 Path path = split.getPath(); 065 FileSystem fs = path.getFileSystem(conf); 066 this.in = new SequenceFile.Reader(fs, path, conf); 067 this.end = split.getStart() + split.getLength(); 068 if (split.getStart() > in.getPosition()) 069 in.sync(split.getStart()); // sync to start 070 this.start = in.getPosition(); 071 vbytes = in.createValueBytes(); 072 done = start >= end; 073 } 074 075 public BytesWritable createKey() { 076 return new BytesWritable(); 077 } 078 079 public BytesWritable createValue() { 080 return new BytesWritable(); 081 } 082 083 /** 084 * Retrieve the name of the key class for this SequenceFile. 085 * @see org.apache.hadoop.io.SequenceFile.Reader#getKeyClassName 086 */ 087 public String getKeyClassName() { 088 return in.getKeyClassName(); 089 } 090 091 /** 092 * Retrieve the name of the value class for this SequenceFile. 093 * @see org.apache.hadoop.io.SequenceFile.Reader#getValueClassName 094 */ 095 public String getValueClassName() { 096 return in.getValueClassName(); 097 } 098 099 /** 100 * Read raw bytes from a SequenceFile. 101 */ 102 public synchronized boolean next(BytesWritable key, BytesWritable val) 103 throws IOException { 104 if (done) return false; 105 long pos = in.getPosition(); 106 boolean eof = -1 == in.nextRawKey(buffer); 107 if (!eof) { 108 key.set(buffer.getData(), 0, buffer.getLength()); 109 buffer.reset(); 110 in.nextRawValue(vbytes); 111 vbytes.writeUncompressedBytes(buffer); 112 val.set(buffer.getData(), 0, buffer.getLength()); 113 buffer.reset(); 114 } 115 return !(done = (eof || (pos >= end && in.syncSeen()))); 116 } 117 118 public long getPos() throws IOException { 119 return in.getPosition(); 120 } 121 122 public void close() throws IOException { 123 in.close(); 124 } 125 126 /** 127 * Return the progress within the input split 128 * @return 0.0 to 1.0 of the input byte range 129 */ 130 public float getProgress() throws IOException { 131 if (end == start) { 132 return 0.0f; 133 } else { 134 return Math.min(1.0f, (float)((in.getPosition() - start) / 135 (double)(end - start))); 136 } 137 } 138 } 139}