/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * InputFormat reading keys and values from SequenceFiles in binary (raw)
 * format.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsBinaryInputFormat
    extends SequenceFileInputFormat<BytesWritable,BytesWritable> {

  public SequenceFileAsBinaryInputFormat() {
    super();
  }

  @Override
  public RecordReader<BytesWritable,BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context)
      throws IOException {
    return new SequenceFileAsBinaryRecordReader();
  }

  /**
   * Read records from a SequenceFile as binary (raw) bytes.
   */
  public static class SequenceFileAsBinaryRecordReader
      extends RecordReader<BytesWritable,BytesWritable> {
    private SequenceFile.Reader in;
    private long start;
    private long end;
    private boolean done = false;
    private DataOutputBuffer buffer = new DataOutputBuffer();
    private SequenceFile.ValueBytes vbytes;
    private BytesWritable key = null;
    private BytesWritable value = null;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
      Path path = ((FileSplit)split).getPath();
      Configuration conf = context.getConfiguration();
      FileSystem fs = path.getFileSystem(conf);
      this.in = new SequenceFile.Reader(fs, path, conf);
      this.end = ((FileSplit)split).getStart() + split.getLength();
      if (((FileSplit)split).getStart() > in.getPosition()) {
        in.sync(((FileSplit)split).getStart());    // sync to start
      }
      this.start = in.getPosition();
      vbytes = in.createValueBytes();
      done = start >= end;
    }

    @Override
    public BytesWritable getCurrentKey()
        throws IOException, InterruptedException {
      return key;
    }

    @Override
    public BytesWritable getCurrentValue()
        throws IOException, InterruptedException {
      return value;
    }

    /**
     * Retrieve the name of the key class for this SequenceFile.
     * @see org.apache.hadoop.io.SequenceFile.Reader#getKeyClassName
     */
    public String getKeyClassName() {
      return in.getKeyClassName();
    }

    /**
     * Retrieve the name of the value class for this SequenceFile.
     * @see org.apache.hadoop.io.SequenceFile.Reader#getValueClassName
     */
    public String getValueClassName() {
      return in.getValueClassName();
    }

    /**
     * Read raw bytes from a SequenceFile.
     */
    @Override
    public synchronized boolean nextKeyValue()
        throws IOException, InterruptedException {
      if (done) {
        return false;
      }
      long pos = in.getPosition();
      boolean eof = -1 == in.nextRawKey(buffer);
      if (!eof) {
        if (key == null) {
          key = new BytesWritable();
        }
        if (value == null) {
          value = new BytesWritable();
        }
        // Copy the raw key bytes out of the shared buffer, then reuse the
        // buffer for the raw (uncompressed) value bytes.
        key.set(buffer.getData(), 0, buffer.getLength());
        buffer.reset();
        in.nextRawValue(vbytes);
        vbytes.writeUncompressedBytes(buffer);
        value.set(buffer.getData(), 0, buffer.getLength());
        buffer.reset();
      }
      // Stop at EOF, or once the reader has passed the end of the split and
      // seen a sync marker (the next split picks up the following records).
      return !(done = (eof || (pos >= end && in.syncSeen())));
    }

    @Override
    public void close() throws IOException {
      in.close();
    }

    /**
     * Return the progress within the input split.
     * @return 0.0 to 1.0 of the input byte range
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
      if (end == start) {
        return 0.0f;
      } else {
        return Math.min(1.0f, (float)((in.getPosition() - start) /
                                      (double)(end - start)));
      }
    }
  }
}
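
// A minimal usage sketch, not part of the upstream class: it shows how
// SequenceFileAsBinaryInputFormat slots into a job that copies key/value bytes
// verbatim from one SequenceFile to another, map-only, using the default
// identity Mapper. The driver class name, job name, and the assumption that
// args[0]/args[1] hold the input and output paths are illustrative only.
class SequenceFileAsBinaryCopyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    org.apache.hadoop.mapreduce.Job job =
        org.apache.hadoop.mapreduce.Job.getInstance(conf, "seqfile-raw-copy");
    job.setJarByClass(SequenceFileAsBinaryCopyExample.class);

    // Keys and values are surfaced to the mapper as raw BytesWritable pairs.
    job.setInputFormatClass(SequenceFileAsBinaryInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));

    // Map-only: the identity Mapper emits the raw bytes unchanged, and the
    // output format writes them back out as a SequenceFile.
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(BytesWritable.class);
    job.setOutputValueClass(BytesWritable.class);
    job.setOutputFormatClass(
        org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
    org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(
        job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}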