001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.io; 019 020import java.io.FileDescriptor; 021import java.io.IOException; 022import java.util.concurrent.ArrayBlockingQueue; 023import java.util.concurrent.ThreadPoolExecutor; 024import java.util.concurrent.TimeUnit; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.apache.hadoop.classification.InterfaceAudience; 029import org.apache.hadoop.classification.InterfaceStability; 030import org.apache.hadoop.io.nativeio.NativeIO; 031 032import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_WILLNEED; 033 034import com.google.common.base.Preconditions; 035import com.google.common.util.concurrent.ThreadFactoryBuilder; 036 037/** 038 * Manages a pool of threads which can issue readahead requests on file descriptors. 039 */ 040@InterfaceAudience.Private 041@InterfaceStability.Evolving 042public class ReadaheadPool { 043 static final Log LOG = LogFactory.getLog(ReadaheadPool.class); 044 private static final int POOL_SIZE = 4; 045 private static final int MAX_POOL_SIZE = 16; 046 private static final int CAPACITY = 1024; 047 private final ThreadPoolExecutor pool; 048 049 private static ReadaheadPool instance; 050 051 /** 052 * Return the singleton instance for the current process. 053 */ 054 public static ReadaheadPool getInstance() { 055 synchronized (ReadaheadPool.class) { 056 if (instance == null && NativeIO.isAvailable()) { 057 instance = new ReadaheadPool(); 058 } 059 return instance; 060 } 061 } 062 063 private ReadaheadPool() { 064 pool = new ThreadPoolExecutor(POOL_SIZE, MAX_POOL_SIZE, 3L, TimeUnit.SECONDS, 065 new ArrayBlockingQueue<Runnable>(CAPACITY)); 066 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); 067 pool.setThreadFactory(new ThreadFactoryBuilder() 068 .setDaemon(true) 069 .setNameFormat("Readahead Thread #%d") 070 .build()); 071 } 072 073 /** 074 * Issue a request to readahead on the given file descriptor. 075 * 076 * @param identifier a textual identifier that will be used in error 077 * messages (e.g. the file name) 078 * @param fd the file descriptor to read ahead 079 * @param curPos the current offset at which reads are being issued 080 * @param readaheadLength the configured length to read ahead 081 * @param maxOffsetToRead the maximum offset that will be readahead 082 * (useful if, for example, only some segment of the file is 083 * requested by the user). Pass {@link Long.MAX_VALUE} to allow 084 * readahead to the end of the file. 085 * @param lastReadahead the result returned by the previous invocation 086 * of this function on this file descriptor, or null if this is 087 * the first call 088 * @return an object representing this outstanding request, or null 089 * if no readahead was performed 090 */ 091 public ReadaheadRequest readaheadStream( 092 String identifier, 093 FileDescriptor fd, 094 long curPos, 095 long readaheadLength, 096 long maxOffsetToRead, 097 ReadaheadRequest lastReadahead) { 098 099 Preconditions.checkArgument(curPos <= maxOffsetToRead, 100 "Readahead position %s higher than maxOffsetToRead %s", 101 curPos, maxOffsetToRead); 102 103 if (readaheadLength <= 0) { 104 return null; 105 } 106 107 long lastOffset = Long.MIN_VALUE; 108 109 if (lastReadahead != null) { 110 lastOffset = lastReadahead.getOffset(); 111 } 112 113 // trigger each readahead when we have reached the halfway mark 114 // in the previous readahead. This gives the system time 115 // to satisfy the readahead before we start reading the data. 116 long nextOffset = lastOffset + readaheadLength / 2; 117 if (curPos >= nextOffset) { 118 // cancel any currently pending readahead, to avoid 119 // piling things up in the queue. Each reader should have at most 120 // one outstanding request in the queue. 121 if (lastReadahead != null) { 122 lastReadahead.cancel(); 123 lastReadahead = null; 124 } 125 126 long length = Math.min(readaheadLength, 127 maxOffsetToRead - curPos); 128 129 if (length <= 0) { 130 // we've reached the end of the stream 131 return null; 132 } 133 134 return submitReadahead(identifier, fd, curPos, length); 135 } else { 136 return lastReadahead; 137 } 138 } 139 140 /** 141 * Submit a request to readahead on the given file descriptor. 142 * @param identifier a textual identifier used in error messages, etc. 143 * @param fd the file descriptor to readahead 144 * @param off the offset at which to start the readahead 145 * @param len the number of bytes to read 146 * @return an object representing this pending request 147 */ 148 public ReadaheadRequest submitReadahead( 149 String identifier, FileDescriptor fd, long off, long len) { 150 ReadaheadRequestImpl req = new ReadaheadRequestImpl( 151 identifier, fd, off, len); 152 pool.execute(req); 153 if (LOG.isTraceEnabled()) { 154 LOG.trace("submit readahead: " + req); 155 } 156 return req; 157 } 158 159 /** 160 * An outstanding readahead request that has been submitted to 161 * the pool. This request may be pending or may have been 162 * completed. 163 */ 164 public interface ReadaheadRequest { 165 /** 166 * Cancels the request for readahead. This should be used 167 * if the reader no longer needs the requested data, <em>before</em> 168 * closing the related file descriptor. 169 * 170 * It is safe to use even if the readahead request has already 171 * been fulfilled. 172 */ 173 public void cancel(); 174 175 /** 176 * @return the requested offset 177 */ 178 public long getOffset(); 179 180 /** 181 * @return the requested length 182 */ 183 public long getLength(); 184 } 185 186 private static class ReadaheadRequestImpl implements Runnable, ReadaheadRequest { 187 private final String identifier; 188 private final FileDescriptor fd; 189 private final long off, len; 190 private volatile boolean canceled = false; 191 192 private ReadaheadRequestImpl(String identifier, FileDescriptor fd, long off, long len) { 193 this.identifier = identifier; 194 this.fd = fd; 195 this.off = off; 196 this.len = len; 197 } 198 199 @Override 200 public void run() { 201 if (canceled) return; 202 // There's a very narrow race here that the file will close right at 203 // this instant. But if that happens, we'll likely receive an EBADF 204 // error below, and see that it's canceled, ignoring the error. 205 // It's also possible that we'll end up requesting readahead on some 206 // other FD, which may be wasted work, but won't cause a problem. 207 try { 208 NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, 209 fd, off, len, POSIX_FADV_WILLNEED); 210 } catch (IOException ioe) { 211 if (canceled) { 212 // no big deal - the reader canceled the request and closed 213 // the file. 214 return; 215 } 216 LOG.warn("Failed readahead on " + identifier, 217 ioe); 218 } 219 } 220 221 @Override 222 public void cancel() { 223 canceled = true; 224 // We could attempt to remove it from the work queue, but that would 225 // add complexity. In practice, the work queues remain very short, 226 // so removing canceled requests has no gain. 227 } 228 229 @Override 230 public long getOffset() { 231 return off; 232 } 233 234 @Override 235 public long getLength() { 236 return len; 237 } 238 239 @Override 240 public String toString() { 241 return "ReadaheadRequestImpl [identifier='" + identifier + "', fd=" + fd 242 + ", off=" + off + ", len=" + len + "]"; 243 } 244 } 245}