/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;

/**
 * This class defines a replica in a pipeline, which includes a persistent
 * replica being written to by a dfs client, a temporary replica being
 * replicated by a source datanode, or a replica being copied for balancing
 * purposes.
 *
 * The base class implements a temporary replica.
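 *
 * A minimal usage sketch (illustrative only; the block id, volume, directory,
 * data and checksum values are assumed to come from surrounding datanode
 * code):
 * <pre>{@code
 * ReplicaInPipeline replica =
 *     new ReplicaInPipeline(blockId, genStamp, volume, dir, maxBlockLen);
 * ReplicaOutputStreams streams = replica.createStreams(true,
 *     DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512));
 * try {
 *   // ... write data and checksums to the streams, then record progress:
 *   replica.setLastChecksumAndDataLen(bytesWritten, lastChunkChecksum);
 *   replica.setBytesAcked(bytesWritten);
 * } finally {
 *   streams.close();
 * }
 * }</pre>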
 */
public class ReplicaInPipeline extends ReplicaInfo
    implements ReplicaInPipelineInterface {
  private long bytesAcked;
  private long bytesOnDisk;
  private byte[] lastChecksum;
  // The thread currently writing to this replica; held in an AtomicReference
  // so a new writer can take over via compareAndSet (see attemptToSetWriter).
  private final AtomicReference<Thread> writer =
      new AtomicReference<Thread>();

  /**
   * Bytes reserved for this replica on the containing volume.
   * Based on the difference between the estimated maximum block length and
   * the bytes already written to this block.
   */
  private long bytesReserved;
  private final long originalBytesReserved;

  /**
   * Constructor for a zero-length replica.
   * @param blockId block id
   * @param genStamp replica generation stamp
   * @param vol volume where replica is located
   * @param dir directory path where block and meta files are located
   * @param bytesToReserve disk space to reserve for this replica, based on
   *                       the estimated maximum block length.
   */
  public ReplicaInPipeline(long blockId, long genStamp,
      FsVolumeSpi vol, File dir, long bytesToReserve) {
    this(blockId, 0L, genStamp, vol, dir, Thread.currentThread(),
        bytesToReserve);
  }

  /**
   * Constructor.
   * @param block a block
   * @param vol volume where replica is located
   * @param dir directory path where block and meta files are located
   * @param writer a thread that is writing to this replica
   */
  ReplicaInPipeline(Block block,
      FsVolumeSpi vol, File dir, Thread writer) {
    this(block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
        vol, dir, writer, 0L);
  }

  /**
   * Constructor.
   * @param blockId block id
   * @param len replica length
   * @param genStamp replica generation stamp
   * @param vol volume where replica is located
   * @param dir directory path where block and meta files are located
   * @param writer a thread that is writing to this replica
   * @param bytesToReserve disk space to reserve for this replica, based on
   *                       the estimated maximum block length.
   */
  ReplicaInPipeline(long blockId, long len, long genStamp,
      FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) {
    super(blockId, len, genStamp, vol, dir);
    this.bytesAcked = len;
    this.bytesOnDisk = len;
    this.writer.set(writer);
    this.bytesReserved = bytesToReserve;
    this.originalBytesReserved = bytesToReserve;
  }

  /**
   * Copy constructor.
   * @param from where to copy from
   */
  public ReplicaInPipeline(ReplicaInPipeline from) {
    super(from);
    this.bytesAcked = from.getBytesAcked();
    this.bytesOnDisk = from.getBytesOnDisk();
    this.writer.set(from.writer.get());
    this.bytesReserved = from.bytesReserved;
    this.originalBytesReserved = from.originalBytesReserved;
  }

  @Override
  public long getVisibleLength() {
    return -1; // no bytes are visible to readers while in the pipeline
  }

  @Override // ReplicaInfo
  public ReplicaState getState() {
    return ReplicaState.TEMPORARY;
  }

  @Override // ReplicaInPipelineInterface
  public long getBytesAcked() {
    return bytesAcked;
  }

  @Override // ReplicaInPipelineInterface
  public void setBytesAcked(long bytesAcked) {
    long newBytesAcked = bytesAcked - this.bytesAcked;
    this.bytesAcked = bytesAcked;

    // Once bytes are ACK'ed we can release equivalent space from the
    // volume's reservedForRbw count. We could have released it as soon
    // as the write-to-disk completed but that would be inefficient.
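    // For example, when bytesAcked advances from 0 to 65536, newBytesAcked
    // is 65536 and that many reserved bytes are returned to the volume.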
    getVolume().releaseReservedSpace(newBytesAcked);
    bytesReserved -= newBytesAcked;
  }

  @Override // ReplicaInPipelineInterface
  public long getBytesOnDisk() {
    return bytesOnDisk;
  }

  @Override
  public long getBytesReserved() {
    return bytesReserved;
  }

  @Override
  public long getOriginalBytesReserved() {
    return originalBytesReserved;
  }

  @Override // ReplicaInPipelineInterface
  public void releaseAllBytesReserved() {
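    // Give back the on-disk reservation as well as any locked memory
    // (the latter is relevant when the replica targets transient storage).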
    getVolume().releaseReservedSpace(bytesReserved);
    getVolume().releaseLockedMemory(bytesReserved);
    bytesReserved = 0;
  }

  @Override // ReplicaInPipelineInterface
  public synchronized void setLastChecksumAndDataLen(long dataLength,
      byte[] lastChecksum) {
    this.bytesOnDisk = dataLength;
    this.lastChecksum = lastChecksum;
  }

  @Override // ReplicaInPipelineInterface
  public synchronized ChunkChecksum getLastChecksumAndDataLen() {
    return new ChunkChecksum(getBytesOnDisk(), lastChecksum);
  }

  /**
   * Interrupt the writer thread, if it is not the current thread and is
   * still alive.
   */
  public void interruptThread() {
    Thread thread = writer.get();
    if (thread != null && thread != Thread.currentThread()
        && thread.isAlive()) {
      thread.interrupt();
    }
  }

  @Override  // Object
  public boolean equals(Object o) {
    return super.equals(o);
  }

  /**
   * Attempt to set the writer to a new value.
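   * @param prevWriter the expected current writer thread
   * @param newWriter the thread to install as the new writer
   * @return true iff the writer was atomically swapped from prevWriter to
   *         newWriter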
   */
  public boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) {
    return writer.compareAndSet(prevWriter, newWriter);
  }

  /**
   * Interrupt the writing thread and wait until it dies.
   * @throws IOException if the wait is interrupted or times out
   */
  public void stopWriter(long xceiverStopTimeout) throws IOException {
    while (true) {
      Thread thread = writer.get();
      if ((thread == null) || (thread == Thread.currentThread()) ||
          (!thread.isAlive())) {
        if (writer.compareAndSet(thread, null)) {
          return; // Done
        }
        // The writer changed.  Go back to the start of the loop and attempt to
        // stop the new writer.
        continue;
      }
      thread.interrupt();
      try {
        thread.join(xceiverStopTimeout);
        if (thread.isAlive()) {
          // Our thread join timed out.
          final String msg = "Join on writer thread " + thread + " timed out";
          DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(thread));
          throw new IOException(msg);
        }
      } catch (InterruptedException e) {
        // Restore the interrupt status before converting to an IOException.
        Thread.currentThread().interrupt();
        throw new IOException("Waiting for writer thread was interrupted.", e);
      }
    }
  }

  @Override  // Object
  public int hashCode() {
    return super.hashCode();
  }

  @Override // ReplicaInPipelineInterface
  public ReplicaOutputStreams createStreams(boolean isCreate,
      DataChecksum requestedChecksum) throws IOException {
    File blockFile = getBlockFile();
    File metaFile = getMetaFile();
    if (DataNode.LOG.isDebugEnabled()) {
      DataNode.LOG.debug("writeTo blockfile is " + blockFile +
                         " of size " + blockFile.length());
      DataNode.LOG.debug("writeTo metafile is " + metaFile +
                         " of size " + metaFile.length());
    }
    long blockDiskSize = 0L;
    long crcDiskSize = 0L;

    // the checksum that should actually be used -- this
    // may differ from requestedChecksum for appends.
    final DataChecksum checksum;

    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");

    if (!isCreate) {
      // For append or recovery, we must enforce the existing checksum.
      // Also, verify that the file has correct lengths, etc.
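      // The meta file already holds checksums computed at the existing
      // chunk size, so an append must keep that chunk size.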
      boolean checkedMeta = false;
      try {
        BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
        checksum = header.getChecksum();

        if (checksum.getBytesPerChecksum() !=
            requestedChecksum.getBytesPerChecksum()) {
          throw new IOException("Client requested checksum " +
              requestedChecksum + " when appending to an existing block " +
              "with different chunk size: " + checksum);
        }

        int bytesPerChunk = checksum.getBytesPerChecksum();
        int checksumSize = checksum.getChecksumSize();

        blockDiskSize = bytesOnDisk;
        crcDiskSize = BlockMetadataHeader.getHeaderSize() +
            (blockDiskSize + bytesPerChunk - 1) / bytesPerChunk * checksumSize;
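        // e.g. with bytesPerChunk=512 and checksumSize=4, a 1000-byte replica
        // needs getHeaderSize() + ceil(1000/512) * 4 = header + 8 bytes of
        // checksum data in the meta file.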
        if (blockDiskSize > 0 &&
            (blockDiskSize > blockFile.length() ||
             crcDiskSize > metaFile.length())) {
          throw new IOException("Corrupted block: " + this);
        }
        checkedMeta = true;
      } finally {
        if (!checkedMeta) {
          // clean up in case of exceptions.
          IOUtils.closeStream(metaRAF);
        }
      }
    } else {
      // for create, we can use the requested checksum
      checksum = requestedChecksum;
    }

    FileOutputStream blockOut = null;
    FileOutputStream crcOut = null;
    try {
      blockOut = new FileOutputStream(
          new RandomAccessFile(blockFile, "rw").getFD());
      crcOut = new FileOutputStream(metaRAF.getFD());
      if (!isCreate) {
        // For append, position both streams at the end of the valid data.
        blockOut.getChannel().position(blockDiskSize);
        crcOut.getChannel().position(crcDiskSize);
      }
      return new ReplicaOutputStreams(blockOut, crcOut, checksum,
          getVolume().isTransientStorage());
    } catch (IOException e) {
      IOUtils.closeStream(blockOut);
      IOUtils.closeStream(metaRAF);
      throw e;
    }
  }

  @Override
  public OutputStream createRestartMetaStream() throws IOException {
    File blockFile = getBlockFile();
    // Note: use File.separator (a path component separator) here, not
    // File.pathSeparator, when building the marker file's path.
    File restartMeta = new File(blockFile.getParent() +
        File.separator + "." + blockFile.getName() + ".restart");
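    // e.g. for a block file named "blk_123", the marker written here is
    // ".blk_123.restart" in the same directory as the block file.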
    if (restartMeta.exists() && !restartMeta.delete()) {
      DataNode.LOG.warn("Failed to delete restart meta file: " +
          restartMeta.getPath());
    }
    return new FileOutputStream(restartMeta);
  }

  @Override
  public String toString() {
    return super.toString()
        + "\n  bytesAcked=" + bytesAcked
        + "\n  bytesOnDisk=" + bytesOnDisk;
  }
}