/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;

/**
 * This class defines a replica in a pipeline, which includes a persistent
 * replica being written to by a dfs client, or a temporary replica being
 * replicated by a source datanode or being copied for balancing purposes.
 *
 * The base class implements a temporary replica.
 */
public class ReplicaInPipeline extends ReplicaInfo
    implements ReplicaInPipelineInterface {
  private long bytesAcked;
  private long bytesOnDisk;
  private byte[] lastChecksum;
  private AtomicReference<Thread> writer = new AtomicReference<Thread>();

  /**
   * Bytes reserved for this replica on the containing volume.
   * Based on the difference between the estimated maximum block length and
   * the bytes already written to this block.
   */
  private long bytesReserved;
  private final long originalBytesReserved;
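
  /*
   * Minimal usage sketch (hypothetical caller, not part of this class): a
   * DataNode writer thread typically creates the replica, obtains its output
   * streams, and advances the acked length as pipeline acks arrive. The
   * variable names below are illustrative only.
   *
   *   ReplicaInPipeline replica = new ReplicaInPipeline(
   *       blockId, genStamp, volume, blockDir, estimatedBlockLen);
   *   ReplicaOutputStreams streams = replica.createStreams(true,
   *       DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512));
   *   // ... write data via streams.getDataOut() and checksums via
   *   // streams.getChecksumOut() ...
   *   replica.setLastChecksumAndDataLen(bytesWritten, lastChunkChecksum);
   *   replica.setBytesAcked(bytesWritten);
   */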

  /**
   * Constructor for a zero-length replica.
   * @param blockId block id
   * @param genStamp replica generation stamp
   * @param vol volume where replica is located
   * @param dir directory path where block and meta files are located
   * @param bytesToReserve disk space to reserve for this replica, based on
   *                       the estimated maximum block length.
   */
  public ReplicaInPipeline(long blockId, long genStamp,
      FsVolumeSpi vol, File dir, long bytesToReserve) {
    this(blockId, 0L, genStamp, vol, dir, Thread.currentThread(),
        bytesToReserve);
  }

  /**
   * Constructor.
   * @param block a block
   * @param vol volume where replica is located
   * @param dir directory path where block and meta files are located
   * @param writer a thread that is writing to this replica
   */
  ReplicaInPipeline(Block block,
      FsVolumeSpi vol, File dir, Thread writer) {
    this(block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
        vol, dir, writer, 0L);
  }

  /**
   * Constructor.
   * @param blockId block id
   * @param len replica length
   * @param genStamp replica generation stamp
   * @param vol volume where replica is located
   * @param dir directory path where block and meta files are located
   * @param writer a thread that is writing to this replica
   * @param bytesToReserve disk space to reserve for this replica, based on
   *                       the estimated maximum block length.
   */
  ReplicaInPipeline(long blockId, long len, long genStamp,
      FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) {
    super(blockId, len, genStamp, vol, dir);
    this.bytesAcked = len;
    this.bytesOnDisk = len;
    this.writer.set(writer);
    this.bytesReserved = bytesToReserve;
    this.originalBytesReserved = bytesToReserve;
  }

  /**
   * Copy constructor.
   * @param from replica to copy from
   */
  public ReplicaInPipeline(ReplicaInPipeline from) {
    super(from);
    this.bytesAcked = from.getBytesAcked();
    this.bytesOnDisk = from.getBytesOnDisk();
    this.writer.set(from.writer.get());
    this.bytesReserved = from.bytesReserved;
    this.originalBytesReserved = from.originalBytesReserved;
  }

  @Override
  public long getVisibleLength() {
    return -1; // no bytes are visible to readers for a temporary replica
  }

  @Override //ReplicaInfo
  public ReplicaState getState() {
    return ReplicaState.TEMPORARY;
  }

  @Override // ReplicaInPipelineInterface
  public long getBytesAcked() {
    return bytesAcked;
  }

  @Override // ReplicaInPipelineInterface
  public void setBytesAcked(long bytesAcked) {
    long newBytesAcked = bytesAcked - this.bytesAcked;
    this.bytesAcked = bytesAcked;

    // Once bytes are ACK'ed we can release equivalent space from the
    // volume's reservedForRbw count. We could have released it as soon
    // as the write-to-disk completed but that would be inefficient.
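    // Illustrative example (assumed numbers): if bytesAcked advances from
    // 0 to 65536, newBytesAcked is 65536, so 64 KB of reserved space is
    // returned to the volume and bytesReserved shrinks by the same amount.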
    getVolume().releaseReservedSpace(newBytesAcked);
    bytesReserved -= newBytesAcked;
  }

  @Override // ReplicaInPipelineInterface
  public long getBytesOnDisk() {
    return bytesOnDisk;
  }

  @Override
  public long getBytesReserved() {
    return bytesReserved;
  }

  @Override
  public long getOriginalBytesReserved() {
    return originalBytesReserved;
  }

  @Override // ReplicaInPipelineInterface
  public void releaseAllBytesReserved() {
    getVolume().releaseReservedSpace(bytesReserved);
    getVolume().releaseLockedMemory(bytesReserved);
    bytesReserved = 0;
  }

  @Override // ReplicaInPipelineInterface
  public synchronized void setLastChecksumAndDataLen(long dataLength,
      byte[] lastChecksum) {
    this.bytesOnDisk = dataLength;
    this.lastChecksum = lastChecksum;
  }

  @Override // ReplicaInPipelineInterface
  public synchronized ChunkChecksum getLastChecksumAndDataLen() {
    return new ChunkChecksum(getBytesOnDisk(), lastChecksum);
  }

  /**
   * Interrupt the writer thread, if it is neither the current thread
   * nor already dead.
   */
  public void interruptThread() {
    Thread thread = writer.get();
    if (thread != null && thread != Thread.currentThread()
        && thread.isAlive()) {
      thread.interrupt();
    }
  }

  @Override // Object
  public boolean equals(Object o) {
    return super.equals(o);
  }

  /**
   * Attempt to set the writer to a new value.
   * @return true if the writer was updated, false if it had already changed
   */
  public boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) {
    return writer.compareAndSet(prevWriter, newWriter);
  }

  /**
   * Interrupt the writing thread and wait until it dies.
   * @throws IOException if the wait is interrupted or times out
   */
  public void stopWriter(long xceiverStopTimeout) throws IOException {
    while (true) {
      Thread thread = writer.get();
      if ((thread == null) || (thread == Thread.currentThread()) ||
          (!thread.isAlive())) {
        if (writer.compareAndSet(thread, null)) {
          return; // Done
        }
        // The writer changed. Go back to the start of the loop and attempt to
        // stop the new writer.
        continue;
      }
      thread.interrupt();
      try {
        thread.join(xceiverStopTimeout);
        if (thread.isAlive()) {
          // Our thread join timed out.
          final String msg = "Join on writer thread " + thread + " timed out";
          DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(thread));
          throw new IOException(msg);
        }
      } catch (InterruptedException e) {
        throw new IOException("Waiting for writer thread was interrupted.");
      }
    }
  }

  @Override // Object
  public int hashCode() {
    return super.hashCode();
  }

  @Override // ReplicaInPipelineInterface
  public ReplicaOutputStreams createStreams(boolean isCreate,
      DataChecksum requestedChecksum) throws IOException {
    File blockFile = getBlockFile();
    File metaFile = getMetaFile();
    if (DataNode.LOG.isDebugEnabled()) {
      DataNode.LOG.debug("writeTo blockfile is " + blockFile +
          " of size " + blockFile.length());
      DataNode.LOG.debug("writeTo metafile is " + metaFile +
          " of size " + metaFile.length());
    }
    long blockDiskSize = 0L;
    long crcDiskSize = 0L;

    // the checksum that should actually be used -- this
    // may differ from requestedChecksum for appends.
    final DataChecksum checksum;

    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");

    if (!isCreate) {
      // For append or recovery, we must enforce the existing checksum.
      // Also, verify that the file has correct lengths, etc.
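      // The meta file is assumed to hold a BlockMetadataHeader followed by
      // one checksum of checksumSize bytes per bytesPerChecksum-byte chunk
      // of block data; the expected sizes computed below follow from that.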
      boolean checkedMeta = false;
      try {
        BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
        checksum = header.getChecksum();

        if (checksum.getBytesPerChecksum() !=
            requestedChecksum.getBytesPerChecksum()) {
          throw new IOException("Client requested checksum " +
              requestedChecksum + " when appending to an existing block " +
              "with different chunk size: " + checksum);
        }

        int bytesPerChunk = checksum.getBytesPerChecksum();
        int checksumSize = checksum.getChecksumSize();

        blockDiskSize = bytesOnDisk;
        // header size plus one checksum per chunk (ceiling division)
        crcDiskSize = BlockMetadataHeader.getHeaderSize() +
            (blockDiskSize + bytesPerChunk - 1) / bytesPerChunk * checksumSize;
        if (blockDiskSize > 0 &&
            (blockDiskSize > blockFile.length() ||
             crcDiskSize > metaFile.length())) {
          throw new IOException("Corrupted block: " + this);
        }
        checkedMeta = true;
      } finally {
        if (!checkedMeta) {
          // clean up in case of exceptions.
          IOUtils.closeStream(metaRAF);
        }
      }
    } else {
      // for create, we can use the requested checksum
      checksum = requestedChecksum;
    }

    FileOutputStream blockOut = null;
    FileOutputStream crcOut = null;
    try {
      blockOut = new FileOutputStream(
          new RandomAccessFile(blockFile, "rw").getFD());
      crcOut = new FileOutputStream(metaRAF.getFD());
      if (!isCreate) {
        blockOut.getChannel().position(blockDiskSize);
        crcOut.getChannel().position(crcDiskSize);
      }
      return new ReplicaOutputStreams(blockOut, crcOut, checksum,
          getVolume().isTransientStorage());
    } catch (IOException e) {
      IOUtils.closeStream(blockOut);
      IOUtils.closeStream(metaRAF);
      throw e;
    }
  }

  @Override
  public OutputStream createRestartMetaStream() throws IOException {
    File blockFile = getBlockFile();
    // Use File.separator (not File.pathSeparator, which is the path-list
    // delimiter such as ':') so the restart meta file is created next to
    // the block file.
    File restartMeta = new File(blockFile.getParent() +
        File.separator + "." + blockFile.getName() + ".restart");
    if (restartMeta.exists() && !restartMeta.delete()) {
      DataNode.LOG.warn("Failed to delete restart meta file: " +
          restartMeta.getPath());
    }
    return new FileOutputStream(restartMeta);
  }

  @Override
  public String toString() {
    return super.toString()
        + "\n  bytesAcked=" + bytesAcked
        + "\n  bytesOnDisk=" + bytesOnDisk;
  }
}