001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hdfs.server.blockmanagement;
019
020import org.apache.hadoop.hdfs.protocol.Block;
021import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
022import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
023import org.apache.hadoop.hdfs.server.namenode.NameNode;
024
025import java.util.ArrayList;
026import java.util.List;
027
028import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.COMPLETE;
029
030/**
031 * Represents the under construction feature of a Block.
032 * This is usually the last block of a file opened for write or append.
033 */
034public class BlockUnderConstructionFeature {
035  private BlockUCState blockUCState;
036  private static final ReplicaUnderConstruction[] NO_REPLICAS =
037      new ReplicaUnderConstruction[0];
038
039  /**
040   * Block replicas as assigned when the block was allocated.
041   */
042  private ReplicaUnderConstruction[] replicas = NO_REPLICAS;
043
044  /**
045   * Index of the primary data node doing the recovery. Useful for log
046   * messages.
047   */
048  private int primaryNodeIndex = -1;
049
050  /**
051   * The new generation stamp, which this block will have
052   * after the recovery succeeds. Also used as a recovery id to identify
053   * the right recovery if any of the abandoned recoveries re-appear.
054   */
055  private long blockRecoveryId = 0;
056
057  /**
058   * The block source to use in the event of copy-on-write truncate.
059   */
060  private Block truncateBlock;
061
062  public BlockUnderConstructionFeature(Block blk,
063      BlockUCState state, DatanodeStorageInfo[] targets) {
064    assert getBlockUCState() != COMPLETE :
065        "BlockUnderConstructionFeature cannot be in COMPLETE state";
066    this.blockUCState = state;
067    setExpectedLocations(blk, targets);
068  }
069
070  /** Set expected locations */
071  public void setExpectedLocations(Block block, DatanodeStorageInfo[] targets) {
072    int numLocations = targets == null ? 0 : targets.length;
073    this.replicas = new ReplicaUnderConstruction[numLocations];
074    for(int i = 0; i < numLocations; i++) {
075      replicas[i] = new ReplicaUnderConstruction(block, targets[i],
076          ReplicaState.RBW);
077    }
078  }
079
080  /**
081   * Create array of expected replica locations
082   * (as has been assigned by chooseTargets()).
083   */
084  public DatanodeStorageInfo[] getExpectedStorageLocations() {
085    int numLocations = getNumExpectedLocations();
086    DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
087    for (int i = 0; i < numLocations; i++) {
088      storages[i] = replicas[i].getExpectedStorageLocation();
089    }
090    return storages;
091  }
092
093  /** Get the number of expected locations */
094  public int getNumExpectedLocations() {
095    return replicas.length;
096  }
097
098  /**
099   * Return the state of the block under construction.
100   * @see BlockUCState
101   */
102  public BlockUCState getBlockUCState() {
103    return blockUCState;
104  }
105
106  void setBlockUCState(BlockUCState s) {
107    blockUCState = s;
108  }
109
110  public long getBlockRecoveryId() {
111    return blockRecoveryId;
112  }
113
114  /** Get recover block */
115  public Block getTruncateBlock() {
116    return truncateBlock;
117  }
118
119  public void setTruncateBlock(Block recoveryBlock) {
120    this.truncateBlock = recoveryBlock;
121  }
122
123  /**
124   * Set {@link #blockUCState} to {@link BlockUCState#COMMITTED}.
125   */
126  void commit() {
127    blockUCState = BlockUCState.COMMITTED;
128  }
129
130  List<ReplicaUnderConstruction> getStaleReplicas(long genStamp) {
131    List<ReplicaUnderConstruction> staleReplicas = new ArrayList<>();
132    // Remove replicas with wrong gen stamp. The replica list is unchanged.
133    for (ReplicaUnderConstruction r : replicas) {
134      if (genStamp != r.getGenerationStamp()) {
135        staleReplicas.add(r);
136      }
137    }
138    return staleReplicas;
139  }
140
141  /**
142   * Initialize lease recovery for this block.
143   * Find the first alive data-node starting from the previous primary and
144   * make it primary.
145   */
146  public void initializeBlockRecovery(BlockInfo blockInfo, long recoveryId) {
147    setBlockUCState(BlockUCState.UNDER_RECOVERY);
148    blockRecoveryId = recoveryId;
149    if (replicas.length == 0) {
150      NameNode.blockStateChangeLog.warn("BLOCK*" +
151          " BlockUnderConstructionFeature.initializeBlockRecovery:" +
152          " No blocks found, lease removed.");
153      // sets primary node index and return.
154      primaryNodeIndex = -1;
155      return;
156    }
157    boolean allLiveReplicasTriedAsPrimary = true;
158    for (ReplicaUnderConstruction replica : replicas) {
159      // Check if all replicas have been tried or not.
160      if (replica.isAlive()) {
161        allLiveReplicasTriedAsPrimary = allLiveReplicasTriedAsPrimary
162            && replica.getChosenAsPrimary();
163      }
164    }
165    if (allLiveReplicasTriedAsPrimary) {
166      // Just set all the replicas to be chosen whether they are alive or not.
167      for (ReplicaUnderConstruction replica : replicas) {
168        replica.setChosenAsPrimary(false);
169      }
170    }
171    long mostRecentLastUpdate = 0;
172    ReplicaUnderConstruction primary = null;
173    primaryNodeIndex = -1;
174    for (int i = 0; i < replicas.length; i++) {
175      // Skip alive replicas which have been chosen for recovery.
176      if (!(replicas[i].isAlive() && !replicas[i].getChosenAsPrimary())) {
177        continue;
178      }
179      final ReplicaUnderConstruction ruc = replicas[i];
180      final long lastUpdate = ruc.getExpectedStorageLocation()
181          .getDatanodeDescriptor().getLastUpdateMonotonic();
182      if (lastUpdate > mostRecentLastUpdate) {
183        primaryNodeIndex = i;
184        primary = ruc;
185        mostRecentLastUpdate = lastUpdate;
186      }
187    }
188    if (primary != null) {
189      primary.getExpectedStorageLocation().getDatanodeDescriptor()
190          .addBlockToBeRecovered(blockInfo);
191      primary.setChosenAsPrimary(true);
192      NameNode.blockStateChangeLog.debug(
193          "BLOCK* {} recovery started, primary={}", this, primary);
194    }
195  }
196
197  /** Add the reported replica if it is not already in the replica list. */
198  void addReplicaIfNotPresent(DatanodeStorageInfo storage,
199      Block reportedBlock, ReplicaState rState) {
200    if (replicas.length == 0) {
201      replicas = new ReplicaUnderConstruction[1];
202      replicas[0] = new ReplicaUnderConstruction(reportedBlock, storage,
203          rState);
204    } else {
205      for (int i = 0; i < replicas.length; i++) {
206        DatanodeStorageInfo expected =
207            replicas[i].getExpectedStorageLocation();
208        if (expected == storage) {
209          replicas[i].setGenerationStamp(reportedBlock.getGenerationStamp());
210          return;
211        } else if (expected != null && expected.getDatanodeDescriptor() ==
212            storage.getDatanodeDescriptor()) {
213          // The Datanode reported that the block is on a different storage
214          // than the one chosen by BlockPlacementPolicy. This can occur as
215          // we allow Datanodes to choose the target storage. Update our
216          // state by removing the stale entry and adding a new one.
217          replicas[i] = new ReplicaUnderConstruction(reportedBlock, storage,
218              rState);
219          return;
220        }
221      }
222      ReplicaUnderConstruction[] newReplicas =
223          new ReplicaUnderConstruction[replicas.length + 1];
224      System.arraycopy(replicas, 0, newReplicas, 0, replicas.length);
225      newReplicas[newReplicas.length - 1] = new ReplicaUnderConstruction(
226          reportedBlock, storage, rState);
227      replicas = newReplicas;
228    }
229  }
230
231  @Override
232  public String toString() {
233    final StringBuilder b = new StringBuilder(100);
234    appendUCParts(b);
235    return b.toString();
236  }
237
238  private void appendUCParts(StringBuilder sb) {
239    sb.append("{UCState=").append(blockUCState)
240      .append(", truncateBlock=").append(truncateBlock)
241      .append(", primaryNodeIndex=").append(primaryNodeIndex)
242      .append(", replicas=[");
243    int i = 0;
244    for (ReplicaUnderConstruction r : replicas) {
245      r.appendStringTo(sb);
246      if (++i < replicas.length) {
247        sb.append(", ");
248      }
249    }
250    sb.append("]}");
251  }
252
253  public void appendUCPartsConcise(StringBuilder sb) {
254    sb.append("replicas=");
255    int i = 0;
256    for (ReplicaUnderConstruction r : replicas) {
257      sb.append(r.getExpectedStorageLocation().getDatanodeDescriptor());
258      if (++i < replicas.length) {
259        sb.append(", ");
260      }
261    }
262  }
263}