001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.util; 021 022import static com.google.common.base.Preconditions.checkArgument; 023import static com.google.common.base.Preconditions.checkNotNull; 024 025import java.io.FilterInputStream; 026import java.io.IOException; 027import java.io.InputStream; 028 029import org.apache.hadoop.classification.InterfaceStability.Unstable; 030 031/** 032 * Copied from guava source code v15 (LimitedInputStream) 033 * Guava deprecated LimitInputStream in v14 and removed it in v15. Copying this class here 034 * allows to be compatible with guava 11 to 15+. 035 * 036 * Originally: org.apache.hadoop.hbase.io.LimitInputStream 037 */ 038@Unstable 039public final class LimitInputStream extends FilterInputStream { 040 private long left; 041 private long mark = -1; 042 043 public LimitInputStream(InputStream in, long limit) { 044 super(in); 045 checkNotNull(in); 046 checkArgument(limit >= 0, "limit must be non-negative"); 047 left = limit; 048 } 049 050 @Override 051 public int available() throws IOException { 052 return (int) Math.min(in.available(), left); 053 } 054 055 // it's okay to mark even if mark isn't supported, as reset won't work 056 @Override 057 public synchronized void mark(int readLimit) { 058 in.mark(readLimit); 059 mark = left; 060 } 061 062 @Override 063 public int read() throws IOException { 064 if (left == 0) { 065 return -1; 066 } 067 068 int result = in.read(); 069 if (result != -1) { 070 --left; 071 } 072 return result; 073 } 074 075 @Override 076 public int read(byte[] b, int off, int len) throws IOException { 077 if (len == 0) { 078 return 0; 079 } 080 if (left == 0) { 081 return -1; 082 } 083 084 len = (int) Math.min(len, left); 085 int result = in.read(b, off, len); 086 if (result != -1) { 087 left -= result; 088 } 089 return result; 090 } 091 092 @Override 093 public synchronized void reset() throws IOException { 094 if (!in.markSupported()) { 095 throw new IOException("Mark not supported"); 096 } 097 if (mark == -1) { 098 throw new IOException("Mark not set"); 099 } 100 101 in.reset(); 102 left = mark; 103 } 104 105 @Override 106 public long skip(long n) throws IOException { 107 n = Math.min(n, left); 108 long skipped = in.skip(n); 109 left -= skipped; 110 return skipped; 111 } 112}