001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hdfs.server.namenode;
019
020import static org.apache.commons.lang.StringEscapeUtils.escapeJava;
021import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
022import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
023import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT;
024import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
025import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_DEFAULT;
026import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY;
027import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_DEFAULT;
028import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_KEY;
029import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
030import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
031import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
032import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
033import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
034import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
035import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
036import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY;
037import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
038import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
039import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
040import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
041import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
042import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
043import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
044import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT;
045import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY;
046import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT;
047import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY;
048import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT;
049import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY;
050import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME;
051import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
052import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY;
053import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT;
054import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY;
055import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT;
056import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY;
057import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
058import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
059import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
060import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY;
061import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS;
062import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT;
063import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD;
064import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT;
065import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT;
066import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
067import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
068import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT;
069import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY;
070import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
071import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
072import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
073import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
074import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT;
075import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY;
076import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT;
077import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY;
078import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT;
079import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY;
080import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
081import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT;
082import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
083import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT;
084import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY;
085import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
086import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_KEY;
087import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_DEFAULT;
088import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_LOCK_HOLD_TO_RELEASE_LEASE_MS_KEY;
089import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_LOCK_HOLD_TO_RELEASE_LEASE_MS_DEFAULT;
090import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
091import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
092import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT;
093import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
094import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
095import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
096import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT;
097import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_KEY;
098import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
099import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
100import static org.apache.hadoop.util.Time.now;
101import static org.apache.hadoop.util.Time.monotonicNow;
102import static org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics.TOPMETRICS_METRICS_SOURCE_NAME;
103
104import java.io.BufferedWriter;
105import java.io.ByteArrayInputStream;
106import java.io.DataInput;
107import java.io.DataInputStream;
108import java.io.DataOutputStream;
109import java.io.File;
110import java.io.FileNotFoundException;
111import java.io.FileOutputStream;
112import java.io.IOException;
113import java.io.OutputStreamWriter;
114import java.io.PrintWriter;
115import java.io.StringWriter;
116import java.lang.management.ManagementFactory;
117import java.net.InetAddress;
118import java.net.URI;
119import java.util.ArrayList;
120import java.util.Arrays;
121import java.util.Collection;
122import java.util.Collections;
123import java.util.Date;
124import java.util.EnumSet;
125import java.util.HashMap;
126import java.util.HashSet;
127import java.util.Iterator;
128import java.util.LinkedHashSet;
129import java.util.List;
130import java.util.Map;
131import java.util.Map.Entry;
132import java.util.Set;
133import java.util.TreeMap;
134import java.util.concurrent.ExecutorService;
135import java.util.concurrent.Executors;
136import java.util.concurrent.TimeUnit;
137import java.util.concurrent.locks.Condition;
138import java.util.concurrent.locks.ReentrantLock;
139import java.util.concurrent.locks.ReentrantReadWriteLock;
140
141import javax.management.NotCompliantMBeanException;
142import javax.management.ObjectName;
143import javax.management.StandardMBean;
144
145import org.apache.commons.logging.Log;
146import org.apache.commons.logging.LogFactory;
147import org.apache.commons.logging.impl.Log4JLogger;
148import org.apache.hadoop.HadoopIllegalArgumentException;
149import org.apache.hadoop.classification.InterfaceAudience;
150import org.apache.hadoop.conf.Configuration;
151import org.apache.hadoop.crypto.CryptoProtocolVersion;
152import org.apache.hadoop.crypto.key.KeyProvider;
153import org.apache.hadoop.crypto.CryptoCodec;
154import org.apache.hadoop.crypto.key.KeyProvider.Metadata;
155import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
156import org.apache.hadoop.hdfs.AddBlockFlag;
157import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
158import org.apache.hadoop.fs.CacheFlag;
159import org.apache.hadoop.fs.ContentSummary;
160import org.apache.hadoop.fs.CreateFlag;
161import org.apache.hadoop.fs.FileEncryptionInfo;
162import org.apache.hadoop.fs.FileStatus;
163import org.apache.hadoop.fs.FileSystem;
164import org.apache.hadoop.fs.FsServerDefaults;
165import org.apache.hadoop.fs.InvalidPathException;
166import org.apache.hadoop.fs.Options;
167import org.apache.hadoop.fs.Path;
168import org.apache.hadoop.fs.UnresolvedLinkException;
169import org.apache.hadoop.fs.XAttr;
170import org.apache.hadoop.fs.XAttrSetFlag;
171import org.apache.hadoop.fs.permission.AclEntry;
172import org.apache.hadoop.fs.permission.AclStatus;
173import org.apache.hadoop.fs.permission.FsAction;
174import org.apache.hadoop.fs.permission.FsPermission;
175import org.apache.hadoop.fs.permission.PermissionStatus;
176import org.apache.hadoop.fs.StorageType;
177import org.apache.hadoop.fs.QuotaUsage;
178import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
179import org.apache.hadoop.ha.ServiceFailedException;
180import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
181import org.apache.hadoop.hdfs.DFSConfigKeys;
182import org.apache.hadoop.hdfs.DFSUtil;
183import org.apache.hadoop.hdfs.HAUtil;
184import org.apache.hadoop.hdfs.HdfsConfiguration;
185import org.apache.hadoop.hdfs.UnknownCryptoProtocolVersionException;
186import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
187import org.apache.hadoop.hdfs.protocol.Block;
188import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
189import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
190import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
191import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
192import org.apache.hadoop.hdfs.protocol.ClientProtocol;
193import org.apache.hadoop.hdfs.protocol.DatanodeID;
194import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
195import org.apache.hadoop.hdfs.protocol.DirectoryListing;
196import org.apache.hadoop.hdfs.protocol.EncryptionZone;
197import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
198import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
199import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
200import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
201import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
202import org.apache.hadoop.hdfs.protocol.LocatedBlock;
203import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
204import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
205import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
206import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
207import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
208import org.apache.hadoop.hdfs.protocol.SnapshotException;
209import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
210import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
211import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
212import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
213import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
214import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
215import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager.SecretManagerState;
216import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
217import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
218import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
219import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
220import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
221import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
222import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
223import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
224import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
225import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
226import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
227import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
228import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
229import org.apache.hadoop.hdfs.server.common.Storage;
230import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
231import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
232import org.apache.hadoop.hdfs.server.common.Util;
233import org.apache.hadoop.hdfs.server.namenode.FSDirEncryptionZoneOp.EncryptionKeyInfo;
234import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
235import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
236import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
237import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
238import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
239import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
240import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
241import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature;
242import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
243import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
244import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
245import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
246import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
247import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
248import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
249import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
250import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
251import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
252import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
253import org.apache.hadoop.hdfs.server.namenode.startupprogress.Status;
254import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
255import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
256import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
257import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
258import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
259import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
260import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
261import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
262import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
263import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
264import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
265import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
266import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
267import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
268import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
269import org.apache.hadoop.hdfs.server.protocol.StorageReport;
270import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
271import org.apache.hadoop.hdfs.web.JsonUtil;
272import org.apache.hadoop.io.IOUtils;
273import org.apache.hadoop.io.Text;
274import org.apache.hadoop.ipc.CallerContext;
275import org.apache.hadoop.ipc.RetriableException;
276import org.apache.hadoop.ipc.RetryCache;
277import org.apache.hadoop.ipc.Server;
278import org.apache.hadoop.ipc.StandbyException;
279import org.apache.hadoop.metrics2.annotation.Metric;
280import org.apache.hadoop.metrics2.annotation.Metrics;
281import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
282import org.apache.hadoop.metrics2.lib.MetricsRegistry;
283import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
284import org.apache.hadoop.metrics2.util.MBeans;
285import org.apache.hadoop.net.NetworkTopology;
286import org.apache.hadoop.net.Node;
287import org.apache.hadoop.security.AccessControlException;
288import org.apache.hadoop.security.UserGroupInformation;
289import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
290import org.apache.hadoop.security.token.SecretManager.InvalidToken;
291import org.apache.hadoop.security.token.Token;
292import org.apache.hadoop.security.token.TokenIdentifier;
293import org.apache.hadoop.security.token.delegation.DelegationKey;
294import org.apache.hadoop.util.Daemon;
295import org.apache.hadoop.util.DataChecksum;
296import org.apache.hadoop.util.ReflectionUtils;
297import org.apache.hadoop.util.StringUtils;
298import org.apache.hadoop.util.VersionInfo;
299import org.apache.log4j.Appender;
300import org.apache.log4j.AsyncAppender;
301import org.apache.log4j.Logger;
302import org.mortbay.util.ajax.JSON;
303
304import com.google.common.annotations.VisibleForTesting;
305import com.google.common.base.Charsets;
306import com.google.common.base.Preconditions;
307import com.google.common.collect.ImmutableMap;
308import com.google.common.collect.Lists;
309import com.google.common.util.concurrent.ThreadFactoryBuilder;
310
311/***************************************************
312 * FSNamesystem does the actual bookkeeping work for the
313 * DataNode.
314 *
315 * It tracks several important tables.
316 *
317 * 1)  valid fsname --> blocklist  (kept on disk, logged)
318 * 2)  Set of all valid blocks (inverted #1)
319 * 3)  block --> machinelist (kept in memory, rebuilt dynamically from reports)
320 * 4)  machine --> blocklist (inverted #2)
321 * 5)  LRU cache of updated-heartbeat machines
322 ***************************************************/
323@InterfaceAudience.Private
324@Metrics(context="dfs")
325public class FSNamesystem implements Namesystem, FSNamesystemMBean,
326  NameNodeMXBean {
327  public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
328  private final MetricsRegistry registry = new MetricsRegistry("FSNamesystem");
329  @Metric final MutableRatesWithAggregation detailedLockHoldTimeMetrics =
330      registry.newRatesWithAggregation("detailedLockHoldTimeMetrics");
331
332  private final BlockIdManager blockIdManager;
333
334  boolean isAuditEnabled() {
335    return (!isDefaultAuditLogger || auditLog.isInfoEnabled())
336        && !auditLoggers.isEmpty();
337  }
338
339  private void logAuditEvent(boolean succeeded, String cmd, String src)
340      throws IOException {
341    logAuditEvent(succeeded, cmd, src, null, null);
342  }
343  
344  private void logAuditEvent(boolean succeeded, String cmd, String src,
345      String dst, HdfsFileStatus stat) throws IOException {
346    if (isAuditEnabled() && isExternalInvocation()) {
347      logAuditEvent(succeeded, Server.getRemoteUser(), Server.getRemoteIp(),
348                    cmd, src, dst, stat);
349    }
350  }
351
352  private void logAuditEvent(boolean succeeded,
353      UserGroupInformation ugi, InetAddress addr, String cmd, String src,
354      String dst, HdfsFileStatus stat) {
355    FileStatus status = null;
356    if (stat != null) {
357      Path symlink = stat.isSymlink() ? new Path(stat.getSymlink()) : null;
358      Path path = dst != null ? new Path(dst) : new Path(src);
359      status = new FileStatus(stat.getLen(), stat.isDir(),
360          stat.getReplication(), stat.getBlockSize(), stat.getModificationTime(),
361          stat.getAccessTime(), stat.getPermission(), stat.getOwner(),
362          stat.getGroup(), symlink, path);
363    }
364    final String ugiStr = ugi.toString();
365    for (AuditLogger logger : auditLoggers) {
366      if (logger instanceof HdfsAuditLogger) {
367        HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
368        hdfsLogger.logAuditEvent(succeeded, ugiStr, addr, cmd, src, dst,
369            status, CallerContext.getCurrent(), ugi, dtSecretManager);
370      } else {
371        logger.logAuditEvent(succeeded, ugiStr, addr, cmd, src, dst, status);
372      }
373    }
374  }
375
376  /**
377   * Logger for audit events, noting successful FSNamesystem operations. Emits
378   * to FSNamesystem.audit at INFO. Each event causes a set of tab-separated
379   * <code>key=value</code> pairs to be written for the following properties:
380   * <code>
381   * ugi=&lt;ugi in RPC&gt;
382   * ip=&lt;remote IP&gt;
383   * cmd=&lt;command&gt;
384   * src=&lt;src path&gt;
385   * dst=&lt;dst path (optional)&gt;
386   * perm=&lt;permissions (optional)&gt;
387   * </code>
388   */
389  public static final Log auditLog = LogFactory.getLog(
390      FSNamesystem.class.getName() + ".audit");
391
392  static final int DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED = 100;
393  static int BLOCK_DELETION_INCREMENT = 1000;
394  private final boolean isPermissionEnabled;
395  private final UserGroupInformation fsOwner;
396  private final String supergroup;
397  private final boolean standbyShouldCheckpoint;
398
399  /** Interval between each check of lease to release. */
400  private final long leaseRecheckIntervalMs;
401  /** Maximum time the lock is hold to release lease. */
402  private final long maxLockHoldToReleaseLeaseMs;
403
404  // Scan interval is not configurable.
405  private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
406    TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
407  final DelegationTokenSecretManager dtSecretManager;
408  private final boolean alwaysUseDelegationTokensForTests;
409
410  private static final Step STEP_AWAITING_REPORTED_BLOCKS =
411    new Step(StepType.AWAITING_REPORTED_BLOCKS);
412
413  // Tracks whether the default audit logger is the only configured audit
414  // logger; this allows isAuditEnabled() to return false in case the
415  // underlying logger is disabled, and avoid some unnecessary work.
416  private final boolean isDefaultAuditLogger;
417  private final List<AuditLogger> auditLoggers;
418
419  /** The namespace tree. */
420  FSDirectory dir;
421  private final BlockManager blockManager;
422  private final SnapshotManager snapshotManager;
423  private final CacheManager cacheManager;
424  private final DatanodeStatistics datanodeStatistics;
425
426  private String nameserviceId;
427
428  private volatile RollingUpgradeInfo rollingUpgradeInfo = null;
429  /**
430   * A flag that indicates whether the checkpointer should checkpoint a rollback
431   * fsimage. The edit log tailer sets this flag. The checkpoint will create a
432   * rollback fsimage if the flag is true, and then change the flag to false.
433   */
434  private volatile boolean needRollbackFsImage;
435
436  // Block pool ID used by this namenode
437  private String blockPoolId;
438
439  final LeaseManager leaseManager = new LeaseManager(this); 
440
441  volatile Daemon smmthread = null;  // SafeModeMonitor thread
442  
443  Daemon nnrmthread = null; // NamenodeResourceMonitor thread
444
445  Daemon nnEditLogRoller = null; // NameNodeEditLogRoller thread
446
447  // A daemon to periodically clean up corrupt lazyPersist files
448  // from the name space.
449  Daemon lazyPersistFileScrubber = null;
450
451  // Executor to warm up EDEK cache
452  private ExecutorService edekCacheLoader = null;
453  private final int edekCacheLoaderDelay;
454  private final int edekCacheLoaderInterval;
455
456  /**
457   * When an active namenode will roll its own edit log, in # edits
458   */
459  private final long editLogRollerThreshold;
460  /**
461   * Check interval of an active namenode's edit log roller thread 
462   */
463  private final int editLogRollerInterval;
464
465  /**
466   * How frequently we scan and unlink corrupt lazyPersist files.
467   * (In seconds)
468   */
469  private final int lazyPersistFileScrubIntervalSec;
470
471  private volatile boolean hasResourcesAvailable = false;
472  private volatile boolean fsRunning = true;
473  
474  /** The start time of the namesystem. */
475  private final long startTime = now();
476
477  /** The interval of namenode checking for the disk space availability */
478  private final long resourceRecheckInterval;
479
480  // The actual resource checker instance.
481  NameNodeResourceChecker nnResourceChecker;
482
483  private final FsServerDefaults serverDefaults;
484  private final boolean supportAppends;
485  private final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
486
487  private volatile SafeModeInfo safeMode;  // safe mode information
488
489  private final long maxFsObjects;          // maximum number of fs objects
490
491  private final long minBlockSize;         // minimum block size
492  final long maxBlocksPerFile;     // maximum # of blocks per file
493  private final int numCommittedAllowed;
494
495  /** Lock to protect FSNamesystem. */
496  private final FSNamesystemLock fsLock;
497
498  /** 
499   * Checkpoint lock to protect FSNamesystem modification on standby NNs.
500   * Unlike fsLock, it does not affect block updates. On active NNs, this lock
501   * does not provide proper protection, because there are operations that
502   * modify both block and name system state.  Even on standby, fsLock is 
503   * used when block state changes need to be blocked.
504   */
505  private final ReentrantLock cpLock;
506
507  /**
508   * Used when this NN is in standby state to read from the shared edit log.
509   */
510  private EditLogTailer editLogTailer = null;
511
512  /**
513   * Used when this NN is in standby state to perform checkpoints.
514   */
515  private StandbyCheckpointer standbyCheckpointer;
516
517  /**
518   * Reference to the NN's HAContext object. This is only set once
519   * {@link #startCommonServices(Configuration, HAContext)} is called. 
520   */
521  private HAContext haContext;
522
523  private final boolean haEnabled;
524
525  /**
526   * Whether the namenode is in the middle of starting the active service
527   */
528  private volatile boolean startingActiveService = false;
529
530  private final RetryCache retryCache;
531
532  private KeyProviderCryptoExtension provider = null;
533
534  private volatile boolean imageLoaded = false;
535  private final Condition cond;
536
537  private final FSImage fsImage;
538
539  private final TopConf topConf;
540  private TopMetrics topMetrics;
541
542  private INodeAttributeProvider inodeAttributeProvider;
543
544  /**
545   * Notify that loading of this FSDirectory is complete, and
546   * it is imageLoaded for use
547   */
548  void imageLoadComplete() {
549    Preconditions.checkState(!imageLoaded, "FSDirectory already loaded");
550    setImageLoaded();
551  }
552
553  void setImageLoaded() {
554    if(imageLoaded) return;
555    writeLock();
556    try {
557      setImageLoaded(true);
558      dir.markNameCacheInitialized();
559      cond.signalAll();
560    } finally {
561      writeUnlock("setImageLoaded");
562    }
563  }
564
565  //This is for testing purposes only
566  @VisibleForTesting
567  boolean isImageLoaded() {
568    return imageLoaded;
569  }
570
571  // exposed for unit tests
572  protected void setImageLoaded(boolean flag) {
573    imageLoaded = flag;
574  }
575
576  /**
577   * Block until the object is imageLoaded to be used.
578   */
579  void waitForLoadingFSImage() {
580    if (!imageLoaded) {
581      writeLock();
582      try {
583        while (!imageLoaded) {
584          try {
585            cond.await(5000, TimeUnit.MILLISECONDS);
586          } catch (InterruptedException ignored) {
587          }
588        }
589      } finally {
590        writeUnlock();
591      }
592    }
593  }
594
595  /**
596   * Clear all loaded data
597   */
598  void clear() {
599    dir.reset();
600    dtSecretManager.reset();
601    blockIdManager.clear();
602    leaseManager.removeAllLeases();
603    snapshotManager.clearSnapshottableDirs();
604    cacheManager.clear();
605    setImageLoaded(false);
606    blockManager.clear();
607  }
608
609  @VisibleForTesting
610  LeaseManager getLeaseManager() {
611    return leaseManager;
612  }
613  
614  boolean isHaEnabled() {
615    return haEnabled;
616  }
617
618  /**
619   * Check the supplied configuration for correctness.
620   * @param conf Supplies the configuration to validate.
621   * @throws IOException if the configuration could not be queried.
622   * @throws IllegalArgumentException if the configuration is invalid.
623   */
624  private static void checkConfiguration(Configuration conf)
625      throws IOException {
626
627    final Collection<URI> namespaceDirs =
628        FSNamesystem.getNamespaceDirs(conf);
629    final Collection<URI> editsDirs =
630        FSNamesystem.getNamespaceEditsDirs(conf);
631    final Collection<URI> requiredEditsDirs =
632        FSNamesystem.getRequiredNamespaceEditsDirs(conf);
633    final Collection<URI> sharedEditsDirs =
634        FSNamesystem.getSharedEditsDirs(conf);
635
636    for (URI u : requiredEditsDirs) {
637      if (u.toString().compareTo(
638              DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_DEFAULT) == 0) {
639        continue;
640      }
641
642      // Each required directory must also be in editsDirs or in
643      // sharedEditsDirs.
644      if (!editsDirs.contains(u) &&
645          !sharedEditsDirs.contains(u)) {
646        throw new IllegalArgumentException("Required edits directory " + u
647            + " not found: "
648            + DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY + "=" + editsDirs + "; "
649            + DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY
650            + "=" + requiredEditsDirs + "; "
651            + DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY
652            + "=" + sharedEditsDirs);
653      }
654    }
655
656    if (namespaceDirs.size() == 1) {
657      LOG.warn("Only one image storage directory ("
658          + DFS_NAMENODE_NAME_DIR_KEY + ") configured. Beware of data loss"
659          + " due to lack of redundant storage directories!");
660    }
661    if (editsDirs.size() == 1) {
662      LOG.warn("Only one namespace edits storage directory ("
663          + DFS_NAMENODE_EDITS_DIR_KEY + ") configured. Beware of data loss"
664          + " due to lack of redundant storage directories!");
665    }
666  }
667
668  /**
669   * Instantiates an FSNamesystem loaded from the image and edits
670   * directories specified in the passed Configuration.
671   *
672   * @param conf the Configuration which specifies the storage directories
673   *             from which to load
674   * @return an FSNamesystem which contains the loaded namespace
675   * @throws IOException if loading fails
676   */
677  static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
678
679    checkConfiguration(conf);
680    FSImage fsImage = new FSImage(conf,
681        FSNamesystem.getNamespaceDirs(conf),
682        FSNamesystem.getNamespaceEditsDirs(conf));
683    FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
684    StartupOption startOpt = NameNode.getStartupOption(conf);
685    if (startOpt == StartupOption.RECOVER) {
686      namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
687    }
688
689    long loadStart = monotonicNow();
690    try {
691      namesystem.loadFSImage(startOpt);
692    } catch (IOException ioe) {
693      LOG.warn("Encountered exception loading fsimage", ioe);
694      fsImage.close();
695      throw ioe;
696    }
697    long timeTakenToLoadFSImage = monotonicNow() - loadStart;
698    LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
699    NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
700    if (nnMetrics != null) {
701      nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
702    }
703    namesystem.getFSDirectory().createReservedStatuses(namesystem.getCTime());
704    return namesystem;
705  }
706  
707  FSNamesystem(Configuration conf, FSImage fsImage) throws IOException {
708    this(conf, fsImage, false);
709  }
710  
711  /**
712   * Create an FSNamesystem associated with the specified image.
713   * 
714   * Note that this does not load any data off of disk -- if you would
715   * like that behavior, use {@link #loadFromDisk(Configuration)}
716   *
717   * @param conf configuration
718   * @param fsImage The FSImage to associate with
719   * @param ignoreRetryCache Whether or not should ignore the retry cache setup
720   *                         step. For Secondary NN this should be set to true.
721   * @throws IOException on bad configuration
722   */
723  FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
724      throws IOException {
725    provider = DFSUtil.createKeyProviderCryptoExtension(conf);
726    LOG.info("KeyProvider: " + provider);
727    if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY,
728                        DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT)) {
729      LOG.info("Enabling async auditlog");
730      enableAsyncAuditLog();
731    }
732    fsLock = new FSNamesystemLock(conf, detailedLockHoldTimeMetrics);
733    cond = fsLock.newWriteLockCondition();
734    cpLock = new ReentrantLock();
735
736    this.fsImage = fsImage;
737    try {
738      resourceRecheckInterval = conf.getLong(
739          DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
740          DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
741
742      this.blockManager = new BlockManager(this, conf);
743      this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
744      this.blockIdManager = new BlockIdManager(blockManager);
745
746      this.fsOwner = UserGroupInformation.getCurrentUser();
747      this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY, 
748                                 DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
749      this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
750                                                 DFS_PERMISSIONS_ENABLED_DEFAULT);
751      LOG.info("fsOwner             = " + fsOwner);
752      LOG.info("supergroup          = " + supergroup);
753      LOG.info("isPermissionEnabled = " + isPermissionEnabled);
754
755      // block allocation has to be persisted in HA using a shared edits directory
756      // so that the standby has up-to-date namespace information
757      nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
758      this.haEnabled = HAUtil.isHAEnabled(conf, nameserviceId);  
759      
760      // Sanity check the HA-related config.
761      if (nameserviceId != null) {
762        LOG.info("Determined nameservice ID: " + nameserviceId);
763      }
764      LOG.info("HA Enabled: " + haEnabled);
765      if (!haEnabled && HAUtil.usesSharedEditsDir(conf)) {
766        LOG.warn("Configured NNs:\n" + DFSUtil.nnAddressesAsString(conf));
767        throw new IOException("Invalid configuration: a shared edits dir " +
768            "must not be specified if HA is not enabled.");
769      }
770
771      // Get the checksum type from config
772      String checksumTypeStr = conf.get(DFS_CHECKSUM_TYPE_KEY, DFS_CHECKSUM_TYPE_DEFAULT);
773      DataChecksum.Type checksumType;
774      try {
775         checksumType = DataChecksum.Type.valueOf(checksumTypeStr);
776      } catch (IllegalArgumentException iae) {
777         throw new IOException("Invalid checksum type in "
778            + DFS_CHECKSUM_TYPE_KEY + ": " + checksumTypeStr);
779      }
780
781      this.serverDefaults = new FsServerDefaults(
782          conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
783          conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
784          conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
785          (short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
786          conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
787          conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT),
788          conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT),
789          checksumType);
790      
791      this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY, 
792                                       DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
793
794      this.minBlockSize = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY,
795          DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT);
796      this.maxBlocksPerFile = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY,
797          DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT);
798      this.numCommittedAllowed = conf.getInt(
799              DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY,
800              DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_DEFAULT);
801      this.supportAppends = conf.getBoolean(DFS_SUPPORT_APPEND_KEY, DFS_SUPPORT_APPEND_DEFAULT);
802      LOG.info("Append Enabled: " + supportAppends);
803
804      this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
805      
806      this.standbyShouldCheckpoint = conf.getBoolean(
807          DFS_HA_STANDBY_CHECKPOINTS_KEY, DFS_HA_STANDBY_CHECKPOINTS_DEFAULT);
808      // # edit autoroll threshold is a multiple of the checkpoint threshold 
809      this.editLogRollerThreshold = (long)
810          (conf.getFloat(
811              DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD,
812              DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT) *
813          conf.getLong(
814              DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
815              DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT));
816      this.editLogRollerInterval = conf.getInt(
817          DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS,
818          DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT);
819
820      this.lazyPersistFileScrubIntervalSec = conf.getInt(
821          DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
822          DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT);
823
824      if (this.lazyPersistFileScrubIntervalSec < 0) {
825        throw new IllegalArgumentException(
826            DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC
827                + " must be zero (for disable) or greater than zero.");
828      }
829
830      this.edekCacheLoaderDelay = conf.getInt(
831          DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY,
832          DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT);
833      this.edekCacheLoaderInterval = conf.getInt(
834          DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_KEY,
835          DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT);
836
837      this.leaseRecheckIntervalMs = conf.getLong(
838          DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_KEY,
839          DFS_NAMENODE_LEASE_RECHECK_INTERVAL_MS_DEFAULT);
840      this.maxLockHoldToReleaseLeaseMs = conf.getLong(
841          DFS_NAMENODE_MAX_LOCK_HOLD_TO_RELEASE_LEASE_MS_KEY,
842          DFS_NAMENODE_MAX_LOCK_HOLD_TO_RELEASE_LEASE_MS_DEFAULT);
843
844      // For testing purposes, allow the DT secret manager to be started regardless
845      // of whether security is enabled.
846      alwaysUseDelegationTokensForTests = conf.getBoolean(
847          DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
848          DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
849      
850      this.dtSecretManager = createDelegationTokenSecretManager(conf);
851      this.dir = new FSDirectory(this, conf);
852      this.snapshotManager = new SnapshotManager(dir);
853      this.cacheManager = new CacheManager(this, conf, blockManager);
854      this.safeMode = new SafeModeInfo(conf);
855      this.topConf = new TopConf(conf);
856      this.auditLoggers = initAuditLoggers(conf);
857      this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
858        auditLoggers.get(0) instanceof DefaultAuditLogger;
859      this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
860      Class<? extends INodeAttributeProvider> klass = conf.getClass(
861          DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY,
862          null, INodeAttributeProvider.class);
863      if (klass != null) {
864        inodeAttributeProvider = ReflectionUtils.newInstance(klass, conf);
865        LOG.info("Using INode attribute provider: " + klass.getName());
866      }
867    } catch(IOException e) {
868      LOG.error(getClass().getSimpleName() + " initialization failed.", e);
869      close();
870      throw e;
871    } catch (RuntimeException re) {
872      LOG.error(getClass().getSimpleName() + " initialization failed.", re);
873      close();
874      throw re;
875    }
876  }
877
878  @VisibleForTesting
879  public List<AuditLogger> getAuditLoggers() {
880    return auditLoggers;
881  }
882
883  @VisibleForTesting
884  public RetryCache getRetryCache() {
885    return retryCache;
886  }
887
888  @VisibleForTesting
889  public long getLeaseRecheckIntervalMs() {
890    return leaseRecheckIntervalMs;
891  }
892
893  @VisibleForTesting
894  public long getMaxLockHoldToReleaseLeaseMs() {
895    return maxLockHoldToReleaseLeaseMs;
896  }
897
898  void lockRetryCache() {
899    if (retryCache != null) {
900      retryCache.lock();
901    }
902  }
903
904  void unlockRetryCache() {
905    if (retryCache != null) {
906      retryCache.unlock();
907    }
908  }
909
910  /** Whether or not retry cache is enabled */
911  boolean hasRetryCache() {
912    return retryCache != null;
913  }
914  
915  void addCacheEntryWithPayload(byte[] clientId, int callId, Object payload) {
916    if (retryCache != null) {
917      retryCache.addCacheEntryWithPayload(clientId, callId, payload);
918    }
919  }
920  
921  void addCacheEntry(byte[] clientId, int callId) {
922    if (retryCache != null) {
923      retryCache.addCacheEntry(clientId, callId);
924    }
925  }
926
927  @VisibleForTesting
928  public KeyProviderCryptoExtension getProvider() {
929    return provider;
930  }
931
932  @VisibleForTesting
933  static RetryCache initRetryCache(Configuration conf) {
934    boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
935                                     DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
936    LOG.info("Retry cache on namenode is " + (enable ? "enabled" : "disabled"));
937    if (enable) {
938      float heapPercent = conf.getFloat(
939          DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY,
940          DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT);
941      long entryExpiryMillis = conf.getLong(
942          DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY,
943          DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT);
944      LOG.info("Retry cache will use " + heapPercent
945          + " of total heap and retry cache entry expiry time is "
946          + entryExpiryMillis + " millis");
947      long entryExpiryNanos = entryExpiryMillis * 1000 * 1000;
948      return new RetryCache("NameNodeRetryCache", heapPercent,
949          entryExpiryNanos);
950    }
951    return null;
952  }
953
954  private List<AuditLogger> initAuditLoggers(Configuration conf) {
955    // Initialize the custom access loggers if configured.
956    Collection<String> alClasses =
957        conf.getTrimmedStringCollection(DFS_NAMENODE_AUDIT_LOGGERS_KEY);
958    List<AuditLogger> auditLoggers = Lists.newArrayList();
959    if (alClasses != null && !alClasses.isEmpty()) {
960      for (String className : alClasses) {
961        try {
962          AuditLogger logger;
963          if (DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME.equals(className)) {
964            logger = new DefaultAuditLogger();
965          } else {
966            logger = (AuditLogger) Class.forName(className).newInstance();
967          }
968          logger.initialize(conf);
969          auditLoggers.add(logger);
970        } catch (RuntimeException re) {
971          throw re;
972        } catch (Exception e) {
973          throw new RuntimeException(e);
974        }
975      }
976    }
977
978    // Make sure there is at least one logger installed.
979    if (auditLoggers.isEmpty()) {
980      auditLoggers.add(new DefaultAuditLogger());
981    }
982
983    // Add audit logger to calculate top users
984    if (topConf.isEnabled) {
985      topMetrics = new TopMetrics(conf, topConf.nntopReportingPeriodsMs);
986      if (DefaultMetricsSystem.instance().getSource(
987          TOPMETRICS_METRICS_SOURCE_NAME) == null) {
988        DefaultMetricsSystem.instance().register(TOPMETRICS_METRICS_SOURCE_NAME,
989            "Top N operations by user", topMetrics);
990      }
991      auditLoggers.add(new TopAuditLogger(topMetrics));
992    }
993
994    return Collections.unmodifiableList(auditLoggers);
995  }
996
997  private void loadFSImage(StartupOption startOpt) throws IOException {
998    final FSImage fsImage = getFSImage();
999
1000    // format before starting up if requested
1001    if (startOpt == StartupOption.FORMAT) {
1002      
1003      fsImage.format(this, fsImage.getStorage().determineClusterId());// reuse current id
1004
1005      startOpt = StartupOption.REGULAR;
1006    }
1007    boolean success = false;
1008    writeLock();
1009    try {
1010      // We shouldn't be calling saveNamespace if we've come up in standby state.
1011      MetaRecoveryContext recovery = startOpt.createRecoveryContext();
1012      final boolean staleImage
1013          = fsImage.recoverTransitionRead(startOpt, this, recovery);
1014      if (RollingUpgradeStartupOption.ROLLBACK.matches(startOpt) ||
1015          RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {
1016        rollingUpgradeInfo = null;
1017      }
1018      final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade(); 
1019      LOG.info("Need to save fs image? " + needToSave
1020          + " (staleImage=" + staleImage + ", haEnabled=" + haEnabled
1021          + ", isRollingUpgrade=" + isRollingUpgrade() + ")");
1022      if (needToSave) {
1023        fsImage.saveNamespace(this);
1024      } else {
1025        // No need to save, so mark the phase done.
1026        StartupProgress prog = NameNode.getStartupProgress();
1027        prog.beginPhase(Phase.SAVING_CHECKPOINT);
1028        prog.endPhase(Phase.SAVING_CHECKPOINT);
1029      }
1030      // This will start a new log segment and write to the seen_txid file, so
1031      // we shouldn't do it when coming up in standby state
1032      if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)
1033          || (haEnabled && startOpt == StartupOption.UPGRADEONLY)) {
1034        fsImage.openEditLogForWrite(getEffectiveLayoutVersion());
1035      }
1036      success = true;
1037    } finally {
1038      if (!success) {
1039        fsImage.close();
1040      }
1041      writeUnlock("loadFSImage");
1042    }
1043    imageLoadComplete();
1044  }
1045
1046  private void startSecretManager() {
1047    if (dtSecretManager != null) {
1048      try {
1049        dtSecretManager.startThreads();
1050      } catch (IOException e) {
1051        // Inability to start secret manager
1052        // can't be recovered from.
1053        throw new RuntimeException(e);
1054      }
1055    }
1056  }
1057  
1058  private void startSecretManagerIfNecessary() {
1059    boolean shouldRun = shouldUseDelegationTokens() &&
1060      !isInSafeMode() && getEditLog().isOpenForWrite();
1061    boolean running = dtSecretManager.isRunning();
1062    if (shouldRun && !running) {
1063      startSecretManager();
1064    }
1065  }
1066
1067  private void stopSecretManager() {
1068    if (dtSecretManager != null) {
1069      dtSecretManager.stopThreads();
1070    }
1071  }
1072  
1073  /** 
1074   * Start services common to both active and standby states
1075   */
1076  void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
1077    this.registerMBean(); // register the MBean for the FSNamesystemState
1078    writeLock();
1079    this.haContext = haContext;
1080    try {
1081      nnResourceChecker = new NameNodeResourceChecker(conf);
1082      checkAvailableResources();
1083      assert safeMode != null && !blockManager.isPopulatingReplQueues();
1084      StartupProgress prog = NameNode.getStartupProgress();
1085      prog.beginPhase(Phase.SAFEMODE);
1086      long completeBlocksTotal = getCompleteBlocksTotal();
1087      prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
1088          completeBlocksTotal);
1089      setBlockTotal(completeBlocksTotal);
1090      blockManager.activate(conf);
1091    } finally {
1092      writeUnlock("startCommonServices");
1093    }
1094    
1095    registerMXBean();
1096    DefaultMetricsSystem.instance().register(this);
1097    if (inodeAttributeProvider != null) {
1098      inodeAttributeProvider.start();
1099      dir.setINodeAttributeProvider(inodeAttributeProvider);
1100    }
1101    snapshotManager.registerMXBean();
1102  }
1103  
1104  /** 
1105   * Stop services common to both active and standby states
1106   */
1107  void stopCommonServices() {
1108    writeLock();
1109    if (inodeAttributeProvider != null) {
1110      dir.setINodeAttributeProvider(null);
1111      inodeAttributeProvider.stop();
1112    }
1113    try {
1114      if (blockManager != null) blockManager.close();
1115    } finally {
1116      writeUnlock("stopCommonServices");
1117    }
1118    RetryCache.clear(retryCache);
1119  }
1120  
1121  /**
1122   * Start services required in active state
1123   * @throws IOException
1124   */
1125  void startActiveServices() throws IOException {
1126    startingActiveService = true;
1127    LOG.info("Starting services required for active state");
1128    writeLock();
1129    try {
1130      FSEditLog editLog = getFSImage().getEditLog();
1131      
1132      if (!editLog.isOpenForWrite()) {
1133        // During startup, we're already open for write during initialization.
1134        editLog.initJournalsForWrite();
1135        // May need to recover
1136        editLog.recoverUnclosedStreams();
1137        
1138        LOG.info("Catching up to latest edits from old active before " +
1139            "taking over writer role in edits logs");
1140        editLogTailer.catchupDuringFailover();
1141        
1142        blockManager.setPostponeBlocksFromFuture(false);
1143        blockManager.getDatanodeManager().markAllDatanodesStale();
1144        blockManager.clearQueues();
1145        blockManager.processAllPendingDNMessages();
1146
1147        // Only need to re-process the queue, If not in SafeMode.
1148        if (!isInSafeMode()) {
1149          LOG.info("Reprocessing replication and invalidation queues");
1150          blockManager.initializeReplQueues();
1151        }
1152
1153        if (LOG.isDebugEnabled()) {
1154          LOG.debug("NameNode metadata after re-processing " +
1155              "replication and invalidation queues during failover:\n" +
1156              metaSaveAsString());
1157        }
1158        
1159        long nextTxId = getFSImage().getLastAppliedTxId() + 1;
1160        LOG.info("Will take over writing edit logs at txnid " + 
1161            nextTxId);
1162        editLog.setNextTxId(nextTxId);
1163
1164        getFSImage().editLog.openForWrite(getEffectiveLayoutVersion());
1165      }
1166
1167      // Initialize the quota.
1168      dir.updateCountForQuota();
1169      // Enable quota checks.
1170      dir.enableQuotaChecks();
1171      if (haEnabled) {
1172        // Renew all of the leases before becoming active.
1173        // This is because, while we were in standby mode,
1174        // the leases weren't getting renewed on this NN.
1175        // Give them all a fresh start here.
1176        leaseManager.renewAllLeases();
1177      }
1178      leaseManager.startMonitor();
1179      startSecretManagerIfNecessary();
1180
1181      //ResourceMonitor required only at ActiveNN. See HDFS-2914
1182      this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
1183      nnrmthread.start();
1184
1185      nnEditLogRoller = new Daemon(new NameNodeEditLogRoller(
1186          editLogRollerThreshold, editLogRollerInterval));
1187      nnEditLogRoller.start();
1188
1189      if (lazyPersistFileScrubIntervalSec > 0) {
1190        lazyPersistFileScrubber = new Daemon(new LazyPersistFileScrubber(
1191            lazyPersistFileScrubIntervalSec));
1192        lazyPersistFileScrubber.start();
1193      } else {
1194        LOG.warn("Lazy persist file scrubber is disabled,"
1195            + " configured scrub interval is zero.");
1196      }
1197
1198      cacheManager.startMonitorThread();
1199      blockManager.getDatanodeManager().setShouldSendCachingCommands(true);
1200      if (provider != null) {
1201        edekCacheLoader = Executors.newSingleThreadExecutor(
1202            new ThreadFactoryBuilder().setDaemon(true)
1203                .setNameFormat("Warm Up EDEK Cache Thread #%d")
1204                .build());
1205        FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir,
1206            edekCacheLoaderDelay, edekCacheLoaderInterval);
1207      }
1208    } finally {
1209      startingActiveService = false;
1210      checkSafeMode();
1211      writeUnlock("startActiveServices");
1212    }
1213  }
1214
1215  private boolean inActiveState() {
1216    return haContext != null &&
1217        haContext.getState().getServiceState() == HAServiceState.ACTIVE;
1218  }
1219
1220  /**
1221   * @return Whether the namenode is transitioning to active state and is in the
1222   *         middle of the {@link #startActiveServices()}
1223   */
1224  public boolean inTransitionToActive() {
1225    return haEnabled && inActiveState() && startingActiveService;
1226  }
1227
1228  private boolean shouldUseDelegationTokens() {
1229    return UserGroupInformation.isSecurityEnabled() ||
1230      alwaysUseDelegationTokensForTests;
1231  }
1232
1233  /** 
1234   * Stop services required in active state
1235   */
1236  void stopActiveServices() {
1237    LOG.info("Stopping services started for active state");
1238    writeLock();
1239    try {
1240      stopSecretManager();
1241      leaseManager.stopMonitor();
1242      if (nnrmthread != null) {
1243        ((NameNodeResourceMonitor) nnrmthread.getRunnable()).stopMonitor();
1244        nnrmthread.interrupt();
1245      }
1246      if (edekCacheLoader != null) {
1247        edekCacheLoader.shutdownNow();
1248      }
1249      if (nnEditLogRoller != null) {
1250        ((NameNodeEditLogRoller)nnEditLogRoller.getRunnable()).stop();
1251        nnEditLogRoller.interrupt();
1252      }
1253      if (lazyPersistFileScrubber != null) {
1254        ((LazyPersistFileScrubber) lazyPersistFileScrubber.getRunnable()).stop();
1255        lazyPersistFileScrubber.interrupt();
1256      }
1257      if (dir != null && getFSImage() != null) {
1258        if (getFSImage().editLog != null) {
1259          getFSImage().editLog.close();
1260        }
1261        // Update the fsimage with the last txid that we wrote
1262        // so that the tailer starts from the right spot.
1263        getFSImage().updateLastAppliedTxIdFromWritten();
1264      }
1265      if (cacheManager != null) {
1266        cacheManager.stopMonitorThread();
1267        cacheManager.clearDirectiveStats();
1268      }
1269      if (blockManager != null) {
1270        blockManager.getDatanodeManager().clearPendingCachingCommands();
1271        blockManager.getDatanodeManager().setShouldSendCachingCommands(false);
1272        // Don't want to keep replication queues when not in Active.
1273        blockManager.clearQueues();
1274        blockManager.setInitializedReplQueues(false);
1275      }
1276    } finally {
1277      writeUnlock("stopActiveServices");
1278    }
1279  }
1280  
1281  /**
1282   * Start services required in standby state 
1283   * 
1284   * @throws IOException
1285   */
1286  void startStandbyServices(final Configuration conf) throws IOException {
1287    LOG.info("Starting services required for standby state");
1288    if (!getFSImage().editLog.isOpenForRead()) {
1289      // During startup, we're already open for read.
1290      getFSImage().editLog.initSharedJournalsForRead();
1291    }
1292    
1293    blockManager.setPostponeBlocksFromFuture(true);
1294
1295    // Disable quota checks while in standby.
1296    dir.disableQuotaChecks();
1297    editLogTailer = new EditLogTailer(this, conf);
1298    editLogTailer.start();
1299    if (standbyShouldCheckpoint) {
1300      standbyCheckpointer = new StandbyCheckpointer(conf, this);
1301      standbyCheckpointer.start();
1302    }
1303  }
1304
1305  /**
1306   * Called when the NN is in Standby state and the editlog tailer tails the
1307   * OP_ROLLING_UPGRADE_START.
1308   */
1309  void triggerRollbackCheckpoint() {
1310    setNeedRollbackFsImage(true);
1311    if (standbyCheckpointer != null) {
1312      standbyCheckpointer.triggerRollbackCheckpoint();
1313    }
1314  }
1315
1316  /**
1317   * Called while the NN is in Standby state, but just about to be
1318   * asked to enter Active state. This cancels any checkpoints
1319   * currently being taken.
1320   */
1321  void prepareToStopStandbyServices() throws ServiceFailedException {
1322    if (standbyCheckpointer != null) {
1323      standbyCheckpointer.cancelAndPreventCheckpoints(
1324          "About to leave standby state");
1325    }
1326  }
1327
1328  /** Stop services required in standby state */
1329  void stopStandbyServices() throws IOException {
1330    LOG.info("Stopping services started for standby state");
1331    if (standbyCheckpointer != null) {
1332      standbyCheckpointer.stop();
1333    }
1334    if (editLogTailer != null) {
1335      editLogTailer.stop();
1336    }
1337    if (dir != null && getFSImage() != null && getFSImage().editLog != null) {
1338      getFSImage().editLog.close();
1339    }
1340  }
1341  
1342  @Override
1343  public void checkOperation(OperationCategory op) throws StandbyException {
1344    if (haContext != null) {
1345      // null in some unit tests
1346      haContext.checkOperation(op);
1347    }
1348  }
1349  
1350  /**
1351   * @throws RetriableException
1352   *           If 1) The NameNode is in SafeMode, 2) HA is enabled, and 3)
1353   *           NameNode is in active state
1354   * @throws SafeModeException
1355   *           Otherwise if NameNode is in SafeMode.
1356   */
1357  void checkNameNodeSafeMode(String errorMsg)
1358      throws RetriableException, SafeModeException {
1359    if (isInSafeMode()) {
1360      SafeModeException se = newSafemodeException(errorMsg);
1361      if (haEnabled && haContext != null
1362          && haContext.getState().getServiceState() == HAServiceState.ACTIVE
1363          && shouldRetrySafeMode(this.safeMode)) {
1364        throw new RetriableException(se);
1365      } else {
1366        throw se;
1367      }
1368    }
1369  }
1370
1371  private SafeModeException newSafemodeException(String errorMsg) {
1372    return new SafeModeException(errorMsg + ". Name node is in safe " +
1373        "mode.\n" + safeMode.getTurnOffTip());
1374  }
1375
1376  boolean isPermissionEnabled() {
1377    return isPermissionEnabled;
1378  }
1379
1380  /**
1381   * We already know that the safemode is on. We will throw a RetriableException
1382   * if the safemode is not manual or caused by low resource.
1383   */
1384  private boolean shouldRetrySafeMode(SafeModeInfo safeMode) {
1385    if (safeMode == null) {
1386      return false;
1387    } else {
1388      return !safeMode.isManual() && !safeMode.areResourcesLow();
1389    }
1390  }
1391  
1392  public static Collection<URI> getNamespaceDirs(Configuration conf) {
1393    return getStorageDirs(conf, DFS_NAMENODE_NAME_DIR_KEY);
1394  }
1395
1396  /**
1397   * Get all edits dirs which are required. If any shared edits dirs are
1398   * configured, these are also included in the set of required dirs.
1399   * 
1400   * @param conf the HDFS configuration.
1401   * @return all required dirs.
1402   */
1403  public static Collection<URI> getRequiredNamespaceEditsDirs(Configuration conf) {
1404    Set<URI> ret = new HashSet<URI>();
1405    ret.addAll(getStorageDirs(conf, DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY));
1406    ret.addAll(getSharedEditsDirs(conf));
1407    return ret;
1408  }
1409
1410  private static Collection<URI> getStorageDirs(Configuration conf,
1411                                                String propertyName) {
1412    Collection<String> dirNames = conf.getTrimmedStringCollection(propertyName);
1413    StartupOption startOpt = NameNode.getStartupOption(conf);
1414    if(startOpt == StartupOption.IMPORT) {
1415      // In case of IMPORT this will get rid of default directories 
1416      // but will retain directories specified in hdfs-site.xml
1417      // When importing image from a checkpoint, the name-node can
1418      // start with empty set of storage directories.
1419      Configuration cE = new HdfsConfiguration(false);
1420      cE.addResource("core-default.xml");
1421      cE.addResource("core-site.xml");
1422      cE.addResource("hdfs-default.xml");
1423      Collection<String> dirNames2 = cE.getTrimmedStringCollection(propertyName);
1424      dirNames.removeAll(dirNames2);
1425      if(dirNames.isEmpty())
1426        LOG.warn("!!! WARNING !!!" +
1427          "\n\tThe NameNode currently runs without persistent storage." +
1428          "\n\tAny changes to the file system meta-data may be lost." +
1429          "\n\tRecommended actions:" +
1430          "\n\t\t- shutdown and restart NameNode with configured \"" 
1431          + propertyName + "\" in hdfs-site.xml;" +
1432          "\n\t\t- use Backup Node as a persistent and up-to-date storage " +
1433          "of the file system meta-data.");
1434    } else if (dirNames.isEmpty()) {
1435      dirNames = Collections.singletonList(
1436          DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_DEFAULT);
1437    }
1438    return Util.stringCollectionAsURIs(dirNames);
1439  }
1440
1441  /**
1442   * Return an ordered list of edits directories to write to.
1443   * The list is ordered such that all shared edits directories
1444   * are ordered before non-shared directories, and any duplicates
1445   * are removed. The order they are specified in the configuration
1446   * is retained.
1447   * @return Collection of shared edits directories.
1448   * @throws IOException if multiple shared edits directories are configured
1449   */
1450  public static List<URI> getNamespaceEditsDirs(Configuration conf)
1451      throws IOException {
1452    return getNamespaceEditsDirs(conf, true);
1453  }
1454  
1455  public static List<URI> getNamespaceEditsDirs(Configuration conf,
1456      boolean includeShared)
1457      throws IOException {
1458    // Use a LinkedHashSet so that order is maintained while we de-dup
1459    // the entries.
1460    LinkedHashSet<URI> editsDirs = new LinkedHashSet<URI>();
1461    
1462    if (includeShared) {
1463      List<URI> sharedDirs = getSharedEditsDirs(conf);
1464  
1465      // Fail until multiple shared edits directories are supported (HDFS-2782)
1466      if (sharedDirs.size() > 1) {
1467        throw new IOException(
1468            "Multiple shared edits directories are not yet supported");
1469      }
1470  
1471      // First add the shared edits dirs. It's critical that the shared dirs
1472      // are added first, since JournalSet syncs them in the order they are listed,
1473      // and we need to make sure all edits are in place in the shared storage
1474      // before they are replicated locally. See HDFS-2874.
1475      for (URI dir : sharedDirs) {
1476        if (!editsDirs.add(dir)) {
1477          LOG.warn("Edits URI " + dir + " listed multiple times in " + 
1478              DFS_NAMENODE_SHARED_EDITS_DIR_KEY + ". Ignoring duplicates.");
1479        }
1480      }
1481    }    
1482    // Now add the non-shared dirs.
1483    for (URI dir : getStorageDirs(conf, DFS_NAMENODE_EDITS_DIR_KEY)) {
1484      if (!editsDirs.add(dir)) {
1485        LOG.warn("Edits URI " + dir + " listed multiple times in " + 
1486            DFS_NAMENODE_SHARED_EDITS_DIR_KEY + " and " +
1487            DFS_NAMENODE_EDITS_DIR_KEY + ". Ignoring duplicates.");
1488      }
1489    }
1490
1491    if (editsDirs.isEmpty()) {
1492      // If this is the case, no edit dirs have been explicitly configured.
1493      // Image dirs are to be used for edits too.
1494      return Lists.newArrayList(getNamespaceDirs(conf));
1495    } else {
1496      return Lists.newArrayList(editsDirs);
1497    }
1498  }
1499  
1500  /**
1501   * Returns edit directories that are shared between primary and secondary.
1502   * @param conf configuration
1503   * @return collection of edit directories from {@code conf}
1504   */
1505  public static List<URI> getSharedEditsDirs(Configuration conf) {
1506    // don't use getStorageDirs here, because we want an empty default
1507    // rather than the dir in /tmp
1508    Collection<String> dirNames = conf.getTrimmedStringCollection(
1509        DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
1510    return Util.stringCollectionAsURIs(dirNames);
1511  }
1512
1513  @Override
1514  public void readLock() {
1515    this.fsLock.readLock();
1516  }
1517  @Override
1518  public void readUnlock() {
1519    this.fsLock.readUnlock();
1520  }
1521  public void readUnlock(String opName) {
1522    this.fsLock.readUnlock(opName);
1523  }
1524  @Override
1525  public void writeLock() {
1526    this.fsLock.writeLock();
1527  }
1528  @Override
1529  public void writeLockInterruptibly() throws InterruptedException {
1530    this.fsLock.writeLockInterruptibly();
1531  }
1532  @Override
1533  public void writeUnlock() {
1534    this.fsLock.writeUnlock();
1535  }
1536  public void writeUnlock(String opName) {
1537    this.fsLock.writeUnlock(opName);
1538  }
1539  @Override
1540  public boolean hasWriteLock() {
1541    return this.fsLock.isWriteLockedByCurrentThread();
1542  }
1543  @Override
1544  public boolean hasReadLock() {
1545    return this.fsLock.getReadHoldCount() > 0 || hasWriteLock();
1546  }
1547
1548  public int getReadHoldCount() {
1549    return this.fsLock.getReadHoldCount();
1550  }
1551
1552  public int getWriteHoldCount() {
1553    return this.fsLock.getWriteHoldCount();
1554  }
1555
1556  /** Lock the checkpoint lock */
1557  public void cpLock() {
1558    this.cpLock.lock();
1559  }
1560
1561  /** Lock the checkpoint lock interrupibly */
1562  public void cpLockInterruptibly() throws InterruptedException {
1563    this.cpLock.lockInterruptibly();
1564  }
1565
1566  /** Unlock the checkpoint lock */
1567  public void cpUnlock() {
1568    this.cpLock.unlock();
1569  }
1570    
1571
1572  NamespaceInfo getNamespaceInfo() {
1573    readLock();
1574    try {
1575      return unprotectedGetNamespaceInfo();
1576    } finally {
1577      readUnlock("getNamespaceInfo");
1578    }
1579  }
1580
1581  /**
1582   * Get the creation time of the file system.
1583   * Notice that this time is initialized to NameNode format time, and updated
1584   * to upgrade time during upgrades.
1585   * @return time in milliseconds.
1586   * See {@link org.apache.hadoop.util.Time#now()}.
1587   */
1588  @VisibleForTesting
1589  long getCTime() {
1590    return fsImage == null ? 0 : fsImage.getStorage().getCTime();
1591  }
1592
1593  /**
1594   * Version of @see #getNamespaceInfo() that is not protected by a lock.
1595   */
1596  NamespaceInfo unprotectedGetNamespaceInfo() {
1597    return new NamespaceInfo(getFSImage().getStorage().getNamespaceID(),
1598        getClusterId(), getBlockPoolId(),
1599        getFSImage().getStorage().getCTime(), getState());
1600  }
1601
1602  /**
1603   * Close down this file system manager.
1604   * Causes heartbeat and lease daemons to stop; waits briefly for
1605   * them to finish, but a short timeout returns control back to caller.
1606   */
1607  void close() {
1608    fsRunning = false;
1609    try {
1610      stopCommonServices();
1611      if (smmthread != null) smmthread.interrupt();
1612    } finally {
1613      // using finally to ensure we also wait for lease daemon
1614      try {
1615        stopActiveServices();
1616        stopStandbyServices();
1617      } catch (IOException ie) {
1618      } finally {
1619        IOUtils.cleanup(LOG, dir);
1620        IOUtils.cleanup(LOG, fsImage);
1621      }
1622    }
1623  }
1624
1625  @Override
1626  public boolean isRunning() {
1627    return fsRunning;
1628  }
1629  
1630  @Override
1631  public boolean isInStandbyState() {
1632    if (haContext == null || haContext.getState() == null) {
1633      // We're still starting up. In this case, if HA is
1634      // on for the cluster, we always start in standby. Otherwise
1635      // start in active.
1636      return haEnabled;
1637    }
1638
1639    return HAServiceState.STANDBY == haContext.getState().getServiceState();
1640  }
1641
1642  /**
1643   * Dump all metadata into specified file
1644   */
1645  void metaSave(String filename) throws IOException {
1646    checkSuperuserPrivilege();
1647    checkOperation(OperationCategory.UNCHECKED);
1648    writeLock();
1649    try {
1650      checkOperation(OperationCategory.UNCHECKED);
1651      File file = new File(System.getProperty("hadoop.log.dir"), filename);
1652      PrintWriter out = new PrintWriter(new BufferedWriter(
1653          new OutputStreamWriter(new FileOutputStream(file), Charsets.UTF_8)));
1654      metaSave(out);
1655      out.flush();
1656      out.close();
1657    } finally {
1658      writeUnlock("metaSave");
1659    }
1660  }
1661
1662  private void metaSave(PrintWriter out) {
1663    assert hasWriteLock();
1664    long totalInodes = this.dir.totalInodes();
1665    long totalBlocks = this.getBlocksTotal();
1666    out.println(totalInodes + " files and directories, " + totalBlocks
1667        + " blocks = " + (totalInodes + totalBlocks) + " total");
1668
1669    blockManager.metaSave(out);
1670  }
1671
1672  private String metaSaveAsString() {
1673    StringWriter sw = new StringWriter();
1674    PrintWriter pw = new PrintWriter(sw);
1675    metaSave(pw);
1676    pw.flush();
1677    return sw.toString();
1678  }
1679
1680  FsServerDefaults getServerDefaults() throws StandbyException {
1681    checkOperation(OperationCategory.READ);
1682    return serverDefaults;
1683  }
1684
1685  /////////////////////////////////////////////////////////
1686  //
1687  // These methods are called by HadoopFS clients
1688  //
1689  /////////////////////////////////////////////////////////
1690  /**
1691   * Set permissions for an existing file.
1692   * @throws IOException
1693   */
1694  void setPermission(String src, FsPermission permission) throws IOException {
1695    final String operationName = "setPermission";
1696    HdfsFileStatus auditStat;
1697    checkOperation(OperationCategory.WRITE);
1698    writeLock();
1699    try {
1700      checkOperation(OperationCategory.WRITE);
1701      checkNameNodeSafeMode("Cannot set permission for " + src);
1702      auditStat = FSDirAttrOp.setPermission(dir, src, permission);
1703    } catch (AccessControlException e) {
1704      logAuditEvent(false, operationName, src);
1705      throw e;
1706    } finally {
1707      writeUnlock(operationName);
1708    }
1709    getEditLog().logSync();
1710    logAuditEvent(true, operationName, src, null, auditStat);
1711  }
1712
1713  /**
1714   * Set owner for an existing file.
1715   * @throws IOException
1716   */
1717  void setOwner(String src, String username, String group)
1718      throws IOException {
1719    final String operationName = "setOwner";
1720    HdfsFileStatus auditStat;
1721    checkOperation(OperationCategory.WRITE);
1722    writeLock();
1723    try {
1724      checkOperation(OperationCategory.WRITE);
1725      checkNameNodeSafeMode("Cannot set owner for " + src);
1726      auditStat = FSDirAttrOp.setOwner(dir, src, username, group);
1727    } catch (AccessControlException e) {
1728      logAuditEvent(false, operationName, src);
1729      throw e;
1730    } finally {
1731      writeUnlock(operationName);
1732    }
1733    getEditLog().logSync();
1734    logAuditEvent(true, operationName, src, null, auditStat);
1735  }
1736
1737  /**
1738   * Get block locations within the specified range.
1739   * @see ClientProtocol#getBlockLocations(String, long, long)
1740   */
1741  LocatedBlocks getBlockLocations(String clientMachine, String srcArg,
1742      long offset, long length) throws IOException {
1743    final String operationName = "open";
1744    checkOperation(OperationCategory.READ);
1745    GetBlockLocationsResult res = null;
1746    FSPermissionChecker pc = getPermissionChecker();
1747    readLock();
1748    try {
1749      checkOperation(OperationCategory.READ);
1750      res = FSDirStatAndListingOp.getBlockLocations(
1751          dir, pc, srcArg, offset, length, true);
1752      if (isInSafeMode()) {
1753        for (LocatedBlock b : res.blocks.getLocatedBlocks()) {
1754          // if safemode & no block locations yet then throw safemodeException
1755          if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
1756            SafeModeException se = newSafemodeException(
1757                "Zero blocklocations for " + srcArg);
1758            if (haEnabled && haContext != null &&
1759                haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
1760              throw new RetriableException(se);
1761            } else {
1762              throw se;
1763            }
1764          }
1765        }
1766      }
1767    } catch (AccessControlException e) {
1768      logAuditEvent(false, operationName, srcArg);
1769      throw e;
1770    } finally {
1771      readUnlock(operationName);
1772    }
1773
1774    logAuditEvent(true, operationName, srcArg);
1775
1776    if (!isInSafeMode() && res.updateAccessTime()) {
1777      String src = srcArg;
1778      writeLock();
1779      final long now = now();
1780      try {
1781        checkOperation(OperationCategory.WRITE);
1782        /**
1783         * Resolve the path again and update the atime only when the file
1784         * exists.
1785         *
1786         * XXX: Races can still occur even after resolving the path again.
1787         * For example:
1788         *
1789         * <ul>
1790         *   <li>Get the block location for "/a/b"</li>
1791         *   <li>Rename "/a/b" to "/c/b"</li>
1792         *   <li>The second resolution still points to "/a/b", which is
1793         *   wrong.</li>
1794         * </ul>
1795         *
1796         * The behavior is incorrect but consistent with the one before
1797         * HDFS-7463. A better fix is to change the edit log of SetTime to
1798         * use inode id instead of a path.
1799         */
1800        final INodesInPath iip = dir.resolvePath(pc, srcArg, DirOp.READ);
1801        src = iip.getPath();
1802
1803        INode inode = iip.getLastINode();
1804        boolean updateAccessTime = inode != null &&
1805            now > inode.getAccessTime() + dir.getAccessTimePrecision();
1806        if (!isInSafeMode() && updateAccessTime) {
1807          boolean changed = FSDirAttrOp.setTimes(dir, iip, -1, now, false);
1808          if (changed) {
1809            getEditLog().logTimes(src, -1, now);
1810          }
1811        }
1812      } catch (Throwable e) {
1813        LOG.warn("Failed to update the access time of " + src, e);
1814      } finally {
1815        writeUnlock(operationName);
1816      }
1817    }
1818
1819    LocatedBlocks blocks = res.blocks;
1820    if (blocks != null) {
1821      blockManager.getDatanodeManager().sortLocatedBlocks(
1822          clientMachine, blocks.getLocatedBlocks());
1823
1824      // lastBlock is not part of getLocatedBlocks(), might need to sort it too
1825      LocatedBlock lastBlock = blocks.getLastLocatedBlock();
1826      if (lastBlock != null) {
1827        ArrayList<LocatedBlock> lastBlockList = Lists.newArrayList(lastBlock);
1828        blockManager.getDatanodeManager().sortLocatedBlocks(
1829            clientMachine, lastBlockList);
1830      }
1831    }
1832    return blocks;
1833  }
1834
1835  /**
1836   * Moves all the blocks from {@code srcs} and appends them to {@code target}
1837   * To avoid rollbacks we will verify validity of ALL of the args
1838   * before we start actual move.
1839   * 
1840   * This does not support ".inodes" relative path
1841   * @param target target to concat into
1842   * @param srcs file that will be concatenated
1843   * @throws IOException on error
1844   */
1845  void concat(String target, String [] srcs, boolean logRetryCache)
1846      throws IOException {
1847    waitForLoadingFSImage();
1848    final String operationName = "concat";
1849    HdfsFileStatus stat = null;
1850    boolean success = false;
1851    writeLock();
1852    try {
1853      checkOperation(OperationCategory.WRITE);
1854      checkNameNodeSafeMode("Cannot concat " + target);
1855      stat = FSDirConcatOp.concat(dir, target, srcs, logRetryCache);
1856      success = true;
1857    } finally {
1858      writeUnlock(operationName);
1859      if (success) {
1860        getEditLog().logSync();
1861      }
1862      logAuditEvent(success, operationName, Arrays.toString(srcs),
1863          target, stat);
1864    }
1865  }
1866
1867  /**
1868   * stores the modification and access time for this inode. 
1869   * The access time is precise up to an hour. The transaction, if needed, is
1870   * written to the edits log but is not flushed.
1871   */
1872  void setTimes(String src, long mtime, long atime) throws IOException {
1873    final String operationName = "setTimes";
1874    HdfsFileStatus auditStat;
1875    checkOperation(OperationCategory.WRITE);
1876    writeLock();
1877    try {
1878      checkOperation(OperationCategory.WRITE);
1879      checkNameNodeSafeMode("Cannot set times " + src);
1880      auditStat = FSDirAttrOp.setTimes(dir, src, mtime, atime);
1881    } catch (AccessControlException e) {
1882      logAuditEvent(false, operationName, src);
1883      throw e;
1884    } finally {
1885      writeUnlock(operationName);
1886    }
1887    getEditLog().logSync();
1888    logAuditEvent(true, operationName, src, null, auditStat);
1889  }
1890
1891  /**
1892   * Create a symbolic link.
1893   */
1894  @SuppressWarnings("deprecation")
1895  void createSymlink(String target, String link,
1896      PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
1897      throws IOException {
1898    final String operationName = "createSymlink";
1899    if (!FileSystem.areSymlinksEnabled()) {
1900      throw new UnsupportedOperationException("Symlinks not supported");
1901    }
1902    HdfsFileStatus auditStat = null;
1903    checkOperation(OperationCategory.WRITE);
1904    writeLock();
1905    try {
1906      checkOperation(OperationCategory.WRITE);
1907      checkNameNodeSafeMode("Cannot create symlink " + link);
1908      auditStat = FSDirSymlinkOp.createSymlinkInt(this, target, link, dirPerms,
1909                                                  createParent, logRetryCache);
1910    } catch (AccessControlException e) {
1911      logAuditEvent(false, operationName, link, target, null);
1912      throw e;
1913    } finally {
1914      writeUnlock(operationName);
1915    }
1916    getEditLog().logSync();
1917    logAuditEvent(true, operationName, link, target, auditStat);
1918  }
1919
1920  /**
1921   * Set replication for an existing file.
1922   * 
1923   * The NameNode sets new replication and schedules either replication of 
1924   * under-replicated data blocks or removal of the excessive block copies 
1925   * if the blocks are over-replicated.
1926   * 
1927   * @see ClientProtocol#setReplication(String, short)
1928   * @param src file name
1929   * @param replication new replication
1930   * @return true if successful; 
1931   *         false if file does not exist or is a directory
1932   */
1933  boolean setReplication(final String src, final short replication)
1934      throws IOException {
1935    final String operationName = "setReplication";
1936    boolean success = false;
1937    waitForLoadingFSImage();
1938    checkOperation(OperationCategory.WRITE);
1939    writeLock();
1940    try {
1941      checkOperation(OperationCategory.WRITE);
1942      checkNameNodeSafeMode("Cannot set replication for " + src);
1943      success = FSDirAttrOp.setReplication(dir, blockManager, src, replication);
1944    } catch (AccessControlException e) {
1945      logAuditEvent(false, operationName, src);
1946      throw e;
1947    } finally {
1948      writeUnlock(operationName);
1949    }
1950    if (success) {
1951      getEditLog().logSync();
1952      logAuditEvent(true, operationName, src);
1953    }
1954    return success;
1955  }
1956
1957  /**
1958   * Truncate file to a lower length.
1959   * Truncate cannot be reverted / recovered from as it causes data loss.
1960   * Truncation at block boundary is atomic, otherwise it requires
1961   * block recovery to truncate the last block of the file.
1962   *
1963   * @return true if client does not need to wait for block recovery,
1964   *         false if client needs to wait for block recovery.
1965   */
1966  boolean truncate(String src, long newLength, String clientName,
1967      String clientMachine, long mtime) throws IOException,
1968      UnresolvedLinkException {
1969
1970    String operationName = "truncate";
1971    requireEffectiveLayoutVersionForFeature(Feature.TRUNCATE);
1972    final FSDirTruncateOp.TruncateResult r;
1973    try {
1974      NameNode.stateChangeLog.debug(
1975          "DIR* NameSystem.truncate: src={} newLength={}", src, newLength);
1976      if (newLength < 0) {
1977        throw new HadoopIllegalArgumentException(
1978            "Cannot truncate to a negative file size: " + newLength + ".");
1979      }
1980      final FSPermissionChecker pc = getPermissionChecker();
1981      checkOperation(OperationCategory.WRITE);
1982      writeLock();
1983      BlocksMapUpdateInfo toRemoveBlocks = new BlocksMapUpdateInfo();
1984      try {
1985        checkOperation(OperationCategory.WRITE);
1986        checkNameNodeSafeMode("Cannot truncate for " + src);
1987        r = FSDirTruncateOp.truncate(this, src, newLength, clientName,
1988            clientMachine, mtime, toRemoveBlocks, pc);
1989      } finally {
1990        writeUnlock(operationName);
1991      }
1992      getEditLog().logSync();
1993      if (!toRemoveBlocks.getToDeleteList().isEmpty()) {
1994        removeBlocks(toRemoveBlocks);
1995        toRemoveBlocks.clear();
1996      }
1997      logAuditEvent(true, operationName, src, null, r.getFileStatus());
1998    } catch (AccessControlException e) {
1999      logAuditEvent(false, operationName, src);
2000      throw e;
2001    }
2002    return r.getResult();
2003  }
2004
2005  /**
2006   * Set the storage policy for a file or a directory.
2007   *
2008   * @param src file/directory path
2009   * @param policyName storage policy name
2010   */
2011  void setStoragePolicy(String src, String policyName) throws IOException {
2012    final String operationName = "setStoragePolicy";
2013    HdfsFileStatus auditStat;
2014    waitForLoadingFSImage();
2015    checkOperation(OperationCategory.WRITE);
2016    writeLock();
2017    try {
2018      checkOperation(OperationCategory.WRITE);
2019      checkNameNodeSafeMode("Cannot set storage policy for " + src);
2020      auditStat = FSDirAttrOp.setStoragePolicy(dir, blockManager, src,
2021                                               policyName);
2022    } catch (AccessControlException e) {
2023      logAuditEvent(false, operationName, src);
2024      throw e;
2025    } finally {
2026      writeUnlock(operationName);
2027    }
2028    getEditLog().logSync();
2029    logAuditEvent(true, operationName, src, null, auditStat);
2030  }
2031
2032  /**
2033   * unset storage policy set for a given file or a directory.
2034   *
2035   * @param src file/directory path
2036   */
2037  void unsetStoragePolicy(String src) throws IOException {
2038    final String operationName = "unsetStoragePolicy";
2039    HdfsFileStatus auditStat;
2040    checkOperation(OperationCategory.WRITE);
2041    writeLock();
2042    try {
2043      checkOperation(OperationCategory.WRITE);
2044      checkNameNodeSafeMode("Cannot unset storage policy for " + src);
2045      auditStat = FSDirAttrOp.unsetStoragePolicy(dir, blockManager, src);
2046    } catch (AccessControlException e) {
2047      logAuditEvent(false, operationName, src);
2048      throw e;
2049    } finally {
2050      writeUnlock(operationName);
2051    }
2052    getEditLog().logSync();
2053    logAuditEvent(true, operationName, src, null, auditStat);
2054  }
2055  /**
2056   * Get the storage policy for a file or a directory.
2057   *
2058   * @param src
2059   *          file/directory path
2060   * @return storage policy object
2061   */
2062  BlockStoragePolicy getStoragePolicy(String src) throws IOException {
2063    checkOperation(OperationCategory.READ);
2064    waitForLoadingFSImage();
2065    readLock();
2066    try {
2067      checkOperation(OperationCategory.READ);
2068      return FSDirAttrOp.getStoragePolicy(dir, blockManager, src);
2069    } finally {
2070      readUnlock("getStoragePolicy");
2071    }
2072  }
2073
2074  /**
2075   * @return All the existing block storage policies
2076   */
2077  BlockStoragePolicy[] getStoragePolicies() throws IOException {
2078    checkOperation(OperationCategory.READ);
2079    waitForLoadingFSImage();
2080    readLock();
2081    try {
2082      checkOperation(OperationCategory.READ);
2083      return FSDirAttrOp.getStoragePolicies(blockManager);
2084    } finally {
2085      readUnlock("getStoragePolicies");
2086    }
2087  }
2088
2089  long getPreferredBlockSize(String src) throws IOException {
2090    checkOperation(OperationCategory.READ);
2091    readLock();
2092    try {
2093      checkOperation(OperationCategory.READ);
2094      return FSDirAttrOp.getPreferredBlockSize(dir, src);
2095    } finally {
2096      readUnlock("getPreferredBlockSize");
2097    }
2098  }
2099
2100  /**
2101   * If the file is within an encryption zone, select the appropriate 
2102   * CryptoProtocolVersion from the list provided by the client. Since the
2103   * client may be newer, we need to handle unknown versions.
2104   *
2105   * @param zone EncryptionZone of the file
2106   * @param supportedVersions List of supported protocol versions
2107   * @return chosen protocol version
2108   * @throws IOException
2109   */
2110  CryptoProtocolVersion chooseProtocolVersion(
2111      EncryptionZone zone, CryptoProtocolVersion[] supportedVersions)
2112      throws UnknownCryptoProtocolVersionException, UnresolvedLinkException,
2113        SnapshotAccessControlException {
2114    Preconditions.checkNotNull(zone);
2115    Preconditions.checkNotNull(supportedVersions);
2116    // Right now, we only support a single protocol version,
2117    // so simply look for it in the list of provided options
2118    final CryptoProtocolVersion required = zone.getVersion();
2119
2120    for (CryptoProtocolVersion c : supportedVersions) {
2121      if (c.equals(CryptoProtocolVersion.UNKNOWN)) {
2122        if (LOG.isDebugEnabled()) {
2123          LOG.debug("Ignoring unknown CryptoProtocolVersion provided by " +
2124              "client: " + c.getUnknownValue());
2125        }
2126        continue;
2127      }
2128      if (c.equals(required)) {
2129        return c;
2130      }
2131    }
2132    throw new UnknownCryptoProtocolVersionException(
2133        "No crypto protocol versions provided by the client are supported."
2134            + " Client provided: " + Arrays.toString(supportedVersions)
2135            + " NameNode supports: " + Arrays.toString(CryptoProtocolVersion
2136            .values()));
2137  }
2138
2139  /**
2140   * Create a new file entry in the namespace.
2141   * 
2142   * For description of parameters and exceptions thrown see
2143   * {@link ClientProtocol#create}, except it returns valid file status upon
2144   * success
2145   */
2146  HdfsFileStatus startFile(String src, PermissionStatus permissions,
2147      String holder, String clientMachine, EnumSet<CreateFlag> flag,
2148      boolean createParent, short replication, long blockSize, 
2149      CryptoProtocolVersion[] supportedVersions, boolean logRetryCache)
2150      throws IOException {
2151
2152    HdfsFileStatus status;
2153    try {
2154      status = startFileInt(src, permissions, holder, clientMachine, flag,
2155          createParent, replication, blockSize, supportedVersions,
2156          logRetryCache);
2157    } catch (AccessControlException e) {
2158      logAuditEvent(false, "create", src);
2159      throw e;
2160    }
2161    logAuditEvent(true, "create", src, null, status);
2162    return status;
2163  }
2164
2165  private HdfsFileStatus startFileInt(String src,
2166      PermissionStatus permissions, String holder, String clientMachine,
2167      EnumSet<CreateFlag> flag, boolean createParent, short replication,
2168      long blockSize, CryptoProtocolVersion[] supportedVersions,
2169      boolean logRetryCache)
2170      throws IOException {
2171    if (NameNode.stateChangeLog.isDebugEnabled()) {
2172      StringBuilder builder = new StringBuilder();
2173      builder.append("DIR* NameSystem.startFile: src=").append(src)
2174          .append(", holder=").append(holder)
2175          .append(", clientMachine=").append(clientMachine)
2176          .append(", createParent=").append(createParent)
2177          .append(", replication=").append(replication)
2178          .append(", createFlag=").append(flag)
2179          .append(", blockSize=").append(blockSize)
2180          .append(", supportedVersions=")
2181          .append(Arrays.toString(supportedVersions));
2182      NameNode.stateChangeLog.debug(builder.toString());
2183    }
2184    if (!DFSUtil.isValidName(src) ||
2185        FSDirectory.isExactReservedName(src) ||
2186        (FSDirectory.isReservedName(src)
2187            && !FSDirectory.isReservedRawName(src)
2188            && !FSDirectory.isReservedInodesName(src))) {
2189      throw new InvalidPathException(src);
2190    }
2191    blockManager.verifyReplication(src, replication, clientMachine);
2192    if (blockSize < minBlockSize) {
2193      throw new IOException("Specified block size is less than configured" +
2194          " minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
2195          + "): " + blockSize + " < " + minBlockSize);
2196    }
2197
2198    FSPermissionChecker pc = getPermissionChecker();
2199    waitForLoadingFSImage();
2200    INodesInPath iip = null;
2201    boolean skipSync = true; // until we do something that might create edits
2202    HdfsFileStatus stat = null;
2203    BlocksMapUpdateInfo toRemoveBlocks = null;
2204
2205    checkOperation(OperationCategory.WRITE);
2206    writeLock();
2207    try {
2208      checkOperation(OperationCategory.WRITE);
2209      checkNameNodeSafeMode("Cannot create file" + src);
2210
2211      iip = FSDirWriteFileOp.resolvePathForStartFile(
2212          dir, pc, src, flag, createParent);
2213
2214      FileEncryptionInfo feInfo = null;
2215      if (provider != null) {
2216        EncryptionKeyInfo ezInfo = FSDirEncryptionZoneOp.getEncryptionKeyInfo(
2217            this, iip, supportedVersions);
2218        // if the path has an encryption zone, the lock was released while
2219        // generating the EDEK.  re-resolve the path to ensure the namesystem
2220        // and/or EZ has not mutated
2221        if (ezInfo != null) {
2222          checkOperation(OperationCategory.WRITE);
2223          iip = FSDirWriteFileOp.resolvePathForStartFile(
2224              dir, pc, iip.getPath(), flag, createParent);
2225          feInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo(
2226              dir, iip, ezInfo);
2227        }
2228      }
2229
2230      skipSync = false; // following might generate edits
2231      toRemoveBlocks = new BlocksMapUpdateInfo();
2232      dir.writeLock();
2233      try {
2234        stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
2235            clientMachine, flag, createParent,
2236            replication, blockSize, feInfo,
2237            toRemoveBlocks, logRetryCache);
2238      } catch (IOException e) {
2239        skipSync = e instanceof StandbyException;
2240        throw e;
2241      } finally {
2242        dir.writeUnlock();
2243      }
2244    } finally {
2245      writeUnlock("create");
2246      // There might be transactions logged while trying to recover the lease.
2247      // They need to be sync'ed even when an exception was thrown.
2248      if (!skipSync) {
2249        getEditLog().logSync();
2250        if (toRemoveBlocks != null) {
2251          removeBlocks(toRemoveBlocks);
2252          toRemoveBlocks.clear();
2253        }
2254      }
2255    }
2256
2257    return stat;
2258  }
2259
2260  /**
2261   * Recover lease;
2262   * Immediately revoke the lease of the current lease holder and start lease
2263   * recovery so that the file can be forced to be closed.
2264   * 
2265   * @param src the path of the file to start lease recovery
2266   * @param holder the lease holder's name
2267   * @param clientMachine the client machine's name
2268   * @return true if the file is already closed or
2269   *         if the lease can be released and the file can be closed.
2270   * @throws IOException
2271   */
2272  boolean recoverLease(String src, String holder, String clientMachine)
2273      throws IOException {
2274    boolean skipSync = false;
2275    FSPermissionChecker pc = getPermissionChecker();
2276    checkOperation(OperationCategory.WRITE);
2277    writeLock();
2278    try {
2279      checkOperation(OperationCategory.WRITE);
2280      checkNameNodeSafeMode("Cannot recover the lease of " + src);
2281      final INodesInPath iip = dir.resolvePath(pc, src, DirOp.WRITE);
2282      src = iip.getPath();
2283      final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
2284      if (!inode.isUnderConstruction()) {
2285        return true;
2286      }
2287      if (isPermissionEnabled) {
2288        dir.checkPathAccess(pc, iip, FsAction.WRITE);
2289      }
2290  
2291      return recoverLeaseInternal(RecoverLeaseOp.RECOVER_LEASE,
2292          iip, src, holder, clientMachine, true);
2293    } catch (StandbyException se) {
2294      skipSync = true;
2295      throw se;
2296    } finally {
2297      writeUnlock("recoverLease");
2298      // There might be transactions logged while trying to recover the lease.
2299      // They need to be sync'ed even when an exception was thrown.
2300      if (!skipSync) {
2301        getEditLog().logSync();
2302      }
2303    }
2304  }
2305
2306  enum RecoverLeaseOp {
2307    CREATE_FILE,
2308    APPEND_FILE,
2309    TRUNCATE_FILE,
2310    RECOVER_LEASE;
2311    
2312    private String getExceptionMessage(String src, String holder,
2313        String clientMachine, String reason) {
2314      return "Failed to " + this + " " + src + " for " + holder +
2315          " on " + clientMachine + " because " + reason;
2316    }
2317  }
2318
2319  boolean recoverLeaseInternal(RecoverLeaseOp op, INodesInPath iip,
2320      String src, String holder, String clientMachine, boolean force)
2321      throws IOException {
2322    assert hasWriteLock();
2323    INodeFile file = iip.getLastINode().asFile();
2324    if (file.isUnderConstruction()) {
2325      //
2326      // If the file is under construction , then it must be in our
2327      // leases. Find the appropriate lease record.
2328      //
2329      Lease lease = leaseManager.getLease(holder);
2330
2331      if (!force && lease != null) {
2332        Lease leaseFile = leaseManager.getLease(file);
2333        if (leaseFile != null && leaseFile.equals(lease)) {
2334          // We found the lease for this file but the original
2335          // holder is trying to obtain it again.
2336          throw new AlreadyBeingCreatedException(
2337              op.getExceptionMessage(src, holder, clientMachine,
2338                  holder + " is already the current lease holder."));
2339        }
2340      }
2341      //
2342      // Find the original holder.
2343      //
2344      FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
2345      String clientName = uc.getClientName();
2346      lease = leaseManager.getLease(clientName);
2347      if (lease == null) {
2348        throw new AlreadyBeingCreatedException(
2349            op.getExceptionMessage(src, holder, clientMachine,
2350                "the file is under construction but no leases found."));
2351      }
2352      if (force) {
2353        // close now: no need to wait for soft lease expiration and 
2354        // close only the file src
2355        LOG.info("recoverLease: " + lease + ", src=" + src +
2356          " from client " + clientName);
2357        return internalReleaseLease(lease, src, iip, holder);
2358      } else {
2359        assert lease.getHolder().equals(clientName) :
2360          "Current lease holder " + lease.getHolder() +
2361          " does not match file creator " + clientName;
2362        //
2363        // If the original holder has not renewed in the last SOFTLIMIT 
2364        // period, then start lease recovery.
2365        //
2366        if (lease.expiredSoftLimit()) {
2367          LOG.info("startFile: recover " + lease + ", src=" + src + " client "
2368              + clientName);
2369          if (internalReleaseLease(lease, src, iip, null)) {
2370            return true;
2371          } else {
2372            throw new RecoveryInProgressException(
2373                op.getExceptionMessage(src, holder, clientMachine,
2374                    "lease recovery is in progress. Try again later."));
2375          }
2376        } else {
2377          final BlockInfo lastBlock = file.getLastBlock();
2378          if (lastBlock != null
2379              && lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
2380            throw new RecoveryInProgressException(
2381                op.getExceptionMessage(src, holder, clientMachine,
2382                    "another recovery is in progress by "
2383                        + clientName + " on " + uc.getClientMachine()));
2384          } else {
2385            throw new AlreadyBeingCreatedException(
2386                op.getExceptionMessage(src, holder, clientMachine,
2387                    "this file lease is currently owned by "
2388                        + clientName + " on " + uc.getClientMachine()));
2389          }
2390        }
2391      }
2392    } else {
2393      return true;
2394     }
2395  }
2396
2397  /**
2398   * Append to an existing file in the namespace.
2399   */
2400  LastBlockWithStatus appendFile(String srcArg, String holder,
2401      String clientMachine, EnumSet<CreateFlag> flag, boolean logRetryCache)
2402      throws IOException {
2403    final String operationName = "append";
2404    boolean newBlock = flag.contains(CreateFlag.NEW_BLOCK);
2405    if (newBlock) {
2406      requireEffectiveLayoutVersionForFeature(Feature.APPEND_NEW_BLOCK);
2407    }
2408
2409    if (!supportAppends) {
2410      throw new UnsupportedOperationException(
2411          "Append is not enabled on this NameNode. Use the " +
2412          DFS_SUPPORT_APPEND_KEY + " configuration option to enable it.");
2413    }
2414
2415    NameNode.stateChangeLog.debug(
2416        "DIR* NameSystem.appendFile: src={}, holder={}, clientMachine={}",
2417        srcArg, holder, clientMachine);
2418    try {
2419      boolean skipSync = false;
2420      LastBlockWithStatus lbs = null;
2421      final FSPermissionChecker pc = getPermissionChecker();
2422      checkOperation(OperationCategory.WRITE);
2423      writeLock();
2424      try {
2425        checkOperation(OperationCategory.WRITE);
2426        checkNameNodeSafeMode("Cannot append to file" + srcArg);
2427        lbs = FSDirAppendOp.appendFile(this, srcArg, pc, holder, clientMachine,
2428            newBlock, logRetryCache);
2429      } catch (StandbyException se) {
2430        skipSync = true;
2431        throw se;
2432      } finally {
2433        writeUnlock(operationName);
2434        // There might be transactions logged while trying to recover the lease
2435        // They need to be sync'ed even when an exception was thrown.
2436        if (!skipSync) {
2437          getEditLog().logSync();
2438        }
2439      }
2440      logAuditEvent(true, operationName, srcArg);
2441      return lbs;
2442    } catch (AccessControlException e) {
2443      logAuditEvent(false, operationName, srcArg);
2444      throw e;
2445    }
2446  }
2447
2448  ExtendedBlock getExtendedBlock(Block blk) {
2449    return new ExtendedBlock(blockPoolId, blk);
2450  }
2451  
2452  void setBlockPoolId(String bpid) {
2453    blockPoolId = bpid;
2454    blockManager.setBlockPoolId(blockPoolId);
2455  }
2456
2457  /**
2458   * The client would like to obtain an additional block for the indicated
2459   * filename (which is being written-to).  Return an array that consists
2460   * of the block, plus a set of machines.  The first on this list should
2461   * be where the client writes data.  Subsequent items in the list must
2462   * be provided in the connection to the first datanode.
2463   *
2464   * Make sure the previous blocks have been reported by datanodes and
2465   * are replicated.  Will return an empty 2-elt array if we want the
2466   * client to "try again later".
2467   */
2468  LocatedBlock getAdditionalBlock(
2469      String src, long fileId, String clientName, ExtendedBlock previous,
2470      DatanodeInfo[] excludedNodes, String[] favoredNodes,
2471      EnumSet<AddBlockFlag> flags) throws IOException {
2472    final String operationName = "getAdditionalBlock";
2473    NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {} inodeId {}" +
2474        " for {}", src, fileId, clientName);
2475
2476    waitForLoadingFSImage();
2477    LocatedBlock[] onRetryBlock = new LocatedBlock[1];
2478    FSDirWriteFileOp.ValidateAddBlockResult r;
2479    FSPermissionChecker pc = getPermissionChecker();
2480    checkOperation(OperationCategory.READ);
2481    readLock();
2482    try {
2483      checkOperation(OperationCategory.READ);
2484      r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName,
2485                                            previous, onRetryBlock);
2486    } finally {
2487      readUnlock(operationName);
2488    }
2489
2490    if (r == null) {
2491      assert onRetryBlock[0] != null : "Retry block is null";
2492      // This is a retry. Just return the last block.
2493      return onRetryBlock[0];
2494    }
2495
2496    DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
2497        blockManager, src, excludedNodes, favoredNodes, flags, r);
2498
2499    checkOperation(OperationCategory.WRITE);
2500    writeLock();
2501    LocatedBlock lb;
2502    try {
2503      checkOperation(OperationCategory.WRITE);
2504      lb = FSDirWriteFileOp.storeAllocatedBlock(
2505          this, src, fileId, clientName, previous, targets);
2506    } finally {
2507      writeUnlock(operationName);
2508    }
2509    getEditLog().logSync();
2510    return lb;
2511  }
2512
2513  /** @see ClientProtocol#getAdditionalDatanode */
2514  LocatedBlock getAdditionalDatanode(String src, long fileId,
2515      final ExtendedBlock blk, final DatanodeInfo[] existings,
2516      final String[] storageIDs,
2517      final Set<Node> excludes,
2518      final int numAdditionalNodes, final String clientName
2519      ) throws IOException {
2520    //check if the feature is enabled
2521    dtpReplaceDatanodeOnFailure.checkEnabled();
2522
2523    Node clientnode = null;
2524    String clientMachine;
2525    final long preferredblocksize;
2526    final byte storagePolicyID;
2527    final List<DatanodeStorageInfo> chosen;
2528    checkOperation(OperationCategory.READ);
2529    FSPermissionChecker pc = getPermissionChecker();
2530    readLock();
2531    try {
2532      checkOperation(OperationCategory.READ);
2533      //check safe mode
2534      checkNameNodeSafeMode("Cannot add datanode; src=" + src + ", blk=" + blk);
2535      final INodesInPath iip = dir.resolvePath(pc, src, fileId);
2536      src = iip.getPath();
2537
2538      //check lease
2539      final INodeFile file = checkLease(iip, clientName, fileId);
2540      clientMachine = file.getFileUnderConstructionFeature().getClientMachine();
2541      clientnode = blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
2542      preferredblocksize = file.getPreferredBlockSize();
2543      storagePolicyID = file.getStoragePolicyID();
2544
2545      //find datanode storages
2546      final DatanodeManager dm = blockManager.getDatanodeManager();
2547      chosen = Arrays.asList(dm.getDatanodeStorageInfos(existings, storageIDs,
2548          "src=%s, fileId=%d, blk=%s, clientName=%s, clientMachine=%s",
2549          src, fileId, blk, clientName, clientMachine));
2550    } finally {
2551      readUnlock("getAdditionalDatanode");
2552    }
2553
2554    if (clientnode == null) {
2555      clientnode = FSDirWriteFileOp.getClientNode(blockManager, clientMachine);
2556    }
2557
2558    // choose new datanodes.
2559    final DatanodeStorageInfo[] targets = blockManager.chooseTarget4AdditionalDatanode(
2560        src, numAdditionalNodes, clientnode, chosen, 
2561        excludes, preferredblocksize, storagePolicyID);
2562    final LocatedBlock lb = BlockManager.newLocatedBlock(
2563        blk, targets, -1, false);
2564    blockManager.setBlockToken(lb, BlockTokenIdentifier.AccessMode.COPY);
2565    return lb;
2566  }
2567
2568  /**
2569   * The client would like to let go of the given block
2570   */
2571  void abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
2572      throws IOException {
2573    NameNode.stateChangeLog.debug(
2574        "BLOCK* NameSystem.abandonBlock: {} of file {}", b, src);
2575    waitForLoadingFSImage();
2576    checkOperation(OperationCategory.WRITE);
2577    FSPermissionChecker pc = getPermissionChecker();
2578    writeLock();
2579    try {
2580      checkOperation(OperationCategory.WRITE);
2581      checkNameNodeSafeMode("Cannot abandon block " + b + " for file" + src);
2582      FSDirWriteFileOp.abandonBlock(dir, pc, b, fileId, src, holder);
2583      NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: {} is" +
2584          " removed from pendingCreates", b);
2585    } finally {
2586      writeUnlock("abandonBlock");
2587    }
2588    getEditLog().logSync();
2589  }
2590
2591  private String leaseExceptionString(String src, long fileId, String holder) {
2592    final Lease lease = leaseManager.getLease(holder);
2593    return src + " (inode " + fileId + ") " + (lease != null? lease.toString()
2594        : "Holder " + holder + " does not have any open files.");
2595  }
2596
2597  INodeFile checkLease(INodesInPath iip, String holder, long fileId)
2598      throws LeaseExpiredException, FileNotFoundException {
2599    String src = iip.getPath();
2600    INode inode = iip.getLastINode();
2601    assert hasReadLock();
2602    if (inode == null) {
2603      throw new FileNotFoundException("File does not exist: "
2604          + leaseExceptionString(src, fileId, holder));
2605    }
2606    if (!inode.isFile()) {
2607      throw new LeaseExpiredException("INode is not a regular file: "
2608          + leaseExceptionString(src, fileId, holder));
2609    }
2610    final INodeFile file = inode.asFile();
2611    if (!file.isUnderConstruction()) {
2612      throw new LeaseExpiredException("File is not open for writing: "
2613          + leaseExceptionString(src, fileId, holder));
2614    }
2615    // No further modification is allowed on a deleted file.
2616    // A file is considered deleted, if it is not in the inodeMap or is marked
2617    // as deleted in the snapshot feature.
2618    if (isFileDeleted(file)) {
2619      throw new FileNotFoundException("File is deleted: "
2620          + leaseExceptionString(src, fileId, holder));
2621    }
2622    final String owner = file.getFileUnderConstructionFeature().getClientName();
2623    if (holder != null && !owner.equals(holder)) {
2624      throw new LeaseExpiredException("Client (=" + holder
2625          + ") is not the lease owner (=" + owner + ": "
2626          + leaseExceptionString(src, fileId, holder));
2627    }
2628    return file;
2629  }
2630 
2631  /**
2632   * Complete in-progress write to the given file.
2633   * @return true if successful, false if the client should continue to retry
2634   *         (e.g if not all blocks have reached minimum replication yet)
2635   * @throws IOException on error (eg lease mismatch, file not open, file deleted)
2636   */
2637  boolean completeFile(final String src, String holder,
2638                       ExtendedBlock last, long fileId)
2639    throws IOException {
2640    boolean success = false;
2641    checkOperation(OperationCategory.WRITE);
2642    waitForLoadingFSImage();
2643    FSPermissionChecker pc = getPermissionChecker();
2644    writeLock();
2645    try {
2646      checkOperation(OperationCategory.WRITE);
2647      checkNameNodeSafeMode("Cannot complete file " + src);
2648      success = FSDirWriteFileOp.completeFile(this, pc, src, holder, last,
2649                                              fileId);
2650    } finally {
2651      writeUnlock("completeFile");
2652    }
2653    getEditLog().logSync();
2654    return success;
2655  }
2656
2657  /**
2658   * Create new block with a unique block id and a new generation stamp.
2659   */
2660  Block createNewBlock() throws IOException {
2661    assert hasWriteLock();
2662    Block b = new Block(nextBlockId(), 0, 0);
2663    // Increment the generation stamp for every new block.
2664    b.setGenerationStamp(nextGenerationStamp(false));
2665    return b;
2666  }
2667
2668  /**
2669   * Check that the indicated file's blocks are present and
2670   * replicated.  If not, return false. If checkall is true, then check
2671   * all blocks, otherwise check only penultimate block.
2672   */
2673  boolean checkFileProgress(String src, INodeFile v, boolean checkall) {
2674    assert hasReadLock();
2675    if (checkall) {
2676      return checkBlocksComplete(src, true, v.getBlocks());
2677    } else {
2678      final BlockInfo[] blocks = v.getBlocks();
2679      final int i = blocks.length - numCommittedAllowed - 2;
2680      return i < 0 || blocks[i] == null
2681          || checkBlocksComplete(src, false, blocks[i]);
2682    }
2683  }
2684
2685  /**
2686   * Check if the blocks are COMPLETE;
2687   * it may allow the last block to be COMMITTED.
2688   */
2689  private boolean checkBlocksComplete(String src, boolean allowCommittedBlock,
2690      BlockInfo... blocks) {
2691    final int n = allowCommittedBlock? numCommittedAllowed: 0;
2692    for(int i = 0; i < blocks.length; i++) {
2693      final short min = blockManager.getMinReplication();
2694      final String err = INodeFile.checkBlockComplete(blocks, i, n, min);
2695      if (err != null) {
2696        final int numNodes = blocks[i].numNodes();
2697        LOG.info("BLOCK* " + err + "(numNodes= " + numNodes
2698            + (numNodes < min ? " < " : " >= ")
2699            + " minimum = " + min + ") in file " + src);
2700        return false;
2701      }
2702    }
2703    return true;
2704  }
2705
2706  /**
2707   * Change the indicated filename. 
2708   * @deprecated Use {@link #renameTo(String, String, boolean,
2709   * Options.Rename...)} instead.
2710   */
2711  @Deprecated
2712  boolean renameTo(String src, String dst, boolean logRetryCache)
2713      throws IOException {
2714    waitForLoadingFSImage();
2715    final String operationName = "rename";
2716    FSDirRenameOp.RenameResult ret = null;
2717    writeLock();
2718    try {
2719      checkOperation(OperationCategory.WRITE);
2720      checkNameNodeSafeMode("Cannot rename " + src);
2721      ret = FSDirRenameOp.renameToInt(dir, src, dst, logRetryCache);
2722    } catch (AccessControlException e)  {
2723      logAuditEvent(false, operationName, src, dst, null);
2724      throw e;
2725    } finally {
2726      writeUnlock(operationName);
2727    }
2728    boolean success = ret.success;
2729    if (success) {
2730      getEditLog().logSync();
2731    }
2732    logAuditEvent(success, operationName, src, dst, ret.auditStat);
2733    return success;
2734  }
2735
2736  void renameTo(final String src, final String dst,
2737                boolean logRetryCache, Options.Rename... options)
2738      throws IOException {
2739    waitForLoadingFSImage();
2740    final String operationName = "rename";
2741    FSDirRenameOp.RenameResult res = null;
2742    writeLock();
2743    try {
2744      checkOperation(OperationCategory.WRITE);
2745      checkNameNodeSafeMode("Cannot rename " + src);
2746      res = FSDirRenameOp.renameToInt(dir, src, dst, logRetryCache, options);
2747    } catch (AccessControlException e) {
2748      logAuditEvent(false, operationName + " (options=" +
2749          Arrays.toString(options) + ")", src, dst, null);
2750      throw e;
2751    } finally {
2752      writeUnlock(operationName);
2753    }
2754
2755    getEditLog().logSync();
2756
2757    BlocksMapUpdateInfo collectedBlocks = res.collectedBlocks;
2758    if (!collectedBlocks.getToDeleteList().isEmpty()) {
2759      removeBlocks(collectedBlocks);
2760      collectedBlocks.clear();
2761    }
2762
2763    logAuditEvent(true, operationName + " (options=" +
2764        Arrays.toString(options) + ")", src, dst, res.auditStat);
2765  }
2766
2767  /**
2768   * Remove the indicated file from namespace.
2769   * 
2770   * @see ClientProtocol#delete(String, boolean) for detailed description and 
2771   * description of exceptions
2772   */
2773  boolean delete(String src, boolean recursive, boolean logRetryCache)
2774      throws IOException {
2775    waitForLoadingFSImage();
2776    final String operationName = "delete";
2777    BlocksMapUpdateInfo toRemovedBlocks = null;
2778    writeLock();
2779    boolean ret = false;
2780    try {
2781      checkOperation(OperationCategory.WRITE);
2782      checkNameNodeSafeMode("Cannot delete " + src);
2783      toRemovedBlocks = FSDirDeleteOp.delete(
2784          this, src, recursive, logRetryCache);
2785      ret = toRemovedBlocks != null;
2786    } catch (AccessControlException e) {
2787      logAuditEvent(false, operationName, src);
2788      throw e;
2789    } finally {
2790      writeUnlock(operationName);
2791    }
2792    getEditLog().logSync();
2793    if (toRemovedBlocks != null) {
2794      removeBlocks(toRemovedBlocks); // Incremental deletion of blocks
2795    }
2796    logAuditEvent(true, operationName, src);
2797    return ret;
2798  }
2799
2800  FSPermissionChecker getPermissionChecker()
2801      throws AccessControlException {
2802    return dir.getPermissionChecker();
2803  }
2804
2805  /**
2806   * From the given list, incrementally remove the blocks from blockManager
2807   * Writelock is dropped and reacquired every BLOCK_DELETION_INCREMENT to
2808   * ensure that other waiters on the lock can get in. See HDFS-2938
2809   * 
2810   * @param blocks
2811   *          An instance of {@link BlocksMapUpdateInfo} which contains a list
2812   *          of blocks that need to be removed from blocksMap
2813   */
2814  void removeBlocks(BlocksMapUpdateInfo blocks) {
2815    List<BlockInfo> toDeleteList = blocks.getToDeleteList();
2816    Iterator<BlockInfo> iter = toDeleteList.iterator();
2817    while (iter.hasNext()) {
2818      writeLock();
2819      try {
2820        for (int i = 0; i < BLOCK_DELETION_INCREMENT && iter.hasNext(); i++) {
2821          blockManager.removeBlock(iter.next());
2822        }
2823      } finally {
2824        writeUnlock("removeBlocks");
2825      }
2826    }
2827  }
2828  
2829  /**
2830   * Remove leases and inodes related to a given path
2831   * @param removedUCFiles INodes whose leases need to be released
2832   * @param removedINodes Containing the list of inodes to be removed from
2833   *                      inodesMap
2834   * @param acquireINodeMapLock Whether to acquire the lock for inode removal
2835   */
2836  void removeLeasesAndINodes(List<Long> removedUCFiles,
2837      List<INode> removedINodes,
2838      final boolean acquireINodeMapLock) {
2839    assert hasWriteLock();
2840    for(long i : removedUCFiles) {
2841      leaseManager.removeLease(i);
2842    }
2843    // remove inodes from inodesMap
2844    if (removedINodes != null) {
2845      if (acquireINodeMapLock) {
2846        dir.writeLock();
2847      }
2848      try {
2849        dir.removeFromInodeMap(removedINodes);
2850      } finally {
2851        if (acquireINodeMapLock) {
2852          dir.writeUnlock();
2853        }
2854      }
2855      removedINodes.clear();
2856    }
2857  }
2858
2859  /**
2860   * Removes the blocks from blocksmap and updates the safemode blocks total
2861   * 
2862   * @param blocks
2863   *          An instance of {@link BlocksMapUpdateInfo} which contains a list
2864   *          of blocks that need to be removed from blocksMap
2865   */
2866  void removeBlocksAndUpdateSafemodeTotal(BlocksMapUpdateInfo blocks) {
2867    assert hasWriteLock();
2868    // In the case that we are a Standby tailing edits from the
2869    // active while in safe-mode, we need to track the total number
2870    // of blocks and safe blocks in the system.
2871    boolean trackBlockCounts = isSafeModeTrackingBlocks();
2872    int numRemovedComplete = 0, numRemovedSafe = 0;
2873
2874    for (BlockInfo b : blocks.getToDeleteList()) {
2875      if (trackBlockCounts) {
2876        if (b.isComplete()) {
2877          numRemovedComplete++;
2878          if (blockManager.checkMinReplication(b)) {
2879            numRemovedSafe++;
2880          }
2881        }
2882      }
2883      blockManager.removeBlock(b);
2884    }
2885    if (trackBlockCounts) {
2886      if (LOG.isDebugEnabled()) {
2887        LOG.debug("Adjusting safe-mode totals for deletion."
2888            + "decreasing safeBlocks by " + numRemovedSafe
2889            + ", totalBlocks by " + numRemovedComplete);
2890      }
2891      adjustSafeModeBlockTotals(-numRemovedSafe, -numRemovedComplete);
2892    }
2893  }
2894
2895  /**
2896   * @see SafeModeInfo#shouldIncrementallyTrackBlocks
2897   */
2898  private boolean isSafeModeTrackingBlocks() {
2899    if (!haEnabled) {
2900      // Never track blocks incrementally in non-HA code.
2901      return false;
2902    }
2903    SafeModeInfo sm = this.safeMode;
2904    return sm != null && sm.shouldIncrementallyTrackBlocks();
2905  }
2906
2907  /**
2908   * Get the file info for a specific file.
2909   *
2910   * @param src The string representation of the path to the file
2911   * @param resolveLink whether to throw UnresolvedLinkException
2912   *        if src refers to a symlink
2913   *
2914   * @throws AccessControlException if access is denied
2915   * @throws UnresolvedLinkException if a symlink is encountered.
2916   *
2917   * @return object containing information regarding the file
2918   *         or null if file not found
2919   * @throws StandbyException
2920   */
2921  HdfsFileStatus getFileInfo(final String src, boolean resolveLink)
2922    throws IOException {
2923    final String operationName = "getfileinfo";
2924    checkOperation(OperationCategory.READ);
2925    HdfsFileStatus stat = null;
2926    readLock();
2927    try {
2928      checkOperation(OperationCategory.READ);
2929      stat = FSDirStatAndListingOp.getFileInfo(dir, src, resolveLink);
2930    } catch (AccessControlException e) {
2931      logAuditEvent(false, operationName, src);
2932      throw e;
2933    } finally {
2934      readUnlock(operationName);
2935    }
2936    logAuditEvent(true, operationName, src);
2937    return stat;
2938  }
2939
2940  /**
2941   * Returns true if the file is closed
2942   */
2943  boolean isFileClosed(final String src) throws IOException {
2944    final String operationName = "isFileClosed";
2945    checkOperation(OperationCategory.READ);
2946    readLock();
2947    try {
2948      checkOperation(OperationCategory.READ);
2949      return FSDirStatAndListingOp.isFileClosed(dir, src);
2950    } catch (AccessControlException e) {
2951      logAuditEvent(false, operationName, src);
2952      throw e;
2953    } finally {
2954      readUnlock(operationName);
2955    }
2956  }
2957
2958  /**
2959   * Create all the necessary directories
2960   */
2961  boolean mkdirs(String src, PermissionStatus permissions,
2962      boolean createParent) throws IOException {
2963    final String operationName = "mkdirs";
2964    HdfsFileStatus auditStat = null;
2965    checkOperation(OperationCategory.WRITE);
2966    writeLock();
2967    try {
2968      checkOperation(OperationCategory.WRITE);
2969      checkNameNodeSafeMode("Cannot create directory " + src);
2970      auditStat = FSDirMkdirOp.mkdirs(this, src, permissions, createParent);
2971    } catch (AccessControlException e) {
2972      logAuditEvent(false, operationName, src);
2973      throw e;
2974    } finally {
2975      writeUnlock(operationName);
2976    }
2977    getEditLog().logSync();
2978    logAuditEvent(true, operationName, src, null, auditStat);
2979    return true;
2980  }
2981
2982  /**
2983   * Get the content summary for a specific file/dir.
2984   *
2985   * @param src The string representation of the path to the file
2986   *
2987   * @throws AccessControlException if access is denied
2988   * @throws UnresolvedLinkException if a symlink is encountered.
2989   * @throws FileNotFoundException if no file exists
2990   * @throws StandbyException
2991   * @throws IOException for issues with writing to the audit log
2992   *
2993   * @return object containing information regarding the file
2994   *         or null if file not found
2995   */
2996  ContentSummary getContentSummary(final String src) throws IOException {
2997    checkOperation(OperationCategory.READ);
2998    final String operationName = "contentSummary";
2999    readLock();
3000    boolean success = true;
3001    try {
3002      checkOperation(OperationCategory.READ);
3003      return FSDirStatAndListingOp.getContentSummary(dir, src);
3004    } catch (AccessControlException ace) {
3005      success = false;
3006      throw ace;
3007    } finally {
3008      readUnlock(operationName);
3009      logAuditEvent(success, operationName, src);
3010    }
3011  }
3012
3013  /**
3014   * Get the quota usage for a specific file/dir.
3015   *
3016   * @param src The string representation of the path to the file
3017   *
3018   * @throws AccessControlException if access is denied
3019   * @throws UnresolvedLinkException if a symlink is encountered.
3020   * @throws FileNotFoundException if no file exists
3021   * @throws StandbyException
3022   * @throws IOException for issues with writing to the audit log
3023   *
3024   * @return object containing information regarding the file
3025   *         or null if file not found
3026   */
3027  QuotaUsage getQuotaUsage(final String src) throws IOException {
3028    checkOperation(OperationCategory.READ);
3029    final String operationName = "quotaUsage";
3030    readLock();
3031    boolean success = true;
3032    try {
3033      checkOperation(OperationCategory.READ);
3034      return FSDirStatAndListingOp.getQuotaUsage(dir, src);
3035    } catch (AccessControlException ace) {
3036      success = false;
3037      throw ace;
3038    } finally {
3039      readUnlock(operationName);
3040      logAuditEvent(success, operationName, src);
3041    }
3042  }
3043
3044  /**
3045   * Set the namespace quota and storage space quota for a directory.
3046   * See {@link ClientProtocol#setQuota(String, long, long, StorageType)} for the
3047   * contract.
3048   * 
3049   * Note: This does not support ".inodes" relative path.
3050   */
3051  void setQuota(String src, long nsQuota, long ssQuota, StorageType type)
3052      throws IOException {
3053    if (type != null) {
3054      requireEffectiveLayoutVersionForFeature(Feature.QUOTA_BY_STORAGE_TYPE);
3055    }
3056    checkOperation(OperationCategory.WRITE);
3057    final String operationName = "setQuota";
3058    writeLock();
3059    boolean success = false;
3060    try {
3061      checkOperation(OperationCategory.WRITE);
3062      checkNameNodeSafeMode("Cannot set quota on " + src);
3063      FSDirAttrOp.setQuota(dir, src, nsQuota, ssQuota, type);
3064      success = true;
3065    } finally {
3066      writeUnlock(operationName);
3067      if (success) {
3068        getEditLog().logSync();
3069      }
3070      logAuditEvent(success, operationName, src);
3071    }
3072  }
3073
3074  /** Persist all metadata about this file.
3075   * @param src The string representation of the path
3076   * @param fileId The inode ID that we're fsyncing.  Older clients will pass
3077   *               INodeId.GRANDFATHER_INODE_ID here.
3078   * @param clientName The string representation of the client
3079   * @param lastBlockLength The length of the last block 
3080   *                        under construction reported from client.
3081   * @throws IOException if path does not exist
3082   */
3083  void fsync(String src, long fileId, String clientName, long lastBlockLength)
3084      throws IOException {
3085    NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
3086    checkOperation(OperationCategory.WRITE);
3087
3088    FSPermissionChecker pc = getPermissionChecker();
3089    waitForLoadingFSImage();
3090    writeLock();
3091    try {
3092      checkOperation(OperationCategory.WRITE);
3093      checkNameNodeSafeMode("Cannot fsync file " + src);
3094      INodesInPath iip = dir.resolvePath(pc, src, fileId);
3095      src = iip.getPath();
3096      final INodeFile pendingFile = checkLease(iip, clientName, fileId);
3097      if (lastBlockLength > 0) {
3098        pendingFile.getFileUnderConstructionFeature().updateLengthOfLastBlock(
3099            pendingFile, lastBlockLength);
3100      }
3101      FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, false);
3102    } finally {
3103      writeUnlock("fsync");
3104    }
3105    getEditLog().logSync();
3106  }
3107
3108  /**
3109   * Move a file that is being written to be immutable.
3110   * @param src The filename
3111   * @param lease The lease for the client creating the file
3112   * @param recoveryLeaseHolder reassign lease to this holder if the last block
3113   *        needs recovery; keep current holder if null.
3114   * @throws AlreadyBeingCreatedException if file is waiting to achieve minimal
3115   *         replication;<br>
3116   *         RecoveryInProgressException if lease recovery is in progress.<br>
3117   *         IOException in case of an error.
3118   * @return true  if file has been successfully finalized and closed or 
3119   *         false if block recovery has been initiated. Since the lease owner
3120   *         has been changed and logged, caller should call logSync().
3121   */
3122  boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
3123      String recoveryLeaseHolder) throws IOException {
3124    LOG.info("Recovering " + lease + ", src=" + src);
3125    assert !isInSafeMode();
3126    assert hasWriteLock();
3127
3128    final INodeFile pendingFile = iip.getLastINode().asFile();
3129    int nrBlocks = pendingFile.numBlocks();
3130    BlockInfo[] blocks = pendingFile.getBlocks();
3131
3132    int nrCompleteBlocks;
3133    BlockInfo curBlock = null;
3134    for(nrCompleteBlocks = 0; nrCompleteBlocks < nrBlocks; nrCompleteBlocks++) {
3135      curBlock = blocks[nrCompleteBlocks];
3136      if(!curBlock.isComplete())
3137        break;
3138      assert blockManager.checkMinReplication(curBlock) :
3139              "A COMPLETE block is not minimally replicated in " + src;
3140    }
3141
3142    // If there are no incomplete blocks associated with this file,
3143    // then reap lease immediately and close the file.
3144    if(nrCompleteBlocks == nrBlocks) {
3145      finalizeINodeFileUnderConstruction(src, pendingFile,
3146          iip.getLatestSnapshotId(), false);
3147      NameNode.stateChangeLog.warn("BLOCK*" +
3148          " internalReleaseLease: All existing blocks are COMPLETE," +
3149          " lease removed, file " + src + " closed.");
3150      return true;  // closed!
3151    }
3152
3153    // Only the last and the penultimate blocks may be in non COMPLETE state.
3154    // If the penultimate block is not COMPLETE, then it must be COMMITTED.
3155    if(nrCompleteBlocks < nrBlocks - 2 ||
3156       nrCompleteBlocks == nrBlocks - 2 &&
3157         curBlock != null &&
3158         curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
3159      final String message = "DIR* NameSystem.internalReleaseLease: "
3160        + "attempt to release a create lock on "
3161        + src + " but file is already closed.";
3162      NameNode.stateChangeLog.warn(message);
3163      throw new IOException(message);
3164    }
3165
3166    // The last block is not COMPLETE, and
3167    // that the penultimate block if exists is either COMPLETE or COMMITTED
3168    final BlockInfo lastBlock = pendingFile.getLastBlock();
3169    BlockUCState lastBlockState = lastBlock.getBlockUCState();
3170    BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
3171
3172    // If penultimate block doesn't exist then its minReplication is met
3173    boolean penultimateBlockMinReplication = penultimateBlock == null
3174        || blockManager.checkMinReplication(penultimateBlock);
3175
3176    switch(lastBlockState) {
3177    case COMPLETE:
3178      assert false : "Already checked that the last block is incomplete";
3179      break;
3180    case COMMITTED:
3181      // Close file if committed blocks are minimally replicated
3182      if(penultimateBlockMinReplication &&
3183          blockManager.checkMinReplication(lastBlock)) {
3184        finalizeINodeFileUnderConstruction(src, pendingFile,
3185            iip.getLatestSnapshotId(), false);
3186        NameNode.stateChangeLog.warn("BLOCK*" +
3187            " internalReleaseLease: Committed blocks are minimally" +
3188            " replicated, lease removed, file" + src + " closed.");
3189        return true;  // closed!
3190      }
3191      // Cannot close file right now, since some blocks 
3192      // are not yet minimally replicated.
3193      // This may potentially cause infinite loop in lease recovery
3194      // if there are no valid replicas on data-nodes.
3195      String message = "DIR* NameSystem.internalReleaseLease: " +
3196          "Failed to release lease for file " + src +
3197          ". Committed blocks are waiting to be minimally replicated." +
3198          " Try again later.";
3199      NameNode.stateChangeLog.warn(message);
3200      throw new AlreadyBeingCreatedException(message);
3201    case UNDER_CONSTRUCTION:
3202    case UNDER_RECOVERY:
3203      BlockUnderConstructionFeature uc =
3204          lastBlock.getUnderConstructionFeature();
3205      // determine if last block was intended to be truncated
3206      Block recoveryBlock = uc.getTruncateBlock();
3207      boolean truncateRecovery = recoveryBlock != null;
3208      boolean copyOnTruncate = truncateRecovery &&
3209          recoveryBlock.getBlockId() != lastBlock.getBlockId();
3210      assert !copyOnTruncate ||
3211          recoveryBlock.getBlockId() < lastBlock.getBlockId() &&
3212          recoveryBlock.getGenerationStamp() < lastBlock.getGenerationStamp() &&
3213          recoveryBlock.getNumBytes() > lastBlock.getNumBytes() :
3214            "wrong recoveryBlock";
3215
3216      // setup the last block locations from the blockManager if not known
3217      if (uc.getNumExpectedLocations() == 0) {
3218        uc.setExpectedLocations(lastBlock,
3219            blockManager.getStorages(lastBlock));
3220      }
3221
3222      if (uc.getNumExpectedLocations() == 0 && lastBlock.getNumBytes() == 0) {
3223        // There is no datanode reported to this block.
3224        // may be client have crashed before writing data to pipeline.
3225        // This blocks doesn't need any recovery.
3226        // We can remove this block and close the file.
3227        pendingFile.removeLastBlock(lastBlock);
3228        finalizeINodeFileUnderConstruction(src, pendingFile,
3229            iip.getLatestSnapshotId(), false);
3230        NameNode.stateChangeLog.warn("BLOCK* internalReleaseLease: "
3231            + "Removed empty last block and closed file " + src);
3232        return true;
3233      }
3234      // start recovery of the last block for this file
3235      long blockRecoveryId = nextGenerationStamp(
3236          blockIdManager.isLegacyBlock(lastBlock));
3237      lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
3238      if(copyOnTruncate) {
3239        lastBlock.setGenerationStamp(blockRecoveryId);
3240      } else if(truncateRecovery) {
3241        recoveryBlock.setGenerationStamp(blockRecoveryId);
3242      }
3243      uc.initializeBlockRecovery(lastBlock, blockRecoveryId);
3244      leaseManager.renewLease(lease);
3245      // Cannot close file right now, since the last block requires recovery.
3246      // This may potentially cause infinite loop in lease recovery
3247      // if there are no valid replicas on data-nodes.
3248      NameNode.stateChangeLog.warn(
3249                "DIR* NameSystem.internalReleaseLease: " +
3250                "File " + src + " has not been closed." +
3251               " Lease recovery is in progress. " +
3252                "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
3253      break;
3254    }
3255    return false;
3256  }
3257
3258  private Lease reassignLease(Lease lease, String src, String newHolder,
3259      INodeFile pendingFile) {
3260    assert hasWriteLock();
3261    if(newHolder == null)
3262      return lease;
3263    // The following transaction is not synced. Make sure it's sync'ed later.
3264    logReassignLease(lease.getHolder(), src, newHolder);
3265    return reassignLeaseInternal(lease, newHolder, pendingFile);
3266  }
3267  
3268  Lease reassignLeaseInternal(Lease lease, String newHolder, INodeFile pendingFile) {
3269    assert hasWriteLock();
3270    pendingFile.getFileUnderConstructionFeature().setClientName(newHolder);
3271    return leaseManager.reassignLease(lease, pendingFile, newHolder);
3272  }
3273
3274  void commitOrCompleteLastBlock(
3275      final INodeFile fileINode, final INodesInPath iip,
3276      final Block commitBlock) throws IOException {
3277    assert hasWriteLock();
3278    Preconditions.checkArgument(fileINode.isUnderConstruction());
3279    blockManager.commitOrCompleteLastBlock(fileINode, commitBlock, iip);
3280  }
3281
3282  void addCommittedBlocksToPending(final INodeFile pendingFile) {
3283    final BlockInfo[] blocks = pendingFile.getBlocks();
3284    int i = blocks.length - numCommittedAllowed;
3285    if (i < 0) {
3286      i = 0;
3287    }
3288    for(; i < blocks.length; i++) {
3289      final BlockInfo b = blocks[i];
3290      if (b != null && b.getBlockUCState() == BlockUCState.COMMITTED) {
3291        // b is COMMITTED but not yet COMPLETE, add it to pending replication.
3292        blockManager.addExpectedReplicasToPending(b, pendingFile);
3293      }
3294    }
3295  }
3296
3297  void finalizeINodeFileUnderConstruction(String src, INodeFile pendingFile,
3298      int latestSnapshot, boolean allowCommittedBlock) throws IOException {
3299    assert hasWriteLock();
3300
3301    FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
3302    if (uc == null) {
3303      throw new IOException("Cannot finalize file " + src
3304          + " because it is not under construction");
3305    }
3306
3307    pendingFile.recordModification(latestSnapshot);
3308
3309    // The file is no longer pending.
3310    // Create permanent INode, update blocks. No need to replace the inode here
3311    // since we just remove the uc feature from pendingFile
3312    pendingFile.toCompleteFile(now(),
3313        allowCommittedBlock? numCommittedAllowed: 0,
3314        blockManager.getMinReplication());
3315
3316    leaseManager.removeLease(uc.getClientName(), pendingFile);
3317
3318    waitForLoadingFSImage();
3319    // close file and persist block allocations for this file
3320    closeFile(src, pendingFile);
3321
3322    blockManager.checkReplication(pendingFile);
3323  }
3324
3325  @VisibleForTesting
3326  BlockInfo getStoredBlock(Block block) {
3327    return blockManager.getStoredBlock(block);
3328  }
3329  
3330  @Override
3331  public boolean isInSnapshot(long blockCollectionID) {
3332    assert hasReadLock();
3333    final INodeFile bc = getBlockCollection(blockCollectionID);
3334    if (bc == null || !bc.isUnderConstruction()) {
3335      return false;
3336    }
3337
3338    String fullName = bc.getName();
3339    try {
3340      if (fullName != null && fullName.startsWith(Path.SEPARATOR)
3341          && dir.getINode(fullName, DirOp.READ) == bc) {
3342        // If file exists in normal path then no need to look in snapshot
3343        return false;
3344      }
3345    } catch (IOException e) {
3346      // the snapshot path and current path may contain symlinks, ancestor
3347      // dirs replaced by files, etc.
3348      LOG.error("Error while resolving the path : " + fullName, e);
3349      return false;
3350    }
3351    /*
3352     * 1. if bc is under construction and also with snapshot, and
3353     * bc is not in the current fsdirectory tree, bc must represent a snapshot
3354     * file. 
3355     * 2. if fullName is not an absolute path, bc cannot be existent in the 
3356     * current fsdirectory tree. 
3357     * 3. if bc is not the current node associated with fullName, bc must be a
3358     * snapshot inode.
3359     */
3360    return true;
3361  }
3362
3363  INodeFile getBlockCollection(BlockInfo b) {
3364    return getBlockCollection(b.getBlockCollectionId());
3365  }
3366
3367  @Override
3368  public INodeFile getBlockCollection(long id) {
3369    INode inode = getFSDirectory().getInode(id);
3370    return inode == null ? null : inode.asFile();
3371  }
3372
3373  void commitBlockSynchronization(ExtendedBlock oldBlock,
3374      long newgenerationstamp, long newlength,
3375      boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
3376      String[] newtargetstorages) throws IOException {
3377    LOG.info("commitBlockSynchronization(oldBlock=" + oldBlock
3378             + ", newgenerationstamp=" + newgenerationstamp
3379             + ", newlength=" + newlength
3380             + ", newtargets=" + Arrays.asList(newtargets)
3381             + ", closeFile=" + closeFile
3382             + ", deleteBlock=" + deleteblock
3383             + ")");
3384    checkOperation(OperationCategory.WRITE);
3385    final String src;
3386    waitForLoadingFSImage();
3387    writeLock();
3388    boolean copyTruncate = false;
3389    BlockInfo truncatedBlock = null;
3390    try {
3391      checkOperation(OperationCategory.WRITE);
3392      // If a DN tries to commit to the standby, the recovery will
3393      // fail, and the next retry will succeed on the new NN.
3394  
3395      checkNameNodeSafeMode(
3396          "Cannot commitBlockSynchronization while in safe mode");
3397      final BlockInfo storedBlock = getStoredBlock(
3398          ExtendedBlock.getLocalBlock(oldBlock));
3399      if (storedBlock == null) {
3400        if (deleteblock) {
3401          // This may be a retry attempt so ignore the failure
3402          // to locate the block.
3403          if (LOG.isDebugEnabled()) {
3404            LOG.debug("Block (=" + oldBlock + ") not found");
3405          }
3406          return;
3407        } else {
3408          throw new IOException("Block (=" + oldBlock + ") not found");
3409        }
3410      }
3411      final long oldGenerationStamp = storedBlock.getGenerationStamp();
3412      final long oldNumBytes = storedBlock.getNumBytes();
3413      //
3414      // The implementation of delete operation (see @deleteInternal method)
3415      // first removes the file paths from namespace, and delays the removal
3416      // of blocks to later time for better performance. When
3417      // commitBlockSynchronization (this method) is called in between, the
3418      // blockCollection of storedBlock could have been assigned to null by
3419      // the delete operation, throw IOException here instead of NPE; if the
3420      // file path is already removed from namespace by the delete operation,
3421      // throw FileNotFoundException here, so not to proceed to the end of
3422      // this method to add a CloseOp to the edit log for an already deleted
3423      // file (See HDFS-6825).
3424      //
3425      if (storedBlock.isDeleted()) {
3426        throw new IOException("The blockCollection of " + storedBlock
3427            + " is null, likely because the file owning this block was"
3428            + " deleted and the block removal is delayed");
3429      }
3430      final INodeFile iFile = getBlockCollection(storedBlock);
3431      src = iFile.getFullPathName();
3432      if (isFileDeleted(iFile)) {
3433        throw new FileNotFoundException("File not found: "
3434            + src + ", likely due to delayed block removal");
3435      }
3436      if ((!iFile.isUnderConstruction() || storedBlock.isComplete()) &&
3437          iFile.getLastBlock().isComplete()) {
3438        if (LOG.isDebugEnabled()) {
3439          LOG.debug("Unexpected block (=" + oldBlock
3440                    + ") since the file (=" + iFile.getLocalName()
3441                    + ") is not under construction");
3442        }
3443        return;
3444      }
3445
3446      truncatedBlock = iFile.getLastBlock();
3447      long recoveryId = truncatedBlock.getUnderConstructionFeature()
3448          .getBlockRecoveryId();
3449      copyTruncate = truncatedBlock.getBlockId() != storedBlock.getBlockId();
3450      if(recoveryId != newgenerationstamp) {
3451        throw new IOException("The recovery id " + newgenerationstamp
3452                              + " does not match current recovery id "
3453                              + recoveryId + " for block " + oldBlock);
3454      }
3455
3456      if (deleteblock) {
3457        Block blockToDel = ExtendedBlock.getLocalBlock(oldBlock);
3458        boolean remove = iFile.removeLastBlock(blockToDel) != null;
3459        if (remove) {
3460          blockManager.removeBlock(storedBlock);
3461        }
3462      }
3463      else {
3464        // update last block
3465        if(!copyTruncate) {
3466          storedBlock.setGenerationStamp(newgenerationstamp);
3467          storedBlock.setNumBytes(newlength);
3468        }
3469
3470        // find the DatanodeDescriptor objects
3471        ArrayList<DatanodeDescriptor> trimmedTargets =
3472            new ArrayList<DatanodeDescriptor>(newtargets.length);
3473        ArrayList<String> trimmedStorages =
3474            new ArrayList<String>(newtargets.length);
3475        if (newtargets.length > 0) {
3476          for (int i = 0; i < newtargets.length; ++i) {
3477            // try to get targetNode
3478            DatanodeDescriptor targetNode =
3479                blockManager.getDatanodeManager().getDatanode(newtargets[i]);
3480            if (targetNode != null) {
3481              trimmedTargets.add(targetNode);
3482              trimmedStorages.add(newtargetstorages[i]);
3483            } else if (LOG.isDebugEnabled()) {
3484              LOG.debug("DatanodeDescriptor (=" + newtargets[i] + ") not found");
3485            }
3486          }
3487        }
3488        if ((closeFile) && !trimmedTargets.isEmpty()) {
3489          // the file is getting closed. Insert block locations into blockManager.
3490          // Otherwise fsck will report these blocks as MISSING, especially if the
3491          // blocksReceived from Datanodes take a long time to arrive.
3492          for (int i = 0; i < trimmedTargets.size(); i++) {
3493            DatanodeStorageInfo storageInfo =
3494                trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
3495            if (storageInfo != null) {
3496              if(copyTruncate) {
3497                storageInfo.addBlock(truncatedBlock);
3498              } else {
3499                storageInfo.addBlock(storedBlock);
3500              }
3501            }
3502          }
3503        }
3504
3505        // add pipeline locations into the INodeUnderConstruction
3506        DatanodeStorageInfo[] trimmedStorageInfos =
3507            blockManager.getDatanodeManager().getDatanodeStorageInfos(
3508                trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]),
3509                trimmedStorages.toArray(new String[trimmedStorages.size()]),
3510                "src=%s, oldBlock=%s, newgenerationstamp=%d, newlength=%d",
3511                src, oldBlock, newgenerationstamp, newlength);
3512
3513        if(copyTruncate) {
3514          iFile.convertLastBlockToUC(truncatedBlock, trimmedStorageInfos);
3515        } else {
3516          iFile.convertLastBlockToUC(storedBlock, trimmedStorageInfos);
3517          if (closeFile) {
3518            blockManager.markBlockReplicasAsCorrupt(storedBlock,
3519                oldGenerationStamp, oldNumBytes, trimmedStorageInfos);
3520          }
3521        }
3522      }
3523
3524      if (closeFile) {
3525        if(copyTruncate) {
3526          closeFileCommitBlocks(src, iFile, truncatedBlock);
3527          if(!iFile.isBlockInLatestSnapshot(storedBlock)) {
3528            blockManager.removeBlock(storedBlock);
3529          }
3530        } else {
3531          closeFileCommitBlocks(src, iFile, storedBlock);
3532        }
3533      } else {
3534        // If this commit does not want to close the file, persist blocks
3535        FSDirWriteFileOp.persistBlocks(dir, src, iFile, false);
3536      }
3537    } finally {
3538      writeUnlock("commitBlockSynchronization");
3539    }
3540    getEditLog().logSync();
3541    if (closeFile) {
3542      LOG.info("commitBlockSynchronization(oldBlock=" + oldBlock
3543          + ", file=" + src
3544          + (copyTruncate ? ", newBlock=" + truncatedBlock
3545              : ", newgenerationstamp=" + newgenerationstamp)
3546          + ", newlength=" + newlength
3547          + ", newtargets=" + Arrays.asList(newtargets) + ") successful");
3548    } else {
3549      LOG.info("commitBlockSynchronization(" + oldBlock + ") successful");
3550    }
3551  }
3552
3553  /**
3554   * @param pendingFile open file that needs to be closed
3555   * @param storedBlock last block
3556   * @throws IOException on error
3557   */
3558  @VisibleForTesting
3559  void closeFileCommitBlocks(String src, INodeFile pendingFile,
3560      BlockInfo storedBlock) throws IOException {
3561    final INodesInPath iip = INodesInPath.fromINode(pendingFile);
3562
3563    // commit the last block and complete it if it has minimum replicas
3564    commitOrCompleteLastBlock(pendingFile, iip, storedBlock);
3565
3566    //remove lease, close file
3567    int s = Snapshot.findLatestSnapshot(pendingFile, Snapshot.CURRENT_STATE_ID);
3568    finalizeINodeFileUnderConstruction(src, pendingFile, s, false);
3569  }
3570
3571  /**
3572   * Renew the lease(s) held by the given client
3573   */
3574  void renewLease(String holder) throws IOException {
3575    checkOperation(OperationCategory.WRITE);
3576    readLock();
3577    try {
3578      checkOperation(OperationCategory.WRITE);
3579      checkNameNodeSafeMode("Cannot renew lease for " + holder);
3580      leaseManager.renewLease(holder);
3581    } finally {
3582      readUnlock("renewLease");
3583    }
3584  }
3585
3586  /**
3587   * Get a partial listing of the indicated directory
3588   *
3589   * @param src the directory name
3590   * @param startAfter the name to start after
3591   * @param needLocation if blockLocations need to be returned
3592   * @return a partial listing starting after startAfter
3593   * 
3594   * @throws AccessControlException if access is denied
3595   * @throws UnresolvedLinkException if symbolic link is encountered
3596   * @throws IOException if other I/O error occurred
3597   */
3598  DirectoryListing getListing(String src, byte[] startAfter,
3599      boolean needLocation) 
3600      throws IOException {
3601    checkOperation(OperationCategory.READ);
3602    final String operationName = "listStatus";
3603    DirectoryListing dl = null;
3604    readLock();
3605    try {
3606      checkOperation(NameNode.OperationCategory.READ);
3607      dl = getListingInt(dir, src, startAfter, needLocation);
3608    } catch (AccessControlException e) {
3609      logAuditEvent(false, operationName, src);
3610      throw e;
3611    } finally {
3612      readUnlock(operationName);
3613    }
3614    logAuditEvent(true, operationName, src);
3615    return dl;
3616  }
3617
3618  /////////////////////////////////////////////////////////
3619  //
3620  // These methods are called by datanodes
3621  //
3622  /////////////////////////////////////////////////////////
3623  /**
3624   * Register Datanode.
3625   * <p>
3626   * The purpose of registration is to identify whether the new datanode
3627   * serves a new data storage, and will report new data block copies,
3628   * which the namenode was not aware of; or the datanode is a replacement
3629   * node for the data storage that was previously served by a different
3630   * or the same (in terms of host:port) datanode.
3631   * The data storages are distinguished by their storageIDs. When a new
3632   * data storage is reported the namenode issues a new unique storageID.
3633   * <p>
3634   * Finally, the namenode returns its namespaceID as the registrationID
3635   * for the datanodes. 
3636   * namespaceID is a persistent attribute of the name space.
3637   * The registrationID is checked every time the datanode is communicating
3638   * with the namenode. 
3639   * Datanodes with inappropriate registrationID are rejected.
3640   * If the namenode stops, and then restarts it can restore its 
3641   * namespaceID and will continue serving the datanodes that has previously
3642   * registered with the namenode without restarting the whole cluster.
3643   * 
3644   * @see org.apache.hadoop.hdfs.server.datanode.DataNode
3645   */
3646  void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
3647    writeLock();
3648    try {
3649      getBlockManager().getDatanodeManager().registerDatanode(nodeReg);
3650      checkSafeMode();
3651    } finally {
3652      writeUnlock("registerDatanode");
3653    }
3654  }
3655  
3656  /**
3657   * Get registrationID for datanodes based on the namespaceID.
3658   * 
3659   * @see #registerDatanode(DatanodeRegistration)
3660   * @return registration ID
3661   */
3662  String getRegistrationID() {
3663    return Storage.getRegistrationID(getFSImage().getStorage());
3664  }
3665
3666  /**
3667   * The given node has reported in.  This method should:
3668   * 1) Record the heartbeat, so the datanode isn't timed out
3669   * 2) Adjust usage stats for future block allocation
3670   * 
3671   * If a substantial amount of time passed since the last datanode 
3672   * heartbeat then request an immediate block report.  
3673   * 
3674   * @return an array of datanode commands 
3675   * @throws IOException
3676   */
3677  HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
3678      StorageReport[] reports, long cacheCapacity, long cacheUsed,
3679      int xceiverCount, int xmitsInProgress, int failedVolumes,
3680      VolumeFailureSummary volumeFailureSummary,
3681      boolean requestFullBlockReportLease) throws IOException {
3682    readLock();
3683    try {
3684      //get datanode commands
3685      final int maxTransfer = blockManager.getMaxReplicationStreams()
3686          - xmitsInProgress;
3687      DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
3688          nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
3689          xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
3690      long blockReportLeaseId = 0;
3691      if (requestFullBlockReportLease) {
3692        blockReportLeaseId =  blockManager.requestBlockReportLeaseId(nodeReg);
3693      }
3694      //create ha status
3695      final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
3696          haContext.getState().getServiceState(),
3697          getFSImage().getCorrectLastAppliedOrWrittenTxId());
3698
3699      return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo,
3700          blockReportLeaseId);
3701    } finally {
3702      readUnlock("handleHeartbeat");
3703    }
3704  }
3705
3706  /**
3707   * Handles a lifeline message sent by a DataNode.  This method updates contact
3708   * information and statistics for the DataNode, so that it doesn't time out.
3709   * Unlike a heartbeat, this method does not dispatch any commands back to the
3710   * DataNode for local execution.  This method also cannot request a lease for
3711   * sending a full block report.  Lifeline messages are used only as a fallback
3712   * in case something prevents successful delivery of heartbeat messages.
3713   * Therefore, the implementation of this method must remain lightweight
3714   * compared to heartbeat handling.  It should avoid lock contention and
3715   * expensive computation.
3716   *
3717   * @param nodeReg registration info for DataNode sending the lifeline
3718   * @param reports storage reports from DataNode
3719   * @param cacheCapacity cache capacity at DataNode
3720   * @param cacheUsed cache used at DataNode
3721   * @param xceiverCount estimated count of transfer threads running at DataNode
3722   * @param xmitsInProgress count of transfers running at DataNode
3723   * @param failedVolumes count of failed volumes at DataNode
3724   * @param volumeFailureSummary info on failed volumes at DataNode
3725   * @throws IOException if there is an error
3726   */
3727  void handleLifeline(DatanodeRegistration nodeReg, StorageReport[] reports,
3728      long cacheCapacity, long cacheUsed, int xceiverCount, int xmitsInProgress,
3729      int failedVolumes, VolumeFailureSummary volumeFailureSummary)
3730      throws IOException {
3731    int maxTransfer = blockManager.getMaxReplicationStreams() - xmitsInProgress;
3732    blockManager.getDatanodeManager().handleLifeline(nodeReg, reports,
3733        getBlockPoolId(), cacheCapacity, cacheUsed, xceiverCount, maxTransfer,
3734        failedVolumes, volumeFailureSummary);
3735  }
3736
3737  /**
3738   * Returns whether or not there were available resources at the last check of
3739   * resources.
3740   *
3741   * @return true if there were sufficient resources available, false otherwise.
3742   */
3743  boolean nameNodeHasResourcesAvailable() {
3744    return hasResourcesAvailable;
3745  }
3746
3747  /**
3748   * Perform resource checks and cache the results.
3749   */
3750  void checkAvailableResources() {
3751    Preconditions.checkState(nnResourceChecker != null,
3752        "nnResourceChecker not initialized");
3753    hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
3754  }
3755
3756  /**
3757   * Close file.
3758   * @param path
3759   * @param file
3760   */
3761  private void closeFile(String path, INodeFile file) {
3762    assert hasWriteLock();
3763    waitForLoadingFSImage();
3764    // file is closed
3765    getEditLog().logCloseFile(path, file);
3766    NameNode.stateChangeLog.debug("closeFile: {} with {} bloks is persisted" +
3767        " to the file system", path, file.getBlocks().length);
3768  }
3769
3770  /**
3771   * Periodically calls hasAvailableResources of NameNodeResourceChecker, and if
3772   * there are found to be insufficient resources available, causes the NN to
3773   * enter safe mode. If resources are later found to have returned to
3774   * acceptable levels, this daemon will cause the NN to exit safe mode.
3775   */
3776  class NameNodeResourceMonitor implements Runnable  {
3777    boolean shouldNNRmRun = true;
3778    @Override
3779    public void run () {
3780      try {
3781        while (fsRunning && shouldNNRmRun) {
3782          checkAvailableResources();
3783          if(!nameNodeHasResourcesAvailable()) {
3784            String lowResourcesMsg = "NameNode low on available disk space. ";
3785            if (!isInSafeMode()) {
3786              LOG.warn(lowResourcesMsg + "Entering safe mode.");
3787            } else {
3788              LOG.warn(lowResourcesMsg + "Already in safe mode.");
3789            }
3790            enterSafeMode(true);
3791          }
3792          try {
3793            Thread.sleep(resourceRecheckInterval);
3794          } catch (InterruptedException ie) {
3795            // Deliberately ignore
3796          }
3797        }
3798      } catch (Exception e) {
3799        FSNamesystem.LOG.error("Exception in NameNodeResourceMonitor: ", e);
3800      }
3801    }
3802
3803    public void stopMonitor() {
3804      shouldNNRmRun = false;
3805    }
3806 }
3807
3808  class NameNodeEditLogRoller implements Runnable {
3809
3810    private boolean shouldRun = true;
3811    private final long rollThreshold;
3812    private final long sleepIntervalMs;
3813
3814    public NameNodeEditLogRoller(long rollThreshold, int sleepIntervalMs) {
3815        this.rollThreshold = rollThreshold;
3816        this.sleepIntervalMs = sleepIntervalMs;
3817    }
3818
3819    @Override
3820    public void run() {
3821      while (fsRunning && shouldRun) {
3822        try {
3823          long numEdits = getCorrectTransactionsSinceLastLogRoll();
3824          if (numEdits > rollThreshold) {
3825            FSNamesystem.LOG.info("NameNode rolling its own edit log because"
3826                + " number of edits in open segment exceeds threshold of "
3827                + rollThreshold);
3828            rollEditLog();
3829          }
3830        } catch (Exception e) {
3831          FSNamesystem.LOG.error("Swallowing exception in "
3832              + NameNodeEditLogRoller.class.getSimpleName() + ":", e);
3833        }
3834        try {
3835          Thread.sleep(sleepIntervalMs);
3836        } catch (InterruptedException e) {
3837          FSNamesystem.LOG.info(NameNodeEditLogRoller.class.getSimpleName()
3838              + " was interrupted, exiting");
3839          break;
3840        }
3841      }
3842    }
3843
3844    public void stop() {
3845      shouldRun = false;
3846    }
3847  }
3848
3849  /**
3850   * Daemon to periodically scan the namespace for lazyPersist files
3851   * with missing blocks and unlink them.
3852   */
3853  class LazyPersistFileScrubber implements Runnable {
3854    private volatile boolean shouldRun = true;
3855    final int scrubIntervalSec;
3856    public LazyPersistFileScrubber(final int scrubIntervalSec) {
3857      this.scrubIntervalSec = scrubIntervalSec;
3858    }
3859
3860    /**
3861     * Periodically go over the list of lazyPersist files with missing
3862     * blocks and unlink them from the namespace.
3863     */
3864    private void clearCorruptLazyPersistFiles()
3865        throws IOException {
3866
3867      BlockStoragePolicy lpPolicy = blockManager.getStoragePolicy("LAZY_PERSIST");
3868
3869      List<BlockCollection> filesToDelete = new ArrayList<>();
3870      boolean changed = false;
3871      writeLock();
3872      try {
3873        final Iterator<BlockInfo> it =
3874            blockManager.getCorruptReplicaBlockIterator();
3875
3876        while (it.hasNext()) {
3877          Block b = it.next();
3878          BlockInfo blockInfo = blockManager.getStoredBlock(b);
3879          BlockCollection bc = getBlockCollection(blockInfo);
3880          if (bc.getStoragePolicyID() == lpPolicy.getId()) {
3881            filesToDelete.add(bc);
3882          }
3883        }
3884
3885        for (BlockCollection bc : filesToDelete) {
3886          LOG.warn("Removing lazyPersist file " + bc.getName() + " with no replicas.");
3887          BlocksMapUpdateInfo toRemoveBlocks =
3888              FSDirDeleteOp.deleteInternal(
3889                  FSNamesystem.this,
3890                  INodesInPath.fromINode((INodeFile) bc), false);
3891          changed |= toRemoveBlocks != null;
3892          if (toRemoveBlocks != null) {
3893            removeBlocks(toRemoveBlocks); // Incremental deletion of blocks
3894          }
3895        }
3896      } finally {
3897        writeUnlock("clearCorruptLazyPersistFiles");
3898      }
3899      if (changed) {
3900        getEditLog().logSync();
3901      }
3902    }
3903
3904    @Override
3905    public void run() {
3906      while (fsRunning && shouldRun) {
3907        try {
3908          if (!isInSafeMode()) {
3909            clearCorruptLazyPersistFiles();
3910          } else {
3911            if (FSNamesystem.LOG.isDebugEnabled()) {
3912              FSNamesystem.LOG
3913                  .debug("Namenode is in safemode, skipping scrubbing of corrupted lazy-persist files.");
3914            }
3915          }
3916        } catch (Exception e) {
3917          FSNamesystem.LOG.error(
3918              "Ignoring exception in LazyPersistFileScrubber:", e);
3919        }
3920
3921        try {
3922          Thread.sleep(scrubIntervalSec * 1000);
3923        } catch (InterruptedException e) {
3924          FSNamesystem.LOG.info(
3925              "LazyPersistFileScrubber was interrupted, exiting");
3926          break;
3927        }
3928      }
3929    }
3930
3931    public void stop() {
3932      shouldRun = false;
3933    }
3934  }
3935
3936  public FSImage getFSImage() {
3937    return fsImage;
3938  }
3939
3940  public FSEditLog getEditLog() {
3941    return getFSImage().getEditLog();
3942  }
3943
3944  @Metric({"MissingBlocks", "Number of missing blocks"})
3945  public long getMissingBlocksCount() {
3946    // not locking
3947    return blockManager.getMissingBlocksCount();
3948  }
3949
3950  @Metric({"MissingReplOneBlocks", "Number of missing blocks " +
3951      "with replication factor 1"})
3952  public long getMissingReplOneBlocksCount() {
3953    // not locking
3954    return blockManager.getMissingReplOneBlocksCount();
3955  }
3956  
3957  @Metric({"ExpiredHeartbeats", "Number of expired heartbeats"})
3958  public int getExpiredHeartbeats() {
3959    return datanodeStatistics.getExpiredHeartbeats();
3960  }
3961  
3962  @Metric({"TransactionsSinceLastCheckpoint",
3963      "Number of transactions since last checkpoint"})
3964  public long getTransactionsSinceLastCheckpoint() {
3965    return getFSImage().getLastAppliedOrWrittenTxId() -
3966        getFSImage().getStorage().getMostRecentCheckpointTxId();
3967  }
3968  
3969  @Metric({"TransactionsSinceLastLogRoll",
3970      "Number of transactions since last edit log roll"})
3971  public long getTransactionsSinceLastLogRoll() {
3972    if (isInStandbyState() || !getEditLog().isSegmentOpenWithoutLock()) {
3973      return 0;
3974    } else {
3975      return getEditLog().getLastWrittenTxIdWithoutLock() -
3976          getEditLog().getCurSegmentTxIdWithoutLock() + 1;
3977    }
3978  }
3979
3980  /**
3981   * Get the correct number of transactions since last edit log roll.
3982   * This method holds a lock of FSEditLog and must not be used for metrics.
3983   */
3984  private long getCorrectTransactionsSinceLastLogRoll() {
3985    if (isInStandbyState() || !getEditLog().isSegmentOpen()) {
3986      return 0;
3987    } else {
3988      return getEditLog().getLastWrittenTxId() -
3989          getEditLog().getCurSegmentTxId() + 1;
3990    }
3991  }
3992
3993  @Metric({"LastWrittenTransactionId", "Transaction ID written to the edit log"})
3994  public long getLastWrittenTransactionId() {
3995    return getEditLog().getLastWrittenTxIdWithoutLock();
3996  }
3997  
3998  @Metric({"LastCheckpointTime",
3999      "Time in milliseconds since the epoch of the last checkpoint"})
4000  public long getLastCheckpointTime() {
4001    return getFSImage().getStorage().getMostRecentCheckpointTime();
4002  }
4003
4004  /** @see ClientProtocol#getStats() */
4005  long[] getStats() {
4006    final long[] stats = datanodeStatistics.getStats();
4007    stats[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX] = getUnderReplicatedBlocks();
4008    stats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX] = getCorruptReplicaBlocks();
4009    stats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX] = getMissingBlocksCount();
4010    stats[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX] =
4011        getMissingReplOneBlocksCount();
4012    stats[ClientProtocol.GET_STATS_BYTES_IN_FUTURE_BLOCKS_IDX] =
4013        blockManager.getBytesInFuture();
4014    stats[ClientProtocol.GET_STATS_PENDING_DELETION_BLOCKS_IDX] =
4015        blockManager.getPendingDeletionBlocksCount();
4016    return stats;
4017  }
4018
4019  @Override // FSNamesystemMBean
4020  @Metric({"CapacityTotal",
4021      "Total raw capacity of data nodes in bytes"})
4022  public long getCapacityTotal() {
4023    return datanodeStatistics.getCapacityTotal();
4024  }
4025
4026  @Metric({"CapacityTotalGB",
4027      "Total raw capacity of data nodes in GB"})
4028  public float getCapacityTotalGB() {
4029    return DFSUtil.roundBytesToGB(getCapacityTotal());
4030  }
4031
4032  @Override // FSNamesystemMBean
4033  @Metric({"CapacityUsed",
4034      "Total used capacity across all data nodes in bytes"})
4035  public long getCapacityUsed() {
4036    return datanodeStatistics.getCapacityUsed();
4037  }
4038
4039  @Metric({"CapacityUsedGB",
4040      "Total used capacity across all data nodes in GB"})
4041  public float getCapacityUsedGB() {
4042    return DFSUtil.roundBytesToGB(getCapacityUsed());
4043  }
4044
4045  @Override // FSNamesystemMBean
4046  @Metric({"CapacityRemaining", "Remaining capacity in bytes"})
4047  public long getCapacityRemaining() {
4048    return datanodeStatistics.getCapacityRemaining();
4049  }
4050
4051  @Metric({"CapacityRemainingGB", "Remaining capacity in GB"})
4052  public float getCapacityRemainingGB() {
4053    return DFSUtil.roundBytesToGB(getCapacityRemaining());
4054  }
4055
4056  @Metric({"CapacityUsedNonDFS",
4057      "Total space used by data nodes for non DFS purposes in bytes"})
4058  public long getCapacityUsedNonDFS() {
4059    return datanodeStatistics.getCapacityUsedNonDFS();
4060  }
4061
4062  /**
4063   * Total number of connections.
4064   */
4065  @Override // FSNamesystemMBean
4066  @Metric
4067  public int getTotalLoad() {
4068    return datanodeStatistics.getXceiverCount();
4069  }
4070  
4071  @Metric({ "SnapshottableDirectories", "Number of snapshottable directories" })
4072  public int getNumSnapshottableDirs() {
4073    return this.snapshotManager.getNumSnapshottableDirs();
4074  }
4075
4076  @Metric({ "Snapshots", "The number of snapshots" })
4077  public int getNumSnapshots() {
4078    return this.snapshotManager.getNumSnapshots();
4079  }
4080
4081  @Override
4082  public String getSnapshotStats() {
4083    Map<String, Object> info = new HashMap<String, Object>();
4084    info.put("SnapshottableDirectories", this.getNumSnapshottableDirs());
4085    info.put("Snapshots", this.getNumSnapshots());
4086    return JSON.toString(info);
4087  }
4088
4089  @Override // FSNamesystemMBean
4090  @Metric({ "NumEncryptionZones", "The number of encryption zones" })
4091  public int getNumEncryptionZones() {
4092    return dir.ezManager.getNumEncryptionZones();
4093  }
4094
4095  /**
4096   * Returns the length of the wait Queue for the FSNameSystemLock.
4097   *
4098   * A larger number here indicates lots of threads are waiting for
4099   * FSNameSystemLock.
4100   *
4101   * @return int - Number of Threads waiting to acquire FSNameSystemLock
4102   */
4103  @Override
4104  @Metric({"LockQueueLength", "Number of threads waiting to " +
4105      "acquire FSNameSystemLock"})
4106  public int getFsLockQueueLength() {
4107    return fsLock.getQueueLength();
4108  }
4109
4110  int getNumberOfDatanodes(DatanodeReportType type) {
4111    readLock();
4112    try {
4113      return getBlockManager().getDatanodeManager().getDatanodeListForReport(
4114          type).size(); 
4115    } finally {
4116      readUnlock("getNumberOfDatanodes");
4117    }
4118  }
4119
4120  DatanodeInfo[] datanodeReport(final DatanodeReportType type
4121      ) throws AccessControlException, StandbyException {
4122    checkSuperuserPrivilege();
4123    checkOperation(OperationCategory.UNCHECKED);
4124    readLock();
4125    try {
4126      checkOperation(OperationCategory.UNCHECKED);
4127      final DatanodeManager dm = getBlockManager().getDatanodeManager();      
4128      final List<DatanodeDescriptor> results = dm.getDatanodeListForReport(type);
4129
4130      DatanodeInfo[] arr = new DatanodeInfo[results.size()];
4131      for (int i=0; i<arr.length; i++) {
4132        arr[i] = new DatanodeInfo(results.get(i));
4133      }
4134      return arr;
4135    } finally {
4136      readUnlock("datanodeReport");
4137    }
4138  }
4139
4140  DatanodeStorageReport[] getDatanodeStorageReport(final DatanodeReportType type
4141      ) throws AccessControlException, StandbyException {
4142    checkSuperuserPrivilege();
4143    checkOperation(OperationCategory.UNCHECKED);
4144    readLock();
4145    try {
4146      checkOperation(OperationCategory.UNCHECKED);
4147      final DatanodeManager dm = getBlockManager().getDatanodeManager();      
4148      final List<DatanodeDescriptor> datanodes = dm.getDatanodeListForReport(type);
4149
4150      DatanodeStorageReport[] reports = new DatanodeStorageReport[datanodes.size()];
4151      for (int i = 0; i < reports.length; i++) {
4152        final DatanodeDescriptor d = datanodes.get(i);
4153        reports[i] = new DatanodeStorageReport(new DatanodeInfo(d),
4154            d.getStorageReports());
4155      }
4156      return reports;
4157    } finally {
4158      readUnlock("getDatanodeStorageReport");
4159    }
4160  }
4161
4162  /**
4163   * Save namespace image.
4164   * This will save current namespace into fsimage file and empty edits file.
4165   * Requires superuser privilege and safe mode.
4166   * 
4167   * @throws AccessControlException if superuser privilege is violated.
4168   * @throws IOException if 
4169   */
4170  void saveNamespace() throws AccessControlException, IOException {
4171    checkOperation(OperationCategory.UNCHECKED);
4172    checkSuperuserPrivilege();
4173
4174    cpLock();  // Block if a checkpointing is in progress on standby.
4175    readLock();
4176    try {
4177      checkOperation(OperationCategory.UNCHECKED);
4178
4179      if (!isInSafeMode()) {
4180        throw new IOException("Safe mode should be turned ON "
4181            + "in order to create namespace image.");
4182      }
4183      getFSImage().saveNamespace(this);
4184    } finally {
4185      readUnlock("saveNamespace");
4186      cpUnlock();
4187    }
4188    LOG.info("New namespace image has been created");
4189  }
4190  
4191  /**
4192   * Enables/Disables/Checks restoring failed storage replicas if the storage becomes available again.
4193   * Requires superuser privilege.
4194   * 
4195   * @throws AccessControlException if superuser privilege is violated.
4196   */
4197  boolean restoreFailedStorage(String arg) throws AccessControlException,
4198      StandbyException {
4199    checkSuperuserPrivilege();
4200    checkOperation(OperationCategory.UNCHECKED);
4201    cpLock();  // Block if a checkpointing is in progress on standby.
4202    writeLock();
4203    try {
4204      checkOperation(OperationCategory.UNCHECKED);
4205      
4206      // if it is disabled - enable it and vice versa.
4207      if(arg.equals("check"))
4208        return getFSImage().getStorage().getRestoreFailedStorage();
4209      
4210      boolean val = arg.equals("true");  // false if not
4211      getFSImage().getStorage().setRestoreFailedStorage(val);
4212      
4213      return val;
4214    } finally {
4215      writeUnlock("restoreFailedStorage");
4216      cpUnlock();
4217    }
4218  }
4219
4220  Date getStartTime() {
4221    return new Date(startTime); 
4222  }
4223    
4224  void finalizeUpgrade() throws IOException {
4225    checkSuperuserPrivilege();
4226    checkOperation(OperationCategory.UNCHECKED);
4227    cpLock();  // Block if a checkpointing is in progress on standby.
4228    writeLock();
4229    try {
4230      checkOperation(OperationCategory.UNCHECKED);
4231      getFSImage().finalizeUpgrade(this.isHaEnabled() && inActiveState());
4232    } finally {
4233      writeUnlock("finalizeUpgrade");
4234      cpUnlock();
4235    }
4236  }
4237
4238  void refreshNodes() throws IOException {
4239    checkOperation(OperationCategory.UNCHECKED);
4240    checkSuperuserPrivilege();
4241    getBlockManager().getDatanodeManager().refreshNodes(new HdfsConfiguration());
4242  }
4243
4244  void setBalancerBandwidth(long bandwidth) throws IOException {
4245    checkOperation(OperationCategory.UNCHECKED);
4246    checkSuperuserPrivilege();
4247    getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
4248  }
4249
4250  /**
4251   * SafeModeInfo contains information related to the safe mode.
4252   * <p>
4253   * An instance of {@link SafeModeInfo} is created when the name node
4254   * enters safe mode.
4255   * <p>
4256   * During name node startup {@link SafeModeInfo} counts the number of
4257   * <em>safe blocks</em>, those that have at least the minimal number of
4258   * replicas, and calculates the ratio of safe blocks to the total number
4259   * of blocks in the system, which is the size of blocks in
4260   * {@link FSNamesystem#blockManager}. When the ratio reaches the
4261   * {@link #threshold} it starts the SafeModeMonitor daemon in order
4262   * to monitor whether the safe mode {@link #extension} is passed.
4263   * Then it leaves safe mode and destroys itself.
4264   * <p>
4265   * If safe mode is turned on manually then the number of safe blocks is
4266   * not tracked because the name node is not intended to leave safe mode
4267   * automatically in the case.
4268   *
4269   * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean)
4270   */
4271  public class SafeModeInfo {
4272    // configuration fields
4273    /** Safe mode threshold condition %.*/
4274    private final double threshold;
4275    /** Safe mode minimum number of datanodes alive */
4276    private final int datanodeThreshold;
4277    /**
4278     * Safe mode extension after the threshold.
4279     * Make it volatile so that getSafeModeTip can read the latest value
4280     * without taking a lock.
4281     */
4282    private volatile int extension;
4283    /** Min replication required by safe mode. */
4284    private final int safeReplication;
4285    /** threshold for populating needed replication queues */
4286    private final double replQueueThreshold;
4287    // internal fields
4288    /** Time when threshold was reached.
4289     * <br> -1 safe mode is off
4290     * <br> 0 safe mode is on, and threshold is not reached yet
4291     * <br> >0 safe mode is on, but we are in extension period 
4292     */
4293    private long reached = -1;  
4294    private long reachedTimestamp = -1;
4295    /** Total number of blocks. */
4296    int blockTotal; 
4297    /** Number of safe blocks. */
4298    int blockSafe;
4299    /** Number of blocks needed to satisfy safe mode threshold condition */
4300    private int blockThreshold;
4301    /** Number of blocks needed before populating replication queues */
4302    private int blockReplQueueThreshold;
4303    /** time of the last status printout */
4304    private long lastStatusReport = 0;
4305    /**
4306     * Was safemode entered automatically because available resources were low.
4307     * Make it volatile so that getSafeModeTip can read the latest value
4308     * without taking a lock.
4309     */
4310    private volatile boolean resourcesLow = false;
4311    /** Should safemode adjust its block totals as blocks come in */
4312    private boolean shouldIncrementallyTrackBlocks = false;
4313    /** counter for tracking startup progress of reported blocks */
4314    private Counter awaitingReportedBlocksCounter;
4315    
4316    /**
4317     * Creates SafeModeInfo when the name node enters
4318     * automatic safe mode at startup.
4319     *  
4320     * @param conf configuration
4321     */
4322    private SafeModeInfo(Configuration conf) {
4323      this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
4324          DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
4325      if(threshold > 1.0) {
4326        LOG.warn("The threshold value should't be greater than 1, threshold: " + threshold);
4327      }
4328      this.datanodeThreshold = conf.getInt(
4329        DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
4330        DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT);
4331      this.extension = conf.getInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
4332      int minReplication =
4333          conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
4334              DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
4335      // DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY is an expert level setting,
4336      // setting this lower than the min replication is not recommended
4337      // and/or dangerous for production setups.
4338      // When it's unset, safeReplication will use dfs.namenode.replication.min
4339      this.safeReplication =
4340          conf.getInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY,
4341              minReplication);
4342
4343      LOG.info(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY + " = " + threshold);
4344      LOG.info(DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY + " = " + datanodeThreshold);
4345      LOG.info(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + "     = " + extension);
4346
4347      // default to safe mode threshold (i.e., don't populate queues before leaving safe mode)
4348      this.replQueueThreshold = 
4349        conf.getFloat(DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
4350                      (float) threshold);
4351      this.blockTotal = 0; 
4352      this.blockSafe = 0;
4353    }
4354
4355    /**
4356     * In the HA case, the StandbyNode can be in safemode while the namespace
4357     * is modified by the edit log tailer. In this case, the number of total
4358     * blocks changes as edits are processed (eg blocks are added and deleted).
4359     * However, we don't want to do the incremental tracking during the
4360     * startup-time loading process -- only once the initial total has been
4361     * set after the image has been loaded.
4362     */
4363    private boolean shouldIncrementallyTrackBlocks() {
4364      return shouldIncrementallyTrackBlocks;
4365    }
4366
4367    /**
4368     * Creates SafeModeInfo when safe mode is entered manually, or because
4369     * available resources are low.
4370     *
4371     * The {@link #threshold} is set to 1.5 so that it could never be reached.
4372     * {@link #blockTotal} is set to -1 to indicate that safe mode is manual.
4373     * 
4374     * @see SafeModeInfo
4375     */
4376    private SafeModeInfo(boolean resourcesLow) {
4377      this.threshold = 1.5f;  // this threshold can never be reached
4378      this.datanodeThreshold = Integer.MAX_VALUE;
4379      this.extension = Integer.MAX_VALUE;
4380      this.safeReplication = Short.MAX_VALUE + 1; // more than maxReplication
4381      this.replQueueThreshold = 1.5f; // can never be reached
4382      this.blockTotal = -1;
4383      this.blockSafe = -1;
4384      this.resourcesLow = resourcesLow;
4385      enter();
4386      reportStatus("STATE* Safe mode is ON.", true);
4387    }
4388      
4389    /**
4390     * Check if safe mode is on.
4391     * @return true if in safe mode
4392     */
4393    private synchronized boolean isOn() {
4394      doConsistencyCheck();
4395      return this.reached >= 0;
4396    }
4397      
4398    /**
4399     * Enter safe mode.
4400     */
4401    private void enter() {
4402      this.reached = 0;
4403      this.reachedTimestamp = 0;
4404    }
4405      
4406    /**
4407     * Leave safe mode.
4408     * <p>
4409     * Check for invalid, under- & over-replicated blocks in the end of startup.
4410     * @param force - true to force exit
4411     */
4412    private synchronized void leave(boolean force) {
4413      // if not done yet, initialize replication queues.
4414      // In the standby, do not populate repl queues
4415      if (!blockManager.isPopulatingReplQueues() && blockManager.shouldPopulateReplQueues()) {
4416        blockManager.initializeReplQueues();
4417      }
4418
4419
4420      if (!force && (blockManager.getBytesInFuture() > 0)) {
4421        LOG.error("Refusing to leave safe mode without a force flag. " +
4422            "Exiting safe mode will cause a deletion of " + blockManager
4423            .getBytesInFuture() + " byte(s). Please use " +
4424            "-forceExit flag to exit safe mode forcefully if data loss is " +
4425            "acceptable.");
4426        return;
4427      }
4428
4429      long timeInSafemode = now() - startTime;
4430      NameNode.stateChangeLog.info("STATE* Leaving safe mode after " 
4431                                    + timeInSafemode/1000 + " secs");
4432      NameNode.getNameNodeMetrics().setSafeModeTime((int) timeInSafemode);
4433
4434      //Log the following only once (when transitioning from ON -> OFF)
4435      if (reached >= 0) {
4436        NameNode.stateChangeLog.info("STATE* Safe mode is OFF"); 
4437      }
4438      reached = -1;
4439      reachedTimestamp = -1;
4440      safeMode = null;
4441      final NetworkTopology nt = blockManager.getDatanodeManager().getNetworkTopology();
4442      NameNode.stateChangeLog.info("STATE* Network topology has "
4443          + nt.getNumOfRacks() + " racks and "
4444          + nt.getNumOfLeaves() + " datanodes");
4445      NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has "
4446          + blockManager.numOfUnderReplicatedBlocks() + " blocks");
4447
4448      startSecretManagerIfNecessary();
4449
4450      // If startup has not yet completed, end safemode phase.
4451      StartupProgress prog = NameNode.getStartupProgress();
4452      if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) {
4453        prog.endStep(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS);
4454        prog.endPhase(Phase.SAFEMODE);
4455      }
4456    }
4457
4458    /**
4459     * Check whether we have reached the threshold for 
4460     * initializing replication queues.
4461     */
4462    private synchronized boolean canInitializeReplQueues() {
4463      return blockManager.shouldPopulateReplQueues()
4464          && blockSafe >= blockReplQueueThreshold;
4465    }
4466      
4467    /** 
4468     * Safe mode can be turned off iff 
4469     * the threshold is reached and 
4470     * the extension time have passed.
4471     * @return true if can leave or false otherwise.
4472     */
4473    private synchronized boolean canLeave() {
4474      if (reached == 0) {
4475        return false;
4476      }
4477
4478      if (monotonicNow() - reached < extension) {
4479        reportStatus("STATE* Safe mode ON, in safe mode extension.", false);
4480        return false;
4481      }
4482
4483      if (needEnter()) {
4484        reportStatus("STATE* Safe mode ON, thresholds not met.", false);
4485        return false;
4486      }
4487
4488      return true;
4489    }
4490      
4491    /** 
4492     * There is no need to enter safe mode 
4493     * if DFS is empty or {@link #threshold} == 0
4494     */
4495    private boolean needEnter() {
4496      return (threshold != 0 && blockSafe < blockThreshold) ||
4497        (datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold) ||
4498        (!nameNodeHasResourcesAvailable());
4499    }
4500      
4501    /**
4502     * Check and trigger safe mode if needed. 
4503     */
4504    private void checkMode() {
4505      // Have to have write-lock since leaving safemode initializes
4506      // repl queues, which requires write lock
4507      assert hasWriteLock();
4508      if (inTransitionToActive()) {
4509        return;
4510      }
4511      // if smmthread is already running, the block threshold must have been 
4512      // reached before, there is no need to enter the safe mode again
4513      if (smmthread == null && needEnter()) {
4514        enter();
4515        // check if we are ready to initialize replication queues
4516        if (canInitializeReplQueues() && !blockManager.isPopulatingReplQueues()
4517            && !haEnabled) {
4518          blockManager.initializeReplQueues();
4519        }
4520        reportStatus("STATE* Safe mode ON.", false);
4521        return;
4522      }
4523      // the threshold is reached or was reached before
4524      if (!isOn() ||                           // safe mode is off
4525          extension <= 0 || threshold <= 0) {  // don't need to wait
4526        this.leave(false); // leave safe mode
4527        return;
4528      }
4529      if (reached > 0) {  // threshold has already been reached before
4530        reportStatus("STATE* Safe mode ON.", false);
4531        return;
4532      }
4533      // start monitor
4534      reached = monotonicNow();
4535      reachedTimestamp = now();
4536      if (smmthread == null) {
4537        smmthread = new Daemon(new SafeModeMonitor());
4538        smmthread.start();
4539        reportStatus("STATE* Safe mode extension entered.", true);
4540      }
4541
4542      // check if we are ready to initialize replication queues
4543      if (canInitializeReplQueues() && !blockManager.isPopulatingReplQueues() && !haEnabled) {
4544        blockManager.initializeReplQueues();
4545      }
4546    }
4547      
4548    /**
4549     * Set total number of blocks.
4550     */
4551    private synchronized void setBlockTotal(int total) {
4552      this.blockTotal = total;
4553      this.blockThreshold = (int) (blockTotal * threshold);
4554      this.blockReplQueueThreshold = 
4555        (int) (blockTotal * replQueueThreshold);
4556      if (haEnabled) {
4557        // After we initialize the block count, any further namespace
4558        // modifications done while in safe mode need to keep track
4559        // of the number of total blocks in the system.
4560        this.shouldIncrementallyTrackBlocks = true;
4561      }
4562      if(blockSafe < 0)
4563        this.blockSafe = 0;
4564      checkMode();
4565    }
4566      
4567    /**
4568     * Increment number of safe blocks if current block has 
4569     * reached minimal replication.
4570     * @param replication current replication 
4571     */
4572    private synchronized void incrementSafeBlockCount(short replication) {
4573      if (replication == safeReplication) {
4574        this.blockSafe++;
4575
4576        // Report startup progress only if we haven't completed startup yet.
4577        StartupProgress prog = NameNode.getStartupProgress();
4578        if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) {
4579          if (this.awaitingReportedBlocksCounter == null) {
4580            this.awaitingReportedBlocksCounter = prog.getCounter(Phase.SAFEMODE,
4581              STEP_AWAITING_REPORTED_BLOCKS);
4582          }
4583          this.awaitingReportedBlocksCounter.increment();
4584        }
4585
4586        checkMode();
4587      }
4588    }
4589      
4590    /**
4591     * Decrement number of safe blocks if current block has 
4592     * fallen below minimal replication.
4593     * @param replication current replication 
4594     */
4595    private synchronized void decrementSafeBlockCount(short replication) {
4596      if (replication == safeReplication-1) {
4597        this.blockSafe--;
4598        //blockSafe is set to -1 in manual / low resources safemode
4599        assert blockSafe >= 0 || isManual() || areResourcesLow();
4600        checkMode();
4601      }
4602    }
4603
4604    /**
4605     * Check if safe mode was entered manually
4606     */
4607    private boolean isManual() {
4608      return extension == Integer.MAX_VALUE;
4609    }
4610
4611    /**
4612     * Set manual safe mode.
4613     */
4614    private synchronized void setManual() {
4615      extension = Integer.MAX_VALUE;
4616    }
4617
4618    /**
4619     * Check if safe mode was entered due to resources being low.
4620     */
4621    private boolean areResourcesLow() {
4622      return resourcesLow;
4623    }
4624
4625    /**
4626     * Set that resources are low for this instance of safe mode.
4627     */
4628    private void setResourcesLow() {
4629      resourcesLow = true;
4630    }
4631
4632    /**
4633     * A tip on how safe mode is to be turned off: manually or automatically.
4634     */
4635    String getTurnOffTip() {
4636      if(!isOn()) {
4637        return "Safe mode is OFF.";
4638      }
4639
4640      //Manual OR low-resource safemode. (Admin intervention required)
4641      String adminMsg = "It was turned on manually. ";
4642      if (areResourcesLow()) {
4643        adminMsg = "Resources are low on NN. Please add or free up more "
4644          + "resources then turn off safe mode manually. NOTE:  If you turn off"
4645          + " safe mode before adding resources, "
4646          + "the NN will immediately return to safe mode. ";
4647      }
4648      if (isManual() || areResourcesLow()) {
4649        return adminMsg
4650          + "Use \"hdfs dfsadmin -safemode leave\" to turn safe mode off.";
4651      }
4652
4653      boolean thresholdsMet = true;
4654      int numLive = getNumLiveDataNodes();
4655      String msg = "";
4656      if (blockSafe < blockThreshold) {
4657        msg += String.format(
4658          "The reported blocks %d needs additional %d"
4659          + " blocks to reach the threshold %.4f of total blocks %d.%n",
4660                blockSafe, (blockThreshold - blockSafe), threshold, blockTotal);
4661        thresholdsMet = false;
4662      } else {
4663        msg += String.format("The reported blocks %d has reached the threshold"
4664            + " %.4f of total blocks %d. ", blockSafe, threshold, blockTotal);
4665      }
4666      if (numLive < datanodeThreshold) {
4667        msg += String.format(
4668          "The number of live datanodes %d needs an additional %d live "
4669          + "datanodes to reach the minimum number %d.%n",
4670          numLive, (datanodeThreshold - numLive), datanodeThreshold);
4671        thresholdsMet = false;
4672      } else {
4673        msg += String.format("The number of live datanodes %d has reached "
4674            + "the minimum number %d. ",
4675            numLive, datanodeThreshold);
4676      }
4677
4678      if(blockManager.getBytesInFuture() > 0) {
4679        msg += "Name node detected blocks with generation stamps " +
4680            "in future. This means that Name node metadata is inconsistent." +
4681            "This can happen if Name node metadata files have been manually " +
4682            "replaced. Exiting safe mode will cause loss of " + blockManager
4683            .getBytesInFuture() + " byte(s). Please restart name node with " +
4684            "right metadata or use \"hdfs dfsadmin -safemode forceExit" +
4685            "if you are certain that the NameNode was started with the" +
4686            "correct FsImage and edit logs. If you encountered this during" +
4687            "a rollback, it is safe to exit with -safemode forceExit.";
4688        return msg;
4689      }
4690
4691
4692      msg += (reached > 0) ? "In safe mode extension. " : "";
4693      msg += "Safe mode will be turned off automatically ";
4694
4695      if (!thresholdsMet) {
4696        msg += "once the thresholds have been reached.";
4697      } else if (reached + extension - monotonicNow() > 0) {
4698        msg += ("in " + (reached + extension - monotonicNow()) / 1000 + " seconds.");
4699      } else {
4700        msg += "soon.";
4701      }
4702
4703      return msg;
4704    }
4705
4706    /**
4707     * Print status every 20 seconds.
4708     */
4709    private void reportStatus(String msg, boolean rightNow) {
4710      long curTime = now();
4711      if(!rightNow && (curTime - lastStatusReport < 20 * 1000))
4712        return;
4713      NameNode.stateChangeLog.info(msg + " \n" + getTurnOffTip());
4714      lastStatusReport = curTime;
4715    }
4716
4717    @Override
4718    public String toString() {
4719      String resText = "Current safe blocks = " 
4720        + blockSafe 
4721        + ". Target blocks = " + blockThreshold + " for threshold = %" + threshold
4722        + ". Minimal replication = " + safeReplication + ".";
4723      if (reached > 0) 
4724        resText += " Threshold was reached " + new Date(reachedTimestamp) + ".";
4725      return resText;
4726    }
4727      
4728    /**
4729     * Checks consistency of the class state.
4730     * This is costly so only runs if asserts are enabled.
4731     */
4732    private void doConsistencyCheck() {
4733      boolean assertsOn = false;
4734      assert assertsOn = true; // set to true if asserts are on
4735      if (!assertsOn) return;
4736      
4737      if (blockTotal == -1 && blockSafe == -1) {
4738        return; // manual safe mode
4739      }
4740      int activeBlocks = blockManager.getActiveBlockCount();
4741      if ((blockTotal != activeBlocks) &&
4742          !(blockSafe >= 0 && blockSafe <= blockTotal)) {
4743        throw new AssertionError(
4744            " SafeMode: Inconsistent filesystem state: "
4745        + "SafeMode data: blockTotal=" + blockTotal
4746        + " blockSafe=" + blockSafe + "; "
4747        + "BlockManager data: active="  + activeBlocks);
4748      }
4749    }
4750
4751    private synchronized void adjustBlockTotals(int deltaSafe, int deltaTotal) {
4752      if (!shouldIncrementallyTrackBlocks) {
4753        return;
4754      }
4755      assert haEnabled;
4756      
4757      if (LOG.isDebugEnabled()) {
4758        LOG.debug("Adjusting block totals from " +
4759            blockSafe + "/" + blockTotal + " to " +
4760            (blockSafe + deltaSafe) + "/" + (blockTotal + deltaTotal));
4761      }
4762      assert blockSafe + deltaSafe >= 0 : "Can't reduce blockSafe " +
4763        blockSafe + " by " + deltaSafe + ": would be negative";
4764      assert blockTotal + deltaTotal >= 0 : "Can't reduce blockTotal " +
4765        blockTotal + " by " + deltaTotal + ": would be negative";
4766      
4767      blockSafe += deltaSafe;
4768      setBlockTotal(blockTotal + deltaTotal);
4769    }
4770  }
4771    
4772  /**
4773   * Periodically check whether it is time to leave safe mode.
4774   * This thread starts when the threshold level is reached.
4775   *
4776   */
4777  class SafeModeMonitor implements Runnable {
4778    /** interval in msec for checking safe mode: {@value} */
4779    private static final long recheckInterval = 1000;
4780      
4781    /**
4782     */
4783    @Override
4784    public void run() {
4785      while (fsRunning) {
4786        writeLock();
4787        try {
4788          if (safeMode == null) { // Not in safe mode.
4789            break;
4790          }
4791          if (safeMode.canLeave()) {
4792            // Leave safe mode.
4793            safeMode.leave(false);
4794            smmthread = null;
4795            break;
4796          }
4797        } finally {
4798          writeUnlock();
4799        }
4800
4801        try {
4802          Thread.sleep(recheckInterval);
4803        } catch (InterruptedException ie) {
4804          // Ignored
4805        }
4806      }
4807      if (!fsRunning) {
4808        LOG.info("NameNode is being shutdown, exit SafeModeMonitor thread");
4809      }
4810    }
4811  }
4812    
4813  boolean setSafeMode(SafeModeAction action) throws IOException {
4814    if (action != SafeModeAction.SAFEMODE_GET) {
4815      checkSuperuserPrivilege();
4816      switch(action) {
4817      case SAFEMODE_LEAVE: // leave safe mode
4818        if (blockManager.getBytesInFuture() > 0) {
4819          LOG.error("Refusing to leave safe mode without a force flag. " +
4820              "Exiting safe mode will cause a deletion of " + blockManager
4821              .getBytesInFuture() + " byte(s). Please use " +
4822              "-forceExit flag to exit safe mode forcefully and data loss is " +
4823              "acceptable.");
4824          return isInSafeMode();
4825        }
4826        leaveSafeMode();
4827        break;
4828      case SAFEMODE_ENTER: // enter safe mode
4829        enterSafeMode(false);
4830        break;
4831      case SAFEMODE_FORCE_EXIT:
4832        if (blockManager.getBytesInFuture() > 0) {
4833          LOG.warn("Leaving safe mode due to forceExit. This will cause a data "
4834              + "loss of " + blockManager.getBytesInFuture() + " byte(s).");
4835          safeMode.leave(true);
4836          blockManager.clearBytesInFuture();
4837        } else {
4838          LOG.warn("forceExit used when normal exist would suffice. Treating " +
4839              "force exit as normal safe mode exit.");
4840        }
4841        leaveSafeMode();
4842        break;
4843      default:
4844        LOG.error("Unexpected safe mode action");
4845      }
4846    }
4847    return isInSafeMode();
4848  }
4849
4850  @Override
4851  public void checkSafeMode() {
4852    // safeMode is volatile, and may be set to null at any time
4853    SafeModeInfo safeMode = this.safeMode;
4854    if (safeMode != null) {
4855      safeMode.checkMode();
4856    }
4857  }
4858
4859  @Override
4860  public boolean isInSafeMode() {
4861    // safeMode is volatile, and may be set to null at any time
4862    SafeModeInfo safeMode = this.safeMode;
4863    if (safeMode == null)
4864      return false;
4865    return safeMode.isOn();
4866  }
4867
4868  @Override
4869  public boolean isInStartupSafeMode() {
4870    // safeMode is volatile, and may be set to null at any time
4871    SafeModeInfo safeMode = this.safeMode;
4872    if (safeMode == null)
4873      return false;
4874    // If the NN is in safemode, and not due to manual / low resources, we
4875    // assume it must be because of startup. If the NN had low resources during
4876    // startup, we assume it came out of startup safemode and it is now in low
4877    // resources safemode
4878    return !safeMode.isManual() && !safeMode.areResourcesLow()
4879      && safeMode.isOn();
4880  }
4881
4882  @Override
4883  public void incrementSafeBlockCount(int replication) {
4884    // safeMode is volatile, and may be set to null at any time
4885    SafeModeInfo safeMode = this.safeMode;
4886    if (safeMode == null)
4887      return;
4888    safeMode.incrementSafeBlockCount((short)replication);
4889  }
4890
4891  @Override
4892  public void decrementSafeBlockCount(BlockInfo b) {
4893    // safeMode is volatile, and may be set to null at any time
4894    SafeModeInfo safeMode = this.safeMode;
4895    if (safeMode == null) // mostly true
4896      return;
4897    BlockInfo storedBlock = getStoredBlock(b);
4898    if (storedBlock.isComplete()) {
4899      safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
4900    }
4901  }
4902  
4903  /**
4904   * Adjust the total number of blocks safe and expected during safe mode.
4905   * If safe mode is not currently on, this is a no-op.
4906   * @param deltaSafe the change in number of safe blocks
4907   * @param deltaTotal the change i nnumber of total blocks expected
4908   */
4909  @Override
4910  public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal) {
4911    // safeMode is volatile, and may be set to null at any time
4912    SafeModeInfo safeMode = this.safeMode;
4913    if (safeMode == null)
4914      return;
4915    safeMode.adjustBlockTotals(deltaSafe, deltaTotal);
4916  }
4917
4918  /**
4919   * Set the total number of blocks in the system. 
4920   */
4921  public void setBlockTotal(long completeBlocksTotal) {
4922    // safeMode is volatile, and may be set to null at any time
4923    SafeModeInfo safeMode = this.safeMode;
4924    if (safeMode == null)
4925      return;
4926    safeMode.setBlockTotal((int) completeBlocksTotal);
4927  }
4928
4929  /**
4930   * Get the total number of blocks in the system. 
4931   */
4932  @Override // FSNamesystemMBean
4933  @Metric
4934  public long getBlocksTotal() {
4935    return blockManager.getTotalBlocks();
4936  }
4937
4938  /**
4939   * Get the number of files under construction in the system.
4940   */
4941  @Metric({ "NumFilesUnderConstruction",
4942      "Number of files under construction" })
4943  public long getNumFilesUnderConstruction() {
4944    return leaseManager.countPath();
4945  }
4946
4947  /**
4948   * Get the total number of active clients holding lease in the system.
4949   */
4950  @Metric({ "NumActiveClients", "Number of active clients holding lease" })
4951  public long getNumActiveClients() {
4952    return leaseManager.countLease();
4953  }
4954
4955  /**
4956   * Get the total number of COMPLETE blocks in the system.
4957   * For safe mode only complete blocks are counted.
4958   * This is invoked only during NN startup and checkpointing.
4959   */
4960  public long getCompleteBlocksTotal() {
4961    // Calculate number of blocks under construction
4962    long numUCBlocks = 0;
4963    readLock();
4964    try {
4965      numUCBlocks = leaseManager.getNumUnderConstructionBlocks();
4966      return getBlocksTotal() - numUCBlocks;
4967    } finally {
4968      readUnlock("getCompleteBlocksTotal");
4969    }
4970  }
4971
4972  /**
4973   * Enter safe mode. If resourcesLow is false, then we assume it is manual
4974   * @throws IOException
4975   */
4976  void enterSafeMode(boolean resourcesLow) throws IOException {
4977    writeLock();
4978    try {
4979      // Stop the secret manager, since rolling the master key would
4980      // try to write to the edit log
4981      stopSecretManager();
4982
4983      // Ensure that any concurrent operations have been fully synced
4984      // before entering safe mode. This ensures that the FSImage
4985      // is entirely stable on disk as soon as we're in safe mode.
4986      boolean isEditlogOpenForWrite = getEditLog().isOpenForWrite();
4987      // Before Editlog is in OpenForWrite mode, editLogStream will be null. So,
4988      // logSyncAll call can be called only when Edlitlog is in OpenForWrite mode
4989      if (isEditlogOpenForWrite) {
4990        getEditLog().logSyncAll();
4991      }
4992      if (!isInSafeMode()) {
4993        safeMode = new SafeModeInfo(resourcesLow);
4994        return;
4995      }
4996      if (resourcesLow) {
4997        safeMode.setResourcesLow();
4998      } else {
4999        safeMode.setManual();
5000      }
5001      if (isEditlogOpenForWrite) {
5002        getEditLog().logSyncAll();
5003      }
5004      NameNode.stateChangeLog.info("STATE* Safe mode is ON"
5005          + safeMode.getTurnOffTip());
5006    } finally {
5007      writeUnlock("enterSafeMode");
5008    }
5009  }
5010
5011  /**
5012   * Leave safe mode.
5013   */
5014  void leaveSafeMode() {
5015    writeLock();
5016    try {
5017      if (!isInSafeMode()) {
5018        NameNode.stateChangeLog.info("STATE* Safe mode is already OFF"); 
5019        return;
5020      }
5021      safeMode.leave(false);
5022    } finally {
5023      writeUnlock("leaveSafeMode");
5024    }
5025  }
5026    
5027  String getSafeModeTip() {
5028    // There is no need to take readLock.
5029    // Don't use isInSafeMode as this.safeMode might be set to null.
5030    // after isInSafeMode returns.
5031    boolean inSafeMode;
5032    SafeModeInfo safeMode = this.safeMode;
5033    if (safeMode == null) {
5034      inSafeMode = false;
5035    } else {
5036      inSafeMode = safeMode.isOn();
5037    }
5038
5039    if (!inSafeMode) {
5040      return "";
5041    } else {
5042      return safeMode.getTurnOffTip();
5043    }
5044  }
5045
5046  CheckpointSignature rollEditLog() throws IOException {
5047    checkSuperuserPrivilege();
5048    checkOperation(OperationCategory.JOURNAL);
5049    writeLock();
5050    try {
5051      checkOperation(OperationCategory.JOURNAL);
5052      checkNameNodeSafeMode("Log not rolled");
5053      if (Server.isRpcInvocation()) {
5054        LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
5055      }
5056      return getFSImage().rollEditLog(getEffectiveLayoutVersion());
5057    } finally {
5058      writeUnlock("rollEditLog");
5059    }
5060  }
5061
5062  NamenodeCommand startCheckpoint(NamenodeRegistration backupNode,
5063      NamenodeRegistration activeNamenode) throws IOException {
5064    checkOperation(OperationCategory.CHECKPOINT);
5065    writeLock();
5066    try {
5067      checkOperation(OperationCategory.CHECKPOINT);
5068      checkNameNodeSafeMode("Checkpoint not started");
5069      
5070      LOG.info("Start checkpoint for " + backupNode.getAddress());
5071      NamenodeCommand cmd = getFSImage().startCheckpoint(backupNode,
5072          activeNamenode, getEffectiveLayoutVersion());
5073      getEditLog().logSync();
5074      return cmd;
5075    } finally {
5076      writeUnlock("startCheckpoint");
5077    }
5078  }
5079
5080  public void processIncrementalBlockReport(final DatanodeID nodeID,
5081      final StorageReceivedDeletedBlocks srdb)
5082      throws IOException {
5083    writeLock();
5084    try {
5085      blockManager.processIncrementalBlockReport(nodeID, srdb);
5086    } finally {
5087      writeUnlock("processIncrementalBlockReport");
5088    }
5089  }
5090  
5091  void endCheckpoint(NamenodeRegistration registration,
5092                            CheckpointSignature sig) throws IOException {
5093    checkOperation(OperationCategory.CHECKPOINT);
5094    readLock();
5095    try {
5096      checkOperation(OperationCategory.CHECKPOINT);
5097      checkNameNodeSafeMode("Checkpoint not ended");
5098      LOG.info("End checkpoint for " + registration.getAddress());
5099      getFSImage().endCheckpoint(sig);
5100    } finally {
5101      readUnlock("endCheckpoint");
5102    }
5103  }
5104
5105  PermissionStatus createFsOwnerPermissions(FsPermission permission) {
5106    return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
5107  }
5108
5109  @Override
5110  public void checkSuperuserPrivilege()
5111      throws AccessControlException {
5112    if (isPermissionEnabled) {
5113      FSPermissionChecker pc = getPermissionChecker();
5114      pc.checkSuperuserPrivilege();
5115    }
5116  }
5117
5118  /**
5119   * Check to see if we have exceeded the limit on the number
5120   * of inodes.
5121   */
5122  void checkFsObjectLimit() throws IOException {
5123    if (maxFsObjects != 0 &&
5124        maxFsObjects <= dir.totalInodes() + getBlocksTotal()) {
5125      throw new IOException("Exceeded the configured number of objects " +
5126                             maxFsObjects + " in the filesystem.");
5127    }
5128  }
5129
5130  /**
5131   * Get the total number of objects in the system. 
5132   */
5133  @Override // FSNamesystemMBean
5134  public long getMaxObjects() {
5135    return maxFsObjects;
5136  }
5137
5138  @Override // FSNamesystemMBean
5139  @Metric
5140  public long getFilesTotal() {
5141    // There is no need to take fSNamesystem's lock as
5142    // FSDirectory has its own lock.
5143    return this.dir.totalInodes();
5144  }
5145
5146  @Override // FSNamesystemMBean
5147  @Metric
5148  public long getPendingReplicationBlocks() {
5149    return blockManager.getPendingReplicationBlocksCount();
5150  }
5151
5152  @Override // FSNamesystemMBean
5153  @Metric
5154  public long getUnderReplicatedBlocks() {
5155    return blockManager.getUnderReplicatedBlocksCount();
5156  }
5157
5158  /** Returns number of blocks with corrupt replicas */
5159  @Metric({"CorruptBlocks", "Number of blocks with corrupt replicas"})
5160  public long getCorruptReplicaBlocks() {
5161    return blockManager.getCorruptReplicaBlocksCount();
5162  }
5163
5164  @Override // FSNamesystemMBean
5165  @Metric
5166  public long getScheduledReplicationBlocks() {
5167    return blockManager.getScheduledReplicationBlocksCount();
5168  }
5169
5170  @Override
5171  @Metric
5172  public long getPendingDeletionBlocks() {
5173    return blockManager.getPendingDeletionBlocksCount();
5174  }
5175
5176  @Override
5177  public long getBlockDeletionStartTime() {
5178    return startTime + blockManager.getStartupDelayBlockDeletionInMs();
5179  }
5180
5181  @Metric
5182  public long getExcessBlocks() {
5183    return blockManager.getExcessBlocksCount();
5184  }
5185
5186  @Metric
5187  public long getNumTimedOutPendingReplications() {
5188    return blockManager.getNumTimedOutPendingReplications();
5189  }
5190
5191  // HA-only metric
5192  @Metric
5193  public long getPostponedMisreplicatedBlocks() {
5194    return blockManager.getPostponedMisreplicatedBlocksCount();
5195  }
5196
5197  // HA-only metric
5198  @Metric
5199  public int getPendingDataNodeMessageCount() {
5200    return blockManager.getPendingDataNodeMessageCount();
5201  }
5202  
5203  // HA-only metric
5204  @Metric
5205  public String getHAState() {
5206    return haContext.getState().toString();
5207  }
5208
5209  // HA-only metric
5210  @Metric
5211  public long getMillisSinceLastLoadedEdits() {
5212    if (isInStandbyState() && editLogTailer != null) {
5213      return monotonicNow() - editLogTailer.getLastLoadTimeMs();
5214    } else {
5215      return 0;
5216    }
5217  }
5218
5219  @Metric
5220  public int getBlockCapacity() {
5221    return blockManager.getCapacity();
5222  }
5223
5224  public HAServiceState getState() {
5225    return haContext == null ? null : haContext.getState().getServiceState();
5226  }
5227
5228  @Override // FSNamesystemMBean
5229  public String getFSState() {
5230    return isInSafeMode() ? "safeMode" : "Operational";
5231  }
5232  
5233  private ObjectName mbeanName;
5234  private ObjectName mxbeanName;
5235
5236  /**
5237   * Register the FSNamesystem MBean using the name
5238   *        "hadoop:service=NameNode,name=FSNamesystemState"
5239   */
5240  private void registerMBean() {
5241    // We can only implement one MXBean interface, so we keep the old one.
5242    try {
5243      StandardMBean bean = new StandardMBean(this, FSNamesystemMBean.class);
5244      mbeanName = MBeans.register("NameNode", "FSNamesystemState", bean);
5245    } catch (NotCompliantMBeanException e) {
5246      throw new RuntimeException("Bad MBean setup", e);
5247    }
5248
5249    LOG.info("Registered FSNamesystemState MBean");
5250  }
5251
5252  /**
5253   * shutdown FSNamesystem
5254   */
5255  void shutdown() {
5256    if (snapshotManager != null) {
5257      snapshotManager.shutdown();
5258    }
5259    if (mbeanName != null) {
5260      MBeans.unregister(mbeanName);
5261      mbeanName = null;
5262    }
5263    if (mxbeanName != null) {
5264      MBeans.unregister(mxbeanName);
5265      mxbeanName = null;
5266    }
5267    if (dir != null) {
5268      dir.shutdown();
5269    }
5270    if (blockManager != null) {
5271      blockManager.shutdown();
5272    }
5273    if (provider != null) {
5274      try {
5275        provider.close();
5276      } catch (IOException e) {
5277        LOG.error("Failed to close provider.", e);
5278      }
5279    }
5280  }
5281
5282  @Override // FSNamesystemMBean
5283  public int getNumLiveDataNodes() {
5284    return getBlockManager().getDatanodeManager().getNumLiveDataNodes();
5285  }
5286
5287  @Override // FSNamesystemMBean
5288  public int getNumDeadDataNodes() {
5289    return getBlockManager().getDatanodeManager().getNumDeadDataNodes();
5290  }
5291  
5292  @Override // FSNamesystemMBean
5293  public int getNumDecomLiveDataNodes() {
5294    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
5295    getBlockManager().getDatanodeManager().fetchDatanodes(live, null, false);
5296    int liveDecommissioned = 0;
5297    for (DatanodeDescriptor node : live) {
5298      liveDecommissioned += node.isDecommissioned() ? 1 : 0;
5299    }
5300    return liveDecommissioned;
5301  }
5302
5303  @Override // FSNamesystemMBean
5304  public int getNumDecomDeadDataNodes() {
5305    final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
5306    getBlockManager().getDatanodeManager().fetchDatanodes(null, dead, false);
5307    int deadDecommissioned = 0;
5308    for (DatanodeDescriptor node : dead) {
5309      deadDecommissioned += node.isDecommissioned() ? 1 : 0;
5310    }
5311    return deadDecommissioned;
5312  }
5313
5314  @Override // FSNamesystemMBean
5315  public int getVolumeFailuresTotal() {
5316    List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
5317    getBlockManager().getDatanodeManager().fetchDatanodes(live, null, false);
5318    int volumeFailuresTotal = 0;
5319    for (DatanodeDescriptor node: live) {
5320      volumeFailuresTotal += node.getVolumeFailures();
5321    }
5322    return volumeFailuresTotal;
5323  }
5324
5325  @Override // FSNamesystemMBean
5326  public long getEstimatedCapacityLostTotal() {
5327    List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
5328    getBlockManager().getDatanodeManager().fetchDatanodes(live, null, false);
5329    long estimatedCapacityLostTotal = 0;
5330    for (DatanodeDescriptor node: live) {
5331      VolumeFailureSummary volumeFailureSummary = node.getVolumeFailureSummary();
5332      if (volumeFailureSummary != null) {
5333        estimatedCapacityLostTotal +=
5334            volumeFailureSummary.getEstimatedCapacityLostTotal();
5335      }
5336    }
5337    return estimatedCapacityLostTotal;
5338  }
5339
5340  @Override // FSNamesystemMBean
5341  public int getNumDecommissioningDataNodes() {
5342    return getBlockManager().getDatanodeManager().getDecommissioningNodes()
5343        .size();
5344  }
5345
5346  @Override // FSNamesystemMBean
5347  @Metric({"StaleDataNodes", 
5348    "Number of datanodes marked stale due to delayed heartbeat"})
5349  public int getNumStaleDataNodes() {
5350    return getBlockManager().getDatanodeManager().getNumStaleNodes();
5351  }
5352
5353  /**
5354   * Storages are marked as "content stale" after NN restart or fails over and
5355   * before NN receives the first Heartbeat followed by the first Blockreport.
5356   */
5357  @Override // FSNamesystemMBean
5358  public int getNumStaleStorages() {
5359    return getBlockManager().getDatanodeManager().getNumStaleStorages();
5360  }
5361
5362  @Override // FSNamesystemMBean
5363  public String getTopUserOpCounts() {
5364    if (!topConf.isEnabled) {
5365      return null;
5366    }
5367
5368    Date now = new Date();
5369    final List<RollingWindowManager.TopWindow> topWindows =
5370        topMetrics.getTopWindows();
5371    Map<String, Object> topMap = new TreeMap<String, Object>();
5372    topMap.put("windows", topWindows);
5373    topMap.put("timestamp", DFSUtil.dateToIso8601String(now));
5374    try {
5375      return JsonUtil.toJsonString(topMap);
5376    } catch (IOException e) {
5377      LOG.warn("Failed to fetch TopUser metrics", e);
5378    }
5379    return null;
5380  }
5381
5382  /**
5383   * Increments, logs and then returns the stamp
5384   */
5385  long nextGenerationStamp(boolean legacyBlock)
5386      throws IOException, SafeModeException {
5387    assert hasWriteLock();
5388    checkNameNodeSafeMode("Cannot get next generation stamp");
5389
5390    long gs = blockIdManager.nextGenerationStamp(legacyBlock);
5391    if (legacyBlock) {
5392      getEditLog().logGenerationStampV1(gs);
5393    } else {
5394      getEditLog().logGenerationStampV2(gs);
5395    }
5396
5397    // NB: callers sync the log
5398    return gs;
5399  }
5400
5401  /**
5402   * Increments, logs and then returns the block ID
5403   */
5404  private long nextBlockId() throws IOException {
5405    assert hasWriteLock();
5406    checkNameNodeSafeMode("Cannot get next block ID");
5407    final long blockId = blockIdManager.nextBlockId();
5408    getEditLog().logAllocateBlockId(blockId);
5409    // NB: callers sync the log
5410    return blockId;
5411  }
5412
5413  private boolean isFileDeleted(INodeFile file) {
5414    // Not in the inodeMap or in the snapshot but marked deleted.
5415    if (dir.getInode(file.getId()) == null) {
5416      return true;
5417    }
5418
5419    // look at the path hierarchy to see if one parent is deleted by recursive
5420    // deletion
5421    INode tmpChild = file;
5422    INodeDirectory tmpParent = file.getParent();
5423    while (true) {
5424      if (tmpParent == null) {
5425        return true;
5426      }
5427
5428      INode childINode = tmpParent.getChild(tmpChild.getLocalNameBytes(),
5429          Snapshot.CURRENT_STATE_ID);
5430      if (childINode == null || !childINode.equals(tmpChild)) {
5431        // a newly created INode with the same name as an already deleted one
5432        // would be a different INode than the deleted one
5433        return true;
5434      }
5435
5436      if (tmpParent.isRoot()) {
5437        break;
5438      }
5439
5440      tmpChild = tmpParent;
5441      tmpParent = tmpParent.getParent();
5442    }
5443
5444    if (file.isWithSnapshot() &&
5445        file.getFileWithSnapshotFeature().isCurrentFileDeleted()) {
5446      return true;
5447    }
5448    return false;
5449  }
5450
5451  private INodeFile checkUCBlock(ExtendedBlock block,
5452      String clientName) throws IOException {
5453    assert hasWriteLock();
5454    checkNameNodeSafeMode("Cannot get a new generation stamp and an "
5455        + "access token for block " + block);
5456    
5457    // check stored block state
5458    BlockInfo storedBlock = getStoredBlock(ExtendedBlock.getLocalBlock(block));
5459    if (storedBlock == null) {
5460      throw new IOException(block + " does not exist.");
5461    }
5462    if (storedBlock.getBlockUCState() != BlockUCState.UNDER_CONSTRUCTION) {
5463      throw new IOException("Unexpected BlockUCState: " + block
5464          + " is " + storedBlock.getBlockUCState()
5465          + " but not " + BlockUCState.UNDER_CONSTRUCTION);
5466    }
5467    
5468    // check file inode
5469    final INodeFile file = getBlockCollection(storedBlock);
5470    if (file == null || !file.isUnderConstruction() || isFileDeleted(file)) {
5471      throw new IOException("The file " + storedBlock + 
5472          " belonged to does not exist or it is not under construction.");
5473    }
5474    
5475    // check lease
5476    if (clientName == null
5477        || !clientName.equals(file.getFileUnderConstructionFeature()
5478            .getClientName())) {
5479      throw new LeaseExpiredException("Lease mismatch: " + block + 
5480          " is accessed by a non lease holder " + clientName); 
5481    }
5482
5483    return file;
5484  }
5485  
5486  /**
5487   * Client is reporting some bad block locations.
5488   */
5489  void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
5490    checkOperation(OperationCategory.WRITE);
5491    writeLock();
5492    try {
5493      checkOperation(OperationCategory.WRITE);
5494      for (int i = 0; i < blocks.length; i++) {
5495        ExtendedBlock blk = blocks[i].getBlock();
5496        DatanodeInfo[] nodes = blocks[i].getLocations();
5497        String[] storageIDs = blocks[i].getStorageIDs();
5498        for (int j = 0; j < nodes.length; j++) {
5499          NameNode.stateChangeLog.info("*DIR* reportBadBlocks for block: {} on"
5500              + " datanode: {}", blk, nodes[j].getXferAddr());
5501          blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j],
5502              storageIDs == null ? null: storageIDs[j], 
5503              "client machine reported it");
5504        }
5505      }
5506    } finally {
5507      writeUnlock("reportBadBlocks");
5508    }
5509  }
5510
5511  /**
5512   * Get a new generation stamp together with an access token for 
5513   * a block under construction
5514   * 
5515   * This method is called for recovering a failed pipeline or setting up
5516   * a pipeline to append to a block.
5517   * 
5518   * @param block a block
5519   * @param clientName the name of a client
5520   * @return a located block with a new generation stamp and an access token
5521   * @throws IOException if any error occurs
5522   */
5523  LocatedBlock updateBlockForPipeline(ExtendedBlock block, 
5524      String clientName) throws IOException {
5525    LocatedBlock locatedBlock;
5526    checkOperation(OperationCategory.WRITE);
5527    writeLock();
5528    try {
5529      checkOperation(OperationCategory.WRITE);
5530
5531      // check vadility of parameters
5532      checkUCBlock(block, clientName);
5533  
5534      // get a new generation stamp and an access token
5535      block.setGenerationStamp(nextGenerationStamp(blockIdManager.isLegacyBlock(block.getLocalBlock())));
5536      locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
5537      blockManager.setBlockToken(locatedBlock, BlockTokenIdentifier.AccessMode.WRITE);
5538    } finally {
5539      writeUnlock("bumpBlockGenerationStamp");
5540    }
5541    // Ensure we record the new generation stamp
5542    getEditLog().logSync();
5543    return locatedBlock;
5544  }
5545  
5546  /**
5547   * Update a pipeline for a block under construction
5548   * 
5549   * @param clientName the name of the client
5550   * @param oldBlock and old block
5551   * @param newBlock a new block with a new generation stamp and length
5552   * @param newNodes datanodes in the pipeline
5553   * @throws IOException if any error occurs
5554   */
5555  void updatePipeline(
5556      String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock,
5557      DatanodeID[] newNodes, String[] newStorageIDs, boolean logRetryCache)
5558      throws IOException {
5559    LOG.info("updatePipeline(" + oldBlock.getLocalBlock()
5560             + ", newGS=" + newBlock.getGenerationStamp()
5561             + ", newLength=" + newBlock.getNumBytes()
5562             + ", newNodes=" + Arrays.asList(newNodes)
5563             + ", client=" + clientName
5564             + ")");
5565    waitForLoadingFSImage();
5566    writeLock();
5567    try {
5568      checkOperation(OperationCategory.WRITE);
5569      checkNameNodeSafeMode("Pipeline not updated");
5570      assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
5571        + oldBlock + " has different block identifier";
5572      updatePipelineInternal(clientName, oldBlock, newBlock, newNodes,
5573          newStorageIDs, logRetryCache);
5574    } finally {
5575      writeUnlock("updatePipeline");
5576    }
5577    getEditLog().logSync();
5578    LOG.info("updatePipeline(" + oldBlock.getLocalBlock() + " => "
5579        + newBlock.getLocalBlock() + ") success");
5580  }
5581
5582  private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
5583      ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs,
5584      boolean logRetryCache)
5585      throws IOException {
5586    assert hasWriteLock();
5587    // check the vadility of the block and lease holder name
5588    final INodeFile pendingFile = checkUCBlock(oldBlock, clientName);
5589    final String src = pendingFile.getFullPathName();
5590    final BlockInfo blockinfo = pendingFile.getLastBlock();
5591    assert !blockinfo.isComplete();
5592
5593    // check new GS & length: this is not expected
5594    if (newBlock.getGenerationStamp() <= blockinfo.getGenerationStamp() ||
5595        newBlock.getNumBytes() < blockinfo.getNumBytes()) {
5596      String msg = "Update " + oldBlock + " (len = " + 
5597        blockinfo.getNumBytes() + ") to an older state: " + newBlock + 
5598        " (len = " + newBlock.getNumBytes() +")";
5599      LOG.warn(msg);
5600      throw new IOException(msg);
5601    }
5602
5603    // Update old block with the new generation stamp and new length
5604    blockinfo.setNumBytes(newBlock.getNumBytes());
5605    blockinfo.setGenerationStampAndVerifyReplicas(newBlock.getGenerationStamp());
5606
5607    // find the DatanodeDescriptor objects
5608    final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager()
5609        .getDatanodeStorageInfos(newNodes, newStorageIDs,
5610            "src=%s, oldBlock=%s, newBlock=%s, clientName=%s",
5611            src, oldBlock, newBlock, clientName);
5612    blockinfo.getUnderConstructionFeature().setExpectedLocations(
5613        blockinfo, storages);
5614
5615    FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, logRetryCache);
5616  }
5617
5618  /**
5619   * Register a Backup name-node, verifying that it belongs
5620   * to the correct namespace, and adding it to the set of
5621   * active journals if necessary.
5622   * 
5623   * @param bnReg registration of the new BackupNode
5624   * @param nnReg registration of this NameNode
5625   * @throws IOException if the namespace IDs do not match
5626   */
5627  void registerBackupNode(NamenodeRegistration bnReg,
5628      NamenodeRegistration nnReg) throws IOException {
5629    writeLock();
5630    try {
5631      if(getFSImage().getStorage().getNamespaceID() 
5632         != bnReg.getNamespaceID())
5633        throw new IOException("Incompatible namespaceIDs: "
5634            + " Namenode namespaceID = "
5635            + getFSImage().getStorage().getNamespaceID() + "; "
5636            + bnReg.getRole() +
5637            " node namespaceID = " + bnReg.getNamespaceID());
5638      if (bnReg.getRole() == NamenodeRole.BACKUP) {
5639        getFSImage().getEditLog().registerBackupNode(
5640            bnReg, nnReg);
5641      }
5642    } finally {
5643      writeUnlock("registerBackupNode");
5644    }
5645  }
5646
5647  /**
5648   * Release (unregister) backup node.
5649   * <p>
5650   * Find and remove the backup stream corresponding to the node.
5651   * @throws IOException
5652   */
5653  void releaseBackupNode(NamenodeRegistration registration)
5654    throws IOException {
5655    checkOperation(OperationCategory.WRITE);
5656    writeLock();
5657    try {
5658      checkOperation(OperationCategory.WRITE);
5659      if(getFSImage().getStorage().getNamespaceID()
5660         != registration.getNamespaceID())
5661        throw new IOException("Incompatible namespaceIDs: "
5662            + " Namenode namespaceID = "
5663            + getFSImage().getStorage().getNamespaceID() + "; "
5664            + registration.getRole() +
5665            " node namespaceID = " + registration.getNamespaceID());
5666      getEditLog().releaseBackupStream(registration);
5667    } finally {
5668      writeUnlock("releaseBackupNode");
5669    }
5670  }
5671
5672  static class CorruptFileBlockInfo {
5673    final String path;
5674    final Block block;
5675    
5676    public CorruptFileBlockInfo(String p, Block b) {
5677      path = p;
5678      block = b;
5679    }
5680    
5681    @Override
5682    public String toString() {
5683      return block.getBlockName() + "\t" + path;
5684    }
5685  }
5686  /**
5687   * @param path Restrict corrupt files to this portion of namespace.
5688   * @param cookieTab Support for continuation; cookieTab  tells where
5689   *                  to start from
5690   * @return a list in which each entry describes a corrupt file/block
5691   * @throws IOException
5692   */
5693  Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
5694  String[] cookieTab) throws IOException {
5695    checkSuperuserPrivilege();
5696    checkOperation(OperationCategory.READ);
5697
5698    int count = 0;
5699    ArrayList<CorruptFileBlockInfo> corruptFiles =
5700        new ArrayList<CorruptFileBlockInfo>();
5701    if (cookieTab == null) {
5702      cookieTab = new String[] { null };
5703    }
5704
5705    // Do a quick check if there are any corrupt files without taking the lock
5706    if (blockManager.getMissingBlocksCount() == 0) {
5707      if (cookieTab[0] == null) {
5708        cookieTab[0] = String.valueOf(getIntCookie(cookieTab[0]));
5709      }
5710      if (LOG.isDebugEnabled()) {
5711        LOG.debug("there are no corrupt file blocks.");
5712      }
5713      return corruptFiles;
5714    }
5715
5716    readLock();
5717    try {
5718      checkOperation(OperationCategory.READ);
5719      if (!blockManager.isPopulatingReplQueues()) {
5720        throw new IOException("Cannot run listCorruptFileBlocks because " +
5721                              "replication queues have not been initialized.");
5722      }
5723      // print a limited # of corrupt files per call
5724
5725      final Iterator<BlockInfo> blkIterator =
5726          blockManager.getCorruptReplicaBlockIterator();
5727
5728      int skip = getIntCookie(cookieTab[0]);
5729      for (int i = 0; i < skip && blkIterator.hasNext(); i++) {
5730        blkIterator.next();
5731      }
5732
5733      while (blkIterator.hasNext()) {
5734        BlockInfo blk = blkIterator.next();
5735        final INodeFile inode = getBlockCollection(blk);
5736        skip++;
5737        if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) {
5738          String src = inode.getFullPathName();
5739          if (src.startsWith(path)){
5740            corruptFiles.add(new CorruptFileBlockInfo(src, blk));
5741            count++;
5742            if (count >= DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED)
5743              break;
5744          }
5745        }
5746      }
5747      cookieTab[0] = String.valueOf(skip);
5748      if (LOG.isDebugEnabled()) {
5749        LOG.debug("list corrupt file blocks returned: " + count);
5750      }
5751      return corruptFiles;
5752    } finally {
5753      readUnlock("listCorruptFileBlocks");
5754    }
5755  }
5756
5757  /**
5758   * Convert string cookie to integer.
5759   */
5760  private static int getIntCookie(String cookie){
5761    int c;
5762    if(cookie == null){
5763      c = 0;
5764    } else {
5765      try{
5766        c = Integer.parseInt(cookie);
5767      }catch (NumberFormatException e) {
5768        c = 0;
5769      }
5770    }
5771    c = Math.max(0, c);
5772    return c;
5773  }
5774
5775  /**
5776   * Create delegation token secret manager
5777   */
5778  private DelegationTokenSecretManager createDelegationTokenSecretManager(
5779      Configuration conf) {
5780    return new DelegationTokenSecretManager(conf.getLong(
5781        DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY,
5782        DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT),
5783        conf.getLong(DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY,
5784            DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT),
5785        conf.getLong(DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
5786            DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT),
5787        DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL,
5788        conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY,
5789            DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT),
5790        this);
5791  }
5792
5793  /**
5794   * Returns the DelegationTokenSecretManager instance in the namesystem.
5795   * @return delegation token secret manager object
5796   */
5797  DelegationTokenSecretManager getDelegationTokenSecretManager() {
5798    return dtSecretManager;
5799  }
5800
5801  /**
5802   * @param renewer Renewer information
5803   * @return delegation toek
5804   * @throws IOException on error
5805   */
5806  Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
5807      throws IOException {
5808    Token<DelegationTokenIdentifier> token;
5809    checkOperation(OperationCategory.WRITE);
5810    writeLock();
5811    try {
5812      checkOperation(OperationCategory.WRITE);
5813      checkNameNodeSafeMode("Cannot issue delegation token");
5814      if (!isAllowedDelegationTokenOp()) {
5815        throw new IOException(
5816          "Delegation Token can be issued only with kerberos or web authentication");
5817      }
5818      if (dtSecretManager == null || !dtSecretManager.isRunning()) {
5819        LOG.warn("trying to get DT with no secret manager running");
5820        return null;
5821      }
5822
5823      UserGroupInformation ugi = getRemoteUser();
5824      String user = ugi.getUserName();
5825      Text owner = new Text(user);
5826      Text realUser = null;
5827      if (ugi.getRealUser() != null) {
5828        realUser = new Text(ugi.getRealUser().getUserName());
5829      }
5830      DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner,
5831        renewer, realUser);
5832      token = new Token<DelegationTokenIdentifier>(
5833        dtId, dtSecretManager);
5834      long expiryTime = dtSecretManager.getTokenExpiryTime(dtId);
5835      getEditLog().logGetDelegationToken(dtId, expiryTime);
5836    } finally {
5837      writeUnlock("getDelegationToken");
5838    }
5839    getEditLog().logSync();
5840    return token;
5841  }
5842
5843  /**
5844   * 
5845   * @param token token to renew
5846   * @return new expiryTime of the token
5847   * @throws InvalidToken if {@code token} is invalid
5848   * @throws IOException on other errors
5849   */
5850  long renewDelegationToken(Token<DelegationTokenIdentifier> token)
5851      throws InvalidToken, IOException {
5852    long expiryTime;
5853    checkOperation(OperationCategory.WRITE);
5854    writeLock();
5855    try {
5856      checkOperation(OperationCategory.WRITE);
5857
5858      checkNameNodeSafeMode("Cannot renew delegation token");
5859      if (!isAllowedDelegationTokenOp()) {
5860        throw new IOException(
5861            "Delegation Token can be renewed only with kerberos or web authentication");
5862      }
5863      String renewer = getRemoteUser().getShortUserName();
5864      expiryTime = dtSecretManager.renewToken(token, renewer);
5865      DelegationTokenIdentifier id = new DelegationTokenIdentifier();
5866      ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
5867      DataInputStream in = new DataInputStream(buf);
5868      id.readFields(in);
5869      getEditLog().logRenewDelegationToken(id, expiryTime);
5870    } finally {
5871      writeUnlock("renewDelegationToken");
5872    }
5873    getEditLog().logSync();
5874    return expiryTime;
5875  }
5876
5877  /**
5878   * 
5879   * @param token token to cancel
5880   * @throws IOException on error
5881   */
5882  void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
5883      throws IOException {
5884    checkOperation(OperationCategory.WRITE);
5885    writeLock();
5886    try {
5887      checkOperation(OperationCategory.WRITE);
5888
5889      checkNameNodeSafeMode("Cannot cancel delegation token");
5890      String canceller = getRemoteUser().getUserName();
5891      DelegationTokenIdentifier id = dtSecretManager
5892        .cancelToken(token, canceller);
5893      getEditLog().logCancelDelegationToken(id);
5894    } finally {
5895      writeUnlock("cancelDelegationToken");
5896    }
5897    getEditLog().logSync();
5898  }
5899
5900  /**
5901   * @param out save state of the secret manager
5902   * @param sdPath String storage directory path
5903   */
5904  void saveSecretManagerStateCompat(DataOutputStream out, String sdPath)
5905      throws IOException {
5906    dtSecretManager.saveSecretManagerStateCompat(out, sdPath);
5907  }
5908
5909  SecretManagerState saveSecretManagerState() {
5910    return dtSecretManager.saveSecretManagerState();
5911  }
5912
5913  /**
5914   * @param in load the state of secret manager from input stream
5915   */
5916  void loadSecretManagerStateCompat(DataInput in) throws IOException {
5917    dtSecretManager.loadSecretManagerStateCompat(in);
5918  }
5919
5920  void loadSecretManagerState(SecretManagerSection s,
5921      List<SecretManagerSection.DelegationKey> keys,
5922      List<SecretManagerSection.PersistToken> tokens) throws IOException {
5923    dtSecretManager.loadSecretManagerState(new SecretManagerState(s, keys, tokens));
5924  }
5925
5926  /**
5927   * Log the updateMasterKey operation to edit logs
5928   * 
5929   * @param key new delegation key.
5930   */
5931  public void logUpdateMasterKey(DelegationKey key) {
5932    
5933    assert !isInSafeMode() :
5934      "this should never be called while in safemode, since we stop " +
5935      "the DT manager before entering safemode!";
5936    // No need to hold FSN lock since we don't access any internal
5937    // structures, and this is stopped before the FSN shuts itself
5938    // down, etc.
5939    getEditLog().logUpdateMasterKey(key);
5940    getEditLog().logSync();
5941  }
5942  
5943  /**
5944   * Log the cancellation of expired tokens to edit logs
5945   * 
5946   * @param id token identifier to cancel
5947   */
5948  public void logExpireDelegationToken(DelegationTokenIdentifier id) {
5949    assert !isInSafeMode() :
5950      "this should never be called while in safemode, since we stop " +
5951      "the DT manager before entering safemode!";
5952    // No need to hold FSN lock since we don't access any internal
5953    // structures, and this is stopped before the FSN shuts itself
5954    // down, etc.
5955    getEditLog().logCancelDelegationToken(id);
5956  }  
5957  
5958  private void logReassignLease(String leaseHolder, String src,
5959      String newHolder) {
5960    assert hasWriteLock();
5961    getEditLog().logReassignLease(leaseHolder, src, newHolder);
5962  }
5963  
5964  /**
5965   * 
5966   * @return true if delegation token operation is allowed
5967   */
5968  private boolean isAllowedDelegationTokenOp() throws IOException {
5969    AuthenticationMethod authMethod = getConnectionAuthenticationMethod();
5970    if (UserGroupInformation.isSecurityEnabled()
5971        && (authMethod != AuthenticationMethod.KERBEROS)
5972        && (authMethod != AuthenticationMethod.KERBEROS_SSL)
5973        && (authMethod != AuthenticationMethod.CERTIFICATE)) {
5974      return false;
5975    }
5976    return true;
5977  }
5978  
5979  /**
5980   * Returns authentication method used to establish the connection
5981   * @return AuthenticationMethod used to establish connection
5982   * @throws IOException
5983   */
5984  private AuthenticationMethod getConnectionAuthenticationMethod()
5985      throws IOException {
5986    UserGroupInformation ugi = getRemoteUser();
5987    AuthenticationMethod authMethod = ugi.getAuthenticationMethod();
5988    if (authMethod == AuthenticationMethod.PROXY) {
5989      authMethod = ugi.getRealUser().getAuthenticationMethod();
5990    }
5991    return authMethod;
5992  }
5993  
5994  /**
5995   * Client invoked methods are invoked over RPC and will be in 
5996   * RPC call context even if the client exits.
5997   */
5998  boolean isExternalInvocation() {
5999    return Server.isRpcInvocation();
6000  }
6001
6002  // optimize ugi lookup for RPC operations to avoid a trip through
6003  // UGI.getCurrentUser which is synch'ed
6004  private static UserGroupInformation getRemoteUser() throws IOException {
6005    return NameNode.getRemoteUser();
6006  }
6007  
6008  /**
6009   * Log fsck event in the audit log 
6010   */
6011  void logFsckEvent(String src, InetAddress remoteAddress) throws IOException {
6012    if (isAuditEnabled()) {
6013      logAuditEvent(true, getRemoteUser(),
6014                    remoteAddress,
6015                    "fsck", src, null, null);
6016    }
6017  }
6018  /**
6019   * Register NameNodeMXBean
6020   */
6021  private void registerMXBean() {
6022    mxbeanName = MBeans.register("NameNode", "NameNodeInfo", this);
6023  }
6024
6025  /**
6026   * Class representing Namenode information for JMX interfaces
6027   */
6028  @Override // NameNodeMXBean
6029  public String getVersion() {
6030    return VersionInfo.getVersion() + ", r" + VersionInfo.getRevision();
6031  }
6032
6033  @Override // NameNodeMXBean
6034  public long getUsed() {
6035    return this.getCapacityUsed();
6036  }
6037
6038  @Override // NameNodeMXBean
6039  public long getFree() {
6040    return this.getCapacityRemaining();
6041  }
6042
6043  @Override // NameNodeMXBean
6044  public long getTotal() {
6045    return this.getCapacityTotal();
6046  }
6047
6048  @Override // NameNodeMXBean
6049  public String getSafemode() {
6050    if (!this.isInSafeMode())
6051      return "";
6052    return "Safe mode is ON. " + this.getSafeModeTip();
6053  }
6054
6055  @Override // NameNodeMXBean
6056  public boolean isUpgradeFinalized() {
6057    return this.getFSImage().isUpgradeFinalized();
6058  }
6059
6060  @Override // NameNodeMXBean
6061  public long getNonDfsUsedSpace() {
6062    return datanodeStatistics.getCapacityUsedNonDFS();
6063  }
6064
6065  @Override // NameNodeMXBean
6066  public float getPercentUsed() {
6067    return datanodeStatistics.getCapacityUsedPercent();
6068  }
6069
6070  @Override // NameNodeMXBean
6071  public long getBlockPoolUsedSpace() {
6072    return datanodeStatistics.getBlockPoolUsed();
6073  }
6074
6075  @Override // NameNodeMXBean
6076  public float getPercentBlockPoolUsed() {
6077    return datanodeStatistics.getPercentBlockPoolUsed();
6078  }
6079
6080  @Override // NameNodeMXBean
6081  public float getPercentRemaining() {
6082    return datanodeStatistics.getCapacityRemainingPercent();
6083  }
6084
6085  @Override // NameNodeMXBean
6086  public long getCacheCapacity() {
6087    return datanodeStatistics.getCacheCapacity();
6088  }
6089
6090  @Override // NameNodeMXBean
6091  public long getCacheUsed() {
6092    return datanodeStatistics.getCacheUsed();
6093  }
6094
6095  @Override // NameNodeMXBean
6096  public long getTotalBlocks() {
6097    return getBlocksTotal();
6098  }
6099
6100  /** @deprecated Use {@link #getFilesTotal()} instead. */
6101  @Deprecated
6102  @Override // NameNodeMXBean
6103  @Metric
6104  public long getTotalFiles() {
6105    return getFilesTotal();
6106  }
6107
6108  @Override // NameNodeMXBean
6109  public long getNumberOfMissingBlocks() {
6110    return getMissingBlocksCount();
6111  }
6112  
6113  @Override // NameNodeMXBean
6114  public long getNumberOfMissingBlocksWithReplicationFactorOne() {
6115    return getMissingReplOneBlocksCount();
6116  }
6117
6118  @Override // NameNodeMXBean
6119  public int getThreads() {
6120    return ManagementFactory.getThreadMXBean().getThreadCount();
6121  }
6122
6123  /**
6124   * Returned information is a JSON representation of map with host name as the
6125   * key and value is a map of live node attribute keys to its values
6126   */
6127  @Override // NameNodeMXBean
6128  public String getLiveNodes() {
6129    final Map<String, Map<String,Object>> info = 
6130      new HashMap<String, Map<String,Object>>();
6131    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
6132    blockManager.getDatanodeManager().fetchDatanodes(live, null, false);
6133    for (DatanodeDescriptor node : live) {
6134      ImmutableMap.Builder<String, Object> innerinfo =
6135          ImmutableMap.<String,Object>builder();
6136      innerinfo
6137          .put("infoAddr", node.getInfoAddr())
6138          .put("infoSecureAddr", node.getInfoSecureAddr())
6139          .put("xferaddr", node.getXferAddr())
6140          .put("lastContact", getLastContact(node))
6141          .put("usedSpace", getDfsUsed(node))
6142          .put("adminState", node.getAdminState().toString())
6143          .put("nonDfsUsedSpace", node.getNonDfsUsed())
6144          .put("capacity", node.getCapacity())
6145          .put("numBlocks", node.numBlocks())
6146          .put("version", node.getSoftwareVersion())
6147          .put("used", node.getDfsUsed())
6148          .put("remaining", node.getRemaining())
6149          .put("blockScheduled", node.getBlocksScheduled())
6150          .put("blockPoolUsed", node.getBlockPoolUsed())
6151          .put("blockPoolUsedPercent", node.getBlockPoolUsedPercent())
6152          .put("volfails", node.getVolumeFailures());
6153      VolumeFailureSummary volumeFailureSummary = node.getVolumeFailureSummary();
6154      if (volumeFailureSummary != null) {
6155        innerinfo
6156            .put("failedStorageLocations",
6157                volumeFailureSummary.getFailedStorageLocations())
6158            .put("lastVolumeFailureDate",
6159                volumeFailureSummary.getLastVolumeFailureDate())
6160            .put("estimatedCapacityLostTotal",
6161                volumeFailureSummary.getEstimatedCapacityLostTotal());
6162      }
6163      if (node.getUpgradeDomain() != null) {
6164        innerinfo.put("upgradeDomain", node.getUpgradeDomain());
6165      }
6166      info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo.build());
6167    }
6168    return JSON.toString(info);
6169  }
6170
6171  /**
6172   * Returned information is a JSON representation of map with host name as the
6173   * key and value is a map of dead node attribute keys to its values
6174   */
6175  @Override // NameNodeMXBean
6176  public String getDeadNodes() {
6177    final Map<String, Map<String, Object>> info = 
6178      new HashMap<String, Map<String, Object>>();
6179    final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
6180    blockManager.getDatanodeManager().fetchDatanodes(null, dead, false);
6181    for (DatanodeDescriptor node : dead) {
6182      Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder()
6183          .put("lastContact", getLastContact(node))
6184          .put("decommissioned", node.isDecommissioned())
6185          .put("xferaddr", node.getXferAddr())
6186          .build();
6187      info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo);
6188    }
6189    return JSON.toString(info);
6190  }
6191
6192  /**
6193   * Returned information is a JSON representation of map with host name as the
6194   * key and value is a map of decommissioning node attribute keys to its
6195   * values
6196   */
6197  @Override // NameNodeMXBean
6198  public String getDecomNodes() {
6199    final Map<String, Map<String, Object>> info = 
6200      new HashMap<String, Map<String, Object>>();
6201    final List<DatanodeDescriptor> decomNodeList = blockManager.getDatanodeManager(
6202        ).getDecommissioningNodes();
6203    for (DatanodeDescriptor node : decomNodeList) {
6204      Map<String, Object> innerinfo = ImmutableMap
6205          .<String, Object> builder()
6206          .put("xferaddr", node.getXferAddr())
6207          .put("underReplicatedBlocks",
6208              node.decommissioningStatus.getUnderReplicatedBlocks())
6209          .put("decommissionOnlyReplicas",
6210              node.decommissioningStatus.getDecommissionOnlyReplicas())
6211          .put("underReplicateInOpenFiles",
6212              node.decommissioningStatus.getUnderReplicatedInOpenFiles())
6213          .build();
6214      info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo);
6215    }
6216    return JSON.toString(info);
6217  }
6218
6219  private long getLastContact(DatanodeDescriptor alivenode) {
6220    return (monotonicNow() - alivenode.getLastUpdateMonotonic())/1000;
6221  }
6222
6223  private long getDfsUsed(DatanodeDescriptor alivenode) {
6224    return alivenode.getDfsUsed();
6225  }
6226
6227  @Override  // NameNodeMXBean
6228  public String getClusterId() {
6229    return getFSImage().getStorage().getClusterID();
6230  }
6231  
6232  @Override  // NameNodeMXBean
6233  public String getBlockPoolId() {
6234    return blockPoolId;
6235  }
6236  
6237  @Override  // NameNodeMXBean
6238  public String getNameDirStatuses() {
6239    Map<String, Map<File, StorageDirType>> statusMap =
6240      new HashMap<String, Map<File, StorageDirType>>();
6241    
6242    Map<File, StorageDirType> activeDirs = new HashMap<File, StorageDirType>();
6243    for (Iterator<StorageDirectory> it
6244        = getFSImage().getStorage().dirIterator(); it.hasNext();) {
6245      StorageDirectory st = it.next();
6246      activeDirs.put(st.getRoot(), st.getStorageDirType());
6247    }
6248    statusMap.put("active", activeDirs);
6249    
6250    List<Storage.StorageDirectory> removedStorageDirs
6251        = getFSImage().getStorage().getRemovedStorageDirs();
6252    Map<File, StorageDirType> failedDirs = new HashMap<File, StorageDirType>();
6253    for (StorageDirectory st : removedStorageDirs) {
6254      failedDirs.put(st.getRoot(), st.getStorageDirType());
6255    }
6256    statusMap.put("failed", failedDirs);
6257    
6258    return JSON.toString(statusMap);
6259  }
6260
6261  @Override // NameNodeMXBean
6262  public String getNodeUsage() {
6263    float median = 0;
6264    float max = 0;
6265    float min = 0;
6266    float dev = 0;
6267
6268    final Map<String, Map<String,Object>> info =
6269        new HashMap<String, Map<String,Object>>();
6270    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
6271    blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
6272    for (Iterator<DatanodeDescriptor> it = live.iterator(); it.hasNext();) {
6273      DatanodeDescriptor node = it.next();
6274      if (node.isDecommissionInProgress() || node.isDecommissioned()) {
6275        it.remove();
6276      }
6277    }
6278
6279    if (live.size() > 0) {
6280      float totalDfsUsed = 0;
6281      float[] usages = new float[live.size()];
6282      int i = 0;
6283      for (DatanodeDescriptor dn : live) {
6284        usages[i++] = dn.getDfsUsedPercent();
6285        totalDfsUsed += dn.getDfsUsedPercent();
6286      }
6287      totalDfsUsed /= live.size();
6288      Arrays.sort(usages);
6289      median = usages[usages.length / 2];
6290      max = usages[usages.length - 1];
6291      min = usages[0];
6292
6293      for (i = 0; i < usages.length; i++) {
6294        dev += (usages[i] - totalDfsUsed) * (usages[i] - totalDfsUsed);
6295      }
6296      dev = (float) Math.sqrt(dev / usages.length);
6297    }
6298
6299    final Map<String, Object> innerInfo = new HashMap<String, Object>();
6300    innerInfo.put("min", StringUtils.format("%.2f%%", min));
6301    innerInfo.put("median", StringUtils.format("%.2f%%", median));
6302    innerInfo.put("max", StringUtils.format("%.2f%%", max));
6303    innerInfo.put("stdDev", StringUtils.format("%.2f%%", dev));
6304    info.put("nodeUsage", innerInfo);
6305
6306    return JSON.toString(info);
6307  }
6308
6309  @Override  // NameNodeMXBean
6310  public String getNameJournalStatus() {
6311    List<Map<String, String>> jasList = new ArrayList<Map<String, String>>();
6312    FSEditLog log = getFSImage().getEditLog();
6313    if (log != null) {
6314      // This flag can be false because we cannot hold a lock of FSEditLog
6315      // for metrics.
6316      boolean openForWrite = log.isOpenForWriteWithoutLock();
6317      for (JournalAndStream jas : log.getJournals()) {
6318        final Map<String, String> jasMap = new HashMap<String, String>();
6319        String manager = jas.getManager().toString();
6320
6321        jasMap.put("required", String.valueOf(jas.isRequired()));
6322        jasMap.put("disabled", String.valueOf(jas.isDisabled()));
6323        jasMap.put("manager", manager);
6324
6325        if (jas.isDisabled()) {
6326          jasMap.put("stream", "Failed");
6327        } else if (openForWrite) {
6328          EditLogOutputStream elos = jas.getCurrentStream();
6329          if (elos != null) {
6330            jasMap.put("stream", elos.generateReport());
6331          } else {
6332            jasMap.put("stream", "not currently writing");
6333          }
6334        } else {
6335          jasMap.put("stream", "open for read");
6336        }
6337        jasList.add(jasMap);
6338      }
6339    }
6340    return JSON.toString(jasList);
6341  }
6342
6343  @Override // NameNodeMxBean
6344  public String getJournalTransactionInfo() {
6345    Map<String, String> txnIdMap = new HashMap<String, String>();
6346    txnIdMap.put("LastAppliedOrWrittenTxId",
6347        Long.toString(this.getFSImage().getLastAppliedOrWrittenTxId()));
6348    txnIdMap.put("MostRecentCheckpointTxId",
6349        Long.toString(this.getFSImage().getMostRecentCheckpointTxId()));
6350    return JSON.toString(txnIdMap);
6351  }
6352  
6353  /** @deprecated Use {@link #getNNStartedTimeInMillis()} instead. */
6354  @Override  // NameNodeMXBean
6355  @Deprecated
6356  public String getNNStarted() {
6357    return getStartTime().toString();
6358  }
6359
6360  @Override // NameNodeMXBean
6361  public long getNNStartedTimeInMillis() {
6362    return startTime;
6363  }
6364
6365  @Override  // NameNodeMXBean
6366  public String getCompileInfo() {
6367    return VersionInfo.getDate() + " by " + VersionInfo.getUser() +
6368        " from " + VersionInfo.getBranch();
6369  }
6370
6371  /** @return the block manager. */
6372  public BlockManager getBlockManager() {
6373    return blockManager;
6374  }
6375
6376  public BlockIdManager getBlockIdManager() {
6377    return blockIdManager;
6378  }
6379
6380  /** @return the FSDirectory. */
6381  @Override
6382  public FSDirectory getFSDirectory() {
6383    return dir;
6384  }
6385  /** Set the FSDirectory. */
6386  @VisibleForTesting
6387  public void setFSDirectory(FSDirectory dir) {
6388    this.dir = dir;
6389  }
6390  /** @return the cache manager. */
6391  @Override
6392  public CacheManager getCacheManager() {
6393    return cacheManager;
6394  }
6395
6396  @Override
6397  public HAContext getHAContext() {
6398    return haContext;
6399  }
6400
6401  @Override  // NameNodeMXBean
6402  public String getCorruptFiles() {
6403    List<String> list = new ArrayList<String>();
6404    Collection<FSNamesystem.CorruptFileBlockInfo> corruptFileBlocks;
6405    try {
6406      corruptFileBlocks = listCorruptFileBlocks("/", null);
6407      int corruptFileCount = corruptFileBlocks.size();
6408      if (corruptFileCount != 0) {
6409        for (FSNamesystem.CorruptFileBlockInfo c : corruptFileBlocks) {
6410          list.add(c.toString());
6411        }
6412      }
6413    } catch (StandbyException e) {
6414      if (LOG.isDebugEnabled()) {
6415        LOG.debug("Get corrupt file blocks returned error: " + e.getMessage());
6416      }
6417    } catch (IOException e) {
6418      LOG.warn("Get corrupt file blocks returned error: " + e.getMessage());
6419    }
6420    return JSON.toString(list);
6421  }
6422
6423  @Override  // NameNodeMXBean
6424  public long getNumberOfSnapshottableDirs() {
6425    return snapshotManager.getNumSnapshottableDirs();
6426  }
6427
6428  /**
6429   * Get the list of corrupt blocks and corresponding full file path
6430   * including snapshots in given snapshottable directories.
6431   * @param path Restrict corrupt files to this portion of namespace.
6432   * @param snapshottableDirs Snapshottable directories. Passing in null
6433   *                          will only return corrupt blocks in non-snapshots.
6434   * @param cookieTab Support for continuation; cookieTab tells where
6435   *                  to start from.
6436   * @return a list in which each entry describes a corrupt file/block
6437   * @throws IOException
6438   */
6439  List<String> listCorruptFileBlocksWithSnapshot(String path,
6440      List<String> snapshottableDirs, String[] cookieTab) throws IOException {
6441    final Collection<CorruptFileBlockInfo> corruptFileBlocks =
6442        listCorruptFileBlocks(path, cookieTab);
6443    List<String> list = new ArrayList<String>();
6444
6445    // Precalculate snapshottableFeature list
6446    List<DirectorySnapshottableFeature> lsf = new ArrayList<>();
6447    if (snapshottableDirs != null) {
6448      for (String snap : snapshottableDirs) {
6449        final INode isnap = getFSDirectory().getINode(snap, DirOp.READ_LINK);
6450        final DirectorySnapshottableFeature sf =
6451            isnap.asDirectory().getDirectorySnapshottableFeature();
6452        if (sf == null) {
6453          throw new SnapshotException(
6454              "Directory is not a snapshottable directory: " + snap);
6455        }
6456        lsf.add(sf);
6457      }
6458    }
6459
6460    for (CorruptFileBlockInfo c : corruptFileBlocks) {
6461      if (getFileInfo(c.path, true) != null) {
6462        list.add(c.toString());
6463      }
6464      final Collection<String> snaps = FSDirSnapshotOp
6465          .getSnapshotFiles(getFSDirectory(), lsf, c.path);
6466      if (snaps != null) {
6467        for (String snap : snaps) {
6468          // follow the syntax of CorruptFileBlockInfo#toString()
6469          list.add(c.block.getBlockName() + "\t" + snap);
6470        }
6471      }
6472    }
6473    return list;
6474  }
6475
6476  @Override  //NameNodeMXBean
6477  public int getDistinctVersionCount() {
6478    return blockManager.getDatanodeManager().getDatanodesSoftwareVersions()
6479      .size();
6480  }
6481
6482  @Override  //NameNodeMXBean
6483  public Map<String, Integer> getDistinctVersions() {
6484    return blockManager.getDatanodeManager().getDatanodesSoftwareVersions();
6485  }
6486
6487  @Override  //NameNodeMXBean
6488  public String getSoftwareVersion() {
6489    return VersionInfo.getVersion();
6490  }
6491
6492  @Override // NameNodeStatusMXBean
6493  public String getNameDirSize() {
6494    return getFSImage().getStorage().getNNDirectorySize();
6495  }
6496
6497  /**
6498   * Verifies that the given identifier and password are valid and match.
6499   * @param identifier Token identifier.
6500   * @param password Password in the token.
6501   */
6502  public synchronized void verifyToken(DelegationTokenIdentifier identifier,
6503      byte[] password) throws InvalidToken, RetriableException {
6504    try {
6505      getDelegationTokenSecretManager().verifyToken(identifier, password);
6506    } catch (InvalidToken it) {
6507      if (inTransitionToActive()) {
6508        throw new RetriableException(it);
6509      }
6510      throw it;
6511    }
6512  }
6513  
6514  @Override
6515  public boolean isGenStampInFuture(Block block) {
6516    return blockIdManager.isGenStampInFuture(block);
6517  }
6518
6519  @VisibleForTesting
6520  public EditLogTailer getEditLogTailer() {
6521    return editLogTailer;
6522  }
6523  
6524  @VisibleForTesting
6525  public void setEditLogTailerForTests(EditLogTailer tailer) {
6526    this.editLogTailer = tailer;
6527  }
6528  
6529  @VisibleForTesting
6530  void setFsLockForTests(ReentrantReadWriteLock lock) {
6531    this.fsLock.coarseLock = lock;
6532  }
6533  
6534  @VisibleForTesting
6535  public ReentrantReadWriteLock getFsLockForTests() {
6536    return fsLock.coarseLock;
6537  }
6538  
6539  @VisibleForTesting
6540  public ReentrantLock getCpLockForTests() {
6541    return cpLock;
6542  }
6543
6544  @VisibleForTesting
6545  public SafeModeInfo getSafeModeInfoForTests() {
6546    return safeMode;
6547  }
6548  
6549  @VisibleForTesting
6550  public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) {
6551    this.nnResourceChecker = nnResourceChecker;
6552  }
6553
6554  public SnapshotManager getSnapshotManager() {
6555    return snapshotManager;
6556  }
6557  
6558  /** Allow snapshot on a directory. */
6559  void allowSnapshot(String path) throws IOException {
6560    checkOperation(OperationCategory.WRITE);
6561    final String operationName = "allowSnapshot";
6562    boolean success = false;
6563    writeLock();
6564    try {
6565      checkOperation(OperationCategory.WRITE);
6566      checkNameNodeSafeMode("Cannot allow snapshot for " + path);
6567      checkSuperuserPrivilege();
6568      FSDirSnapshotOp.allowSnapshot(dir, snapshotManager, path);
6569      success = true;
6570    } finally {
6571      writeUnlock(operationName);
6572    }
6573    getEditLog().logSync();
6574    logAuditEvent(success, operationName, path, null, null);
6575  }
6576  
6577  /** Disallow snapshot on a directory. */
6578  void disallowSnapshot(String path) throws IOException {
6579    checkOperation(OperationCategory.WRITE);
6580    final String operationName = "disallowSnapshot";
6581    boolean success = false;
6582    writeLock();
6583    try {
6584      checkOperation(OperationCategory.WRITE);
6585      checkNameNodeSafeMode("Cannot disallow snapshot for " + path);
6586      checkSuperuserPrivilege();
6587      FSDirSnapshotOp.disallowSnapshot(dir, snapshotManager, path);
6588      success = true;
6589    } finally {
6590      writeUnlock(operationName);
6591    }
6592    getEditLog().logSync();
6593    logAuditEvent(success, operationName, path, null, null);
6594  }
6595  
6596  /**
6597   * Create a snapshot
6598   * @param snapshotRoot The directory path where the snapshot is taken
6599   * @param snapshotName The name of the snapshot
6600   */
6601  String createSnapshot(String snapshotRoot, String snapshotName,
6602                        boolean logRetryCache) throws IOException {
6603    final String operationName = "createSnapshot";
6604    String snapshotPath = null;
6605    writeLock();
6606    try {
6607      checkOperation(OperationCategory.WRITE);
6608      checkNameNodeSafeMode("Cannot create snapshot for " + snapshotRoot);
6609      snapshotPath = FSDirSnapshotOp.createSnapshot(dir,
6610          snapshotManager, snapshotRoot, snapshotName, logRetryCache);
6611    } finally {
6612      writeUnlock(operationName);
6613    }
6614    getEditLog().logSync();
6615    logAuditEvent(snapshotPath != null, operationName, snapshotRoot,
6616        snapshotPath, null);
6617    return snapshotPath;
6618  }
6619  
6620  /**
6621   * Rename a snapshot
6622   * @param path The directory path where the snapshot was taken
6623   * @param snapshotOldName Old snapshot name
6624   * @param snapshotNewName New snapshot name
6625   * @throws SafeModeException
6626   * @throws IOException 
6627   */
6628  void renameSnapshot(
6629      String path, String snapshotOldName, String snapshotNewName,
6630      boolean logRetryCache) throws IOException {
6631    final String operationName = "renameSnapshot";
6632    boolean success = false;
6633    writeLock();
6634    try {
6635      checkOperation(OperationCategory.WRITE);
6636      checkNameNodeSafeMode("Cannot rename snapshot for " + path);
6637      FSDirSnapshotOp.renameSnapshot(dir, snapshotManager, path,
6638          snapshotOldName, snapshotNewName, logRetryCache);
6639      success = true;
6640    } finally {
6641      writeUnlock(operationName);
6642    }
6643    getEditLog().logSync();
6644    String oldSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotOldName);
6645    String newSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotNewName);
6646    logAuditEvent(success, operationName, oldSnapshotRoot,
6647        newSnapshotRoot, null);
6648  }
6649  
6650  /**
6651   * Get the list of snapshottable directories that are owned 
6652   * by the current user. Return all the snapshottable directories if the 
6653   * current user is a super user.
6654   * @return The list of all the current snapshottable directories
6655   * @throws IOException
6656   */
6657  public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
6658      throws IOException {
6659    final String operationName = "listSnapshottableDirectory";
6660    SnapshottableDirectoryStatus[] status = null;
6661    checkOperation(OperationCategory.READ);
6662    boolean success = false;
6663    readLock();
6664    try {
6665      checkOperation(OperationCategory.READ);
6666      status = FSDirSnapshotOp.getSnapshottableDirListing(dir, snapshotManager);
6667      success = true;
6668    } finally {
6669      readUnlock(operationName);
6670    }
6671    logAuditEvent(success, operationName, null, null, null);
6672    return status;
6673  }
6674  
6675  /**
6676   * Get the difference between two snapshots (or between a snapshot and the
6677   * current status) of a snapshottable directory.
6678   * 
6679   * @param path The full path of the snapshottable directory.
6680   * @param fromSnapshot Name of the snapshot to calculate the diff from. Null
6681   *          or empty string indicates the current tree.
6682   * @param toSnapshot Name of the snapshot to calculated the diff to. Null or
6683   *          empty string indicates the current tree.
6684   * @return A report about the difference between {@code fromSnapshot} and 
6685   *         {@code toSnapshot}. Modified/deleted/created/renamed files and 
6686   *         directories belonging to the snapshottable directories are listed 
6687   *         and labeled as M/-/+/R respectively. 
6688   * @throws IOException
6689   */
6690  SnapshotDiffReport getSnapshotDiffReport(String path,
6691      String fromSnapshot, String toSnapshot) throws IOException {
6692    final String operationName = "computeSnapshotDiff";
6693    SnapshotDiffReport diffs = null;
6694    checkOperation(OperationCategory.READ);
6695    readLock();
6696    try {
6697      checkOperation(OperationCategory.READ);
6698      diffs = FSDirSnapshotOp.getSnapshotDiffReport(dir, snapshotManager,
6699          path, fromSnapshot, toSnapshot);
6700    } finally {
6701      readUnlock(operationName);
6702    }
6703    String fromSnapshotRoot = (fromSnapshot == null || fromSnapshot.isEmpty()) ?
6704        path : Snapshot.getSnapshotPath(path, fromSnapshot);
6705    String toSnapshotRoot = (toSnapshot == null || toSnapshot.isEmpty()) ?
6706        path : Snapshot.getSnapshotPath(path, toSnapshot);
6707    logAuditEvent(diffs != null, operationName, fromSnapshotRoot,
6708        toSnapshotRoot, null);
6709    return diffs;
6710  }
6711  
6712  /**
6713   * Delete a snapshot of a snapshottable directory
6714   * @param snapshotRoot The snapshottable directory
6715   * @param snapshotName The name of the to-be-deleted snapshot
6716   * @throws SafeModeException
6717   * @throws IOException
6718   */
6719  void deleteSnapshot(String snapshotRoot, String snapshotName,
6720      boolean logRetryCache) throws IOException {
6721    final String operationName = "deleteSnapshot";
6722    boolean success = false;
6723    writeLock();
6724    BlocksMapUpdateInfo blocksToBeDeleted = null;
6725    try {
6726      checkOperation(OperationCategory.WRITE);
6727      checkNameNodeSafeMode("Cannot delete snapshot for " + snapshotRoot);
6728
6729      blocksToBeDeleted = FSDirSnapshotOp.deleteSnapshot(dir, snapshotManager,
6730          snapshotRoot, snapshotName, logRetryCache);
6731      success = true;
6732    } finally {
6733      writeUnlock(operationName);
6734    }
6735    getEditLog().logSync();
6736
6737    // Breaking the pattern as removing blocks have to happen outside of the
6738    // global lock
6739    if (blocksToBeDeleted != null) {
6740      removeBlocks(blocksToBeDeleted);
6741    }
6742
6743    String rootPath = Snapshot.getSnapshotPath(snapshotRoot, snapshotName);
6744    logAuditEvent(success, operationName, rootPath, null, null);
6745  }
6746
6747  /**
6748   * Remove a list of INodeDirectorySnapshottable from the SnapshotManager
6749   * @param toRemove the list of INodeDirectorySnapshottable to be removed
6750   */
6751  void removeSnapshottableDirs(List<INodeDirectory> toRemove) {
6752    if (snapshotManager != null) {
6753      snapshotManager.removeSnapshottable(toRemove);
6754    }
6755  }
6756
6757  RollingUpgradeInfo queryRollingUpgrade() throws IOException {
6758    checkSuperuserPrivilege();
6759    checkOperation(OperationCategory.READ);
6760    readLock();
6761    try {
6762      if (!isRollingUpgrade()) {
6763        return null;
6764      }
6765      Preconditions.checkNotNull(rollingUpgradeInfo);
6766      boolean hasRollbackImage = this.getFSImage().hasRollbackFSImage();
6767      rollingUpgradeInfo.setCreatedRollbackImages(hasRollbackImage);
6768      return rollingUpgradeInfo;
6769    } finally {
6770      readUnlock("queryRollingUpgrade");
6771    }
6772  }
6773
6774  RollingUpgradeInfo startRollingUpgrade() throws IOException {
6775    final String operationName = "startRollingUpgrade";
6776    checkSuperuserPrivilege();
6777    checkOperation(OperationCategory.WRITE);
6778    writeLock();
6779    try {
6780      checkOperation(OperationCategory.WRITE);
6781      if (isRollingUpgrade()) {
6782        return rollingUpgradeInfo;
6783      }
6784      long startTime = now();
6785      if (!haEnabled) { // for non-HA, we require NN to be in safemode
6786        startRollingUpgradeInternalForNonHA(startTime);
6787      } else { // for HA, NN cannot be in safemode
6788        checkNameNodeSafeMode("Failed to start rolling upgrade");
6789        startRollingUpgradeInternal(startTime);
6790      }
6791
6792      getEditLog().logStartRollingUpgrade(rollingUpgradeInfo.getStartTime());
6793      if (haEnabled) {
6794        // roll the edit log to make sure the standby NameNode can tail
6795        getFSImage().rollEditLog(getEffectiveLayoutVersion());
6796      }
6797    } finally {
6798      writeUnlock(operationName);
6799    }
6800
6801    getEditLog().logSync();
6802    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
6803      logAuditEvent(true, operationName, null, null, null);
6804    }
6805    return rollingUpgradeInfo;
6806  }
6807
6808  /**
6809   * Update internal state to indicate that a rolling upgrade is in progress.
6810   * @param startTime rolling upgrade start time
6811   */
6812  void startRollingUpgradeInternal(long startTime)
6813      throws IOException {
6814    checkRollingUpgrade("start rolling upgrade");
6815    getFSImage().checkUpgrade();
6816    setRollingUpgradeInfo(false, startTime);
6817  }
6818
6819  /**
6820   * Update internal state to indicate that a rolling upgrade is in progress for
6821   * non-HA setup. This requires the namesystem is in SafeMode and after doing a
6822   * checkpoint for rollback the namesystem will quit the safemode automatically 
6823   */
6824  private void startRollingUpgradeInternalForNonHA(long startTime)
6825      throws IOException {
6826    Preconditions.checkState(!haEnabled);
6827    if (!isInSafeMode()) {
6828      throw new IOException("Safe mode should be turned ON "
6829          + "in order to create namespace image.");
6830    }
6831    checkRollingUpgrade("start rolling upgrade");
6832    getFSImage().checkUpgrade();
6833    // in non-HA setup, we do an extra checkpoint to generate a rollback image
6834    getFSImage().saveNamespace(this, NameNodeFile.IMAGE_ROLLBACK, null);
6835    LOG.info("Successfully saved namespace for preparing rolling upgrade.");
6836
6837    // leave SafeMode automatically
6838    setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
6839    setRollingUpgradeInfo(true, startTime);
6840  }
6841
6842  void setRollingUpgradeInfo(boolean createdRollbackImages, long startTime) {
6843    rollingUpgradeInfo = new RollingUpgradeInfo(blockPoolId,
6844        createdRollbackImages, startTime, 0L);
6845  }
6846
6847  public void setCreatedRollbackImages(boolean created) {
6848    if (rollingUpgradeInfo != null) {
6849      rollingUpgradeInfo.setCreatedRollbackImages(created);
6850    }
6851  }
6852
6853  public RollingUpgradeInfo getRollingUpgradeInfo() {
6854    return rollingUpgradeInfo;
6855  }
6856
6857  public boolean isNeedRollbackFsImage() {
6858    return needRollbackFsImage;
6859  }
6860
6861  public void setNeedRollbackFsImage(boolean needRollbackFsImage) {
6862    this.needRollbackFsImage = needRollbackFsImage;
6863  }
6864
6865  @Override  // NameNodeMXBean
6866  public RollingUpgradeInfo.Bean getRollingUpgradeStatus() {
6867    if (!isRollingUpgrade()) {
6868      return null;
6869    }
6870    RollingUpgradeInfo upgradeInfo = getRollingUpgradeInfo();
6871    if (upgradeInfo.createdRollbackImages()) {
6872      return new RollingUpgradeInfo.Bean(upgradeInfo);
6873    }
6874    readLock();
6875    try {
6876      // check again after acquiring the read lock.
6877      upgradeInfo = getRollingUpgradeInfo();
6878      if (upgradeInfo == null) {
6879        return null;
6880      }
6881      if (!upgradeInfo.createdRollbackImages()) {
6882        boolean hasRollbackImage = this.getFSImage().hasRollbackFSImage();
6883        upgradeInfo.setCreatedRollbackImages(hasRollbackImage);
6884      }
6885    } catch (IOException ioe) {
6886      LOG.warn("Encountered exception setting Rollback Image", ioe);
6887    } finally {
6888      readUnlock("getRollingUpgradeStatus");
6889    }
6890    return new RollingUpgradeInfo.Bean(upgradeInfo);
6891  }
6892
6893  /** Is rolling upgrade in progress? */
6894  public boolean isRollingUpgrade() {
6895    return rollingUpgradeInfo != null && !rollingUpgradeInfo.isFinalized();
6896  }
6897
6898  /**
6899   * Returns the layout version in effect.  Under normal operation, this is the
6900   * same as the software's current layout version, defined in
6901   * {@link NameNodeLayoutVersion#CURRENT_LAYOUT_VERSION}.  During a rolling
6902   * upgrade, this can retain the layout version that was persisted to metadata
6903   * prior to starting the rolling upgrade, back to a lower bound defined in
6904   * {@link NameNodeLayoutVersion#MINIMUM_COMPATIBLE_LAYOUT_VERSION}.  New
6905   * fsimage files and edit log segments will continue to be written with this
6906   * older layout version, so that the files are still readable by the old
6907   * software version if the admin chooses to downgrade.
6908   *
6909   * @return layout version in effect
6910   */
6911  public int getEffectiveLayoutVersion() {
6912    return getEffectiveLayoutVersion(isRollingUpgrade(),
6913        fsImage.getStorage().getLayoutVersion(),
6914        NameNodeLayoutVersion.MINIMUM_COMPATIBLE_LAYOUT_VERSION,
6915        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
6916  }
6917
6918  @VisibleForTesting
6919  static int getEffectiveLayoutVersion(boolean isRollingUpgrade, int storageLV,
6920      int minCompatLV, int currentLV) {
6921    if (isRollingUpgrade) {
6922      if (storageLV <= minCompatLV) {
6923        // The prior layout version satisfies the minimum compatible layout
6924        // version of the current software.  Keep reporting the prior layout
6925        // as the effective one.  Downgrade is possible.
6926        return storageLV;
6927      }
6928    }
6929    // The current software cannot satisfy the layout version of the prior
6930    // software.  Proceed with using the current layout version.
6931    return currentLV;
6932  }
6933
6934  /**
6935   * Performs a pre-condition check that the layout version in effect is
6936   * sufficient to support the requested {@link Feature}.  If not, then the
6937   * method throws {@link HadoopIllegalArgumentException} to deny the operation.
6938   * This exception class is registered as a terse exception, so it prevents
6939   * verbose stack traces in the NameNode log.  During a rolling upgrade, this
6940   * method is used to restrict usage of new features.  This prevents writing
6941   * new edit log operations that would be unreadable by the old software
6942   * version if the admin chooses to downgrade.
6943   *
6944   * @param f feature to check
6945   * @throws HadoopIllegalArgumentException if the current layout version in
6946   *     effect is insufficient to support the feature
6947   */
6948  private void requireEffectiveLayoutVersionForFeature(Feature f)
6949      throws HadoopIllegalArgumentException {
6950    int lv = getEffectiveLayoutVersion();
6951    if (!NameNodeLayoutVersion.supports(f, lv)) {
6952      throw new HadoopIllegalArgumentException(String.format(
6953          "Feature %s unsupported at NameNode layout version %d.  If a " +
6954          "rolling upgrade is in progress, then it must be finalized before " +
6955          "using this feature.", f, lv));
6956    }
6957  }
6958
6959  void checkRollingUpgrade(String action) throws RollingUpgradeException {
6960    if (isRollingUpgrade()) {
6961      throw new RollingUpgradeException("Failed to " + action
6962          + " since a rolling upgrade is already in progress."
6963          + " Existing rolling upgrade info:\n" + rollingUpgradeInfo);
6964    }
6965  }
6966
6967  RollingUpgradeInfo finalizeRollingUpgrade() throws IOException {
6968    final String operationName = "finalizeRollingUpgrade";
6969    checkSuperuserPrivilege();
6970    checkOperation(OperationCategory.WRITE);
6971    writeLock();
6972    try {
6973      checkOperation(OperationCategory.WRITE);
6974      if (!isRollingUpgrade()) {
6975        return null;
6976      }
6977      checkNameNodeSafeMode("Failed to finalize rolling upgrade");
6978
6979      finalizeRollingUpgradeInternal(now());
6980      getEditLog().logFinalizeRollingUpgrade(rollingUpgradeInfo.getFinalizeTime());
6981      if (haEnabled) {
6982        // roll the edit log to make sure the standby NameNode can tail
6983        getFSImage().rollEditLog(getEffectiveLayoutVersion());
6984      }
6985      getFSImage().updateStorageVersion();
6986      getFSImage().renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK,
6987          NameNodeFile.IMAGE);
6988    } finally {
6989      writeUnlock(operationName);
6990    }
6991
6992    if (!haEnabled) {
6993      // Sync not needed for ha since the edit was rolled after logging.
6994      getEditLog().logSync();
6995    }
6996
6997    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
6998      logAuditEvent(true, operationName, null, null, null);
6999    }
7000    return rollingUpgradeInfo;
7001  }
7002
7003  void finalizeRollingUpgradeInternal(long finalizeTime) {
7004    // Set the finalize time
7005    rollingUpgradeInfo.finalize(finalizeTime);
7006  }
7007
7008  long addCacheDirective(CacheDirectiveInfo directive,
7009                         EnumSet<CacheFlag> flags, boolean logRetryCache)
7010      throws IOException {
7011    final String operationName = "addCacheDirective";
7012    CacheDirectiveInfo effectiveDirective = null;
7013    if (!flags.contains(CacheFlag.FORCE)) {
7014      cacheManager.waitForRescanIfNeeded();
7015    }
7016    writeLock();
7017    try {
7018      checkOperation(OperationCategory.WRITE);
7019      checkNameNodeSafeMode("Cannot add cache directive");
7020      effectiveDirective = FSNDNCacheOp.addCacheDirective(this, cacheManager,
7021          directive, flags, logRetryCache);
7022    } finally {
7023      writeUnlock(operationName);
7024      boolean success = effectiveDirective != null;
7025      if (success) {
7026        getEditLog().logSync();
7027      }
7028
7029      String effectiveDirectiveStr = effectiveDirective != null ?
7030          effectiveDirective.toString() : null;
7031      logAuditEvent(success, operationName, effectiveDirectiveStr,
7032          null, null);
7033    }
7034    return effectiveDirective != null ? effectiveDirective.getId() : 0;
7035  }
7036
7037  void modifyCacheDirective(CacheDirectiveInfo directive,
7038      EnumSet<CacheFlag> flags, boolean logRetryCache) throws IOException {
7039    final String operationName = "modifyCacheDirective";
7040    boolean success = false;
7041    if (!flags.contains(CacheFlag.FORCE)) {
7042      cacheManager.waitForRescanIfNeeded();
7043    }
7044    writeLock();
7045    try {
7046      checkOperation(OperationCategory.WRITE);
7047      checkNameNodeSafeMode("Cannot add cache directive");
7048      FSNDNCacheOp.modifyCacheDirective(this, cacheManager, directive, flags,
7049          logRetryCache);
7050      success = true;
7051    } finally {
7052      writeUnlock(operationName);
7053      if (success) {
7054        getEditLog().logSync();
7055      }
7056      final String idStr = "{id: " + directive.getId() + "}";
7057      logAuditEvent(success, operationName, idStr,
7058          directive.toString(), null);
7059    }
7060  }
7061
7062  void removeCacheDirective(long id, boolean logRetryCache) throws IOException {
7063    final String operationName = "removeCacheDirective";
7064    boolean success = false;
7065    writeLock();
7066    try {
7067      checkOperation(OperationCategory.WRITE);
7068      checkNameNodeSafeMode("Cannot remove cache directives");
7069      FSNDNCacheOp.removeCacheDirective(this, cacheManager, id, logRetryCache);
7070      success = true;
7071    } finally {
7072      writeUnlock(operationName);
7073      String idStr = "{id: " + Long.toString(id) + "}";
7074      logAuditEvent(success, operationName, idStr, null,
7075          null);
7076    }
7077    getEditLog().logSync();
7078  }
7079
7080  BatchedListEntries<CacheDirectiveEntry> listCacheDirectives(
7081      long startId, CacheDirectiveInfo filter) throws IOException {
7082    final String operationName = "listCacheDirectives";
7083    checkOperation(OperationCategory.READ);
7084    BatchedListEntries<CacheDirectiveEntry> results;
7085    cacheManager.waitForRescanIfNeeded();
7086    readLock();
7087    boolean success = false;
7088    try {
7089      checkOperation(OperationCategory.READ);
7090      results = FSNDNCacheOp.listCacheDirectives(this, cacheManager, startId,
7091          filter);
7092      success = true;
7093    } finally {
7094      readUnlock(operationName);
7095      logAuditEvent(success, operationName, filter.toString(), null,
7096          null);
7097    }
7098    return results;
7099  }
7100
7101  void addCachePool(CachePoolInfo req, boolean logRetryCache)
7102      throws IOException {
7103    final String operationName = "addCachePool";
7104    writeLock();
7105    boolean success = false;
7106    String poolInfoStr = null;
7107    try {
7108      checkOperation(OperationCategory.WRITE);
7109      checkNameNodeSafeMode("Cannot add cache pool"
7110          + (req == null ? null : req.getPoolName()));
7111      CachePoolInfo info = FSNDNCacheOp.addCachePool(this, cacheManager, req,
7112          logRetryCache);
7113      poolInfoStr = info.toString();
7114      success = true;
7115    } finally {
7116      writeUnlock(operationName);
7117      logAuditEvent(success, operationName, poolInfoStr, null, null);
7118    }
7119    
7120    getEditLog().logSync();
7121  }
7122
7123  void modifyCachePool(CachePoolInfo req, boolean logRetryCache)
7124      throws IOException {
7125    final String operationName = "modifyCachePool";
7126    writeLock();
7127    boolean success = false;
7128    try {
7129      checkOperation(OperationCategory.WRITE);
7130      checkNameNodeSafeMode("Cannot modify cache pool"
7131          + (req == null ? null : req.getPoolName()));
7132      FSNDNCacheOp.modifyCachePool(this, cacheManager, req, logRetryCache);
7133      success = true;
7134    } finally {
7135      writeUnlock(operationName);
7136      String poolNameStr = "{poolName: " +
7137          (req == null ? null : req.getPoolName()) + "}";
7138      logAuditEvent(success, operationName, poolNameStr,
7139                    req == null ? null : req.toString(), null);
7140    }
7141
7142    getEditLog().logSync();
7143  }
7144
7145  void removeCachePool(String cachePoolName, boolean logRetryCache)
7146      throws IOException {
7147    final String operationName = "removeCachePool";
7148    writeLock();
7149    boolean success = false;
7150    try {
7151      checkOperation(OperationCategory.WRITE);
7152      checkNameNodeSafeMode("Cannot modify cache pool" + cachePoolName);
7153      FSNDNCacheOp.removeCachePool(this, cacheManager, cachePoolName,
7154          logRetryCache);
7155      success = true;
7156    } finally {
7157      writeUnlock(operationName);
7158      String poolNameStr = "{poolName: " + cachePoolName + "}";
7159      logAuditEvent(success, operationName, poolNameStr, null, null);
7160    }
7161    
7162    getEditLog().logSync();
7163  }
7164
7165  BatchedListEntries<CachePoolEntry> listCachePools(String prevKey)
7166      throws IOException {
7167    final String operationName = "listCachePools";
7168    BatchedListEntries<CachePoolEntry> results;
7169    checkOperation(OperationCategory.READ);
7170    boolean success = false;
7171    cacheManager.waitForRescanIfNeeded();
7172    readLock();
7173    try {
7174      checkOperation(OperationCategory.READ);
7175      results = FSNDNCacheOp.listCachePools(this, cacheManager, prevKey);
7176      success = true;
7177    } finally {
7178      readUnlock(operationName);
7179      logAuditEvent(success, operationName, null, null, null);
7180    }
7181    return results;
7182  }
7183
7184  void modifyAclEntries(final String src, List<AclEntry> aclSpec)
7185      throws IOException {
7186    final String operationName = "modifyAclEntries";
7187    HdfsFileStatus auditStat = null;
7188    checkOperation(OperationCategory.WRITE);
7189    writeLock();
7190    try {
7191      checkOperation(OperationCategory.WRITE);
7192      checkNameNodeSafeMode("Cannot modify ACL entries on " + src);
7193      auditStat = FSDirAclOp.modifyAclEntries(dir, src, aclSpec);
7194    } catch (AccessControlException e) {
7195      logAuditEvent(false, operationName, src);
7196      throw e;
7197    } finally {
7198      writeUnlock(operationName);
7199    }
7200    getEditLog().logSync();
7201    logAuditEvent(true, operationName, src, null, auditStat);
7202  }
7203
7204  void removeAclEntries(final String src, List<AclEntry> aclSpec)
7205      throws IOException {
7206    final String operationName = "removeAclEntries";
7207    checkOperation(OperationCategory.WRITE);
7208    HdfsFileStatus auditStat = null;
7209    writeLock();
7210    try {
7211      checkOperation(OperationCategory.WRITE);
7212      checkNameNodeSafeMode("Cannot remove ACL entries on " + src);
7213      auditStat = FSDirAclOp.removeAclEntries(dir, src, aclSpec);
7214    } catch (AccessControlException e) {
7215      logAuditEvent(false, operationName, src);
7216      throw e;
7217    } finally {
7218      writeUnlock(operationName);
7219    }
7220    getEditLog().logSync();
7221    logAuditEvent(true, operationName, src, null, auditStat);
7222  }
7223
7224  void removeDefaultAcl(final String src) throws IOException {
7225    final String operationName = "removeDefaultAcl";
7226    HdfsFileStatus auditStat = null;
7227    checkOperation(OperationCategory.WRITE);
7228    writeLock();
7229    try {
7230      checkOperation(OperationCategory.WRITE);
7231      checkNameNodeSafeMode("Cannot remove default ACL entries on " + src);
7232      auditStat = FSDirAclOp.removeDefaultAcl(dir, src);
7233    } catch (AccessControlException e) {
7234      logAuditEvent(false, operationName, src);
7235      throw e;
7236    } finally {
7237      writeUnlock(operationName);
7238    }
7239    getEditLog().logSync();
7240    logAuditEvent(true, operationName, src, null, auditStat);
7241  }
7242
7243  void removeAcl(final String src) throws IOException {
7244    final String operationName = "removeAcl";
7245    HdfsFileStatus auditStat = null;
7246    checkOperation(OperationCategory.WRITE);
7247    writeLock();
7248    try {
7249      checkOperation(OperationCategory.WRITE);
7250      checkNameNodeSafeMode("Cannot remove ACL on " + src);
7251      auditStat = FSDirAclOp.removeAcl(dir, src);
7252    } catch (AccessControlException e) {
7253      logAuditEvent(false, operationName, src);
7254      throw e;
7255    } finally {
7256      writeUnlock(operationName);
7257    }
7258    getEditLog().logSync();
7259    logAuditEvent(true, operationName, src, null, auditStat);
7260  }
7261
7262  void setAcl(final String src, List<AclEntry> aclSpec) throws IOException {
7263    final String operationName = "setAcl";
7264    HdfsFileStatus auditStat = null;
7265    checkOperation(OperationCategory.WRITE);
7266    writeLock();
7267    try {
7268      checkOperation(OperationCategory.WRITE);
7269      checkNameNodeSafeMode("Cannot set ACL on " + src);
7270      auditStat = FSDirAclOp.setAcl(dir, src, aclSpec);
7271    } catch (AccessControlException e) {
7272      logAuditEvent(false, operationName, src);
7273      throw e;
7274    } finally {
7275      writeUnlock(operationName);
7276    }
7277    getEditLog().logSync();
7278    logAuditEvent(true, operationName, src, null, auditStat);
7279  }
7280
7281  AclStatus getAclStatus(String src) throws IOException {
7282    final String operationName = "getAclStatus";
7283    checkOperation(OperationCategory.READ);
7284    boolean success = false;
7285    readLock();
7286    try {
7287      checkOperation(OperationCategory.READ);
7288      final AclStatus ret = FSDirAclOp.getAclStatus(dir, src);
7289      success = true;
7290      return ret;
7291    } finally {
7292      readUnlock(operationName);
7293      logAuditEvent(success, operationName, src);
7294    }
7295  }
7296
7297  /**
7298   * Create an encryption zone on directory src using the specified key.
7299   *
7300   * @param src     the path of a directory which will be the root of the
7301   *                encryption zone. The directory must be empty.
7302   * @param keyName name of a key which must be present in the configured
7303   *                KeyProvider.
7304   * @throws AccessControlException  if the caller is not the superuser.
7305   * @throws UnresolvedLinkException if the path can't be resolved.
7306   * @throws SafeModeException       if the Namenode is in safe mode.
7307   */
7308  void createEncryptionZone(final String src, final String keyName,
7309      boolean logRetryCache) throws IOException, UnresolvedLinkException,
7310          SafeModeException, AccessControlException {
7311    final String operationName = "createEncryptionZone";
7312    try {
7313      Metadata metadata = FSDirEncryptionZoneOp.ensureKeyIsInitialized(dir,
7314          keyName, src);
7315      checkSuperuserPrivilege();
7316      FSPermissionChecker pc = getPermissionChecker();
7317      checkOperation(OperationCategory.WRITE);
7318      final HdfsFileStatus resultingStat;
7319      writeLock();
7320      try {
7321        checkSuperuserPrivilege();
7322        checkOperation(OperationCategory.WRITE);
7323        checkNameNodeSafeMode("Cannot create encryption zone on " + src);
7324        resultingStat = FSDirEncryptionZoneOp.createEncryptionZone(dir, src,
7325            pc, metadata.getCipher(), keyName, logRetryCache);
7326      } finally {
7327        writeUnlock(operationName);
7328      }
7329
7330      getEditLog().logSync();
7331      logAuditEvent(true, operationName, src, null, resultingStat);
7332    } catch (AccessControlException e) {
7333      logAuditEvent(false, operationName, src);
7334      throw e;
7335    }
7336  }
7337
7338  /**
7339   * Get the encryption zone for the specified path.
7340   *
7341   * @param srcArg the path of a file or directory to get the EZ for.
7342   * @return the EZ of the of the path or null if none.
7343   * @throws AccessControlException  if the caller is not the superuser.
7344   * @throws UnresolvedLinkException if the path can't be resolved.
7345   */
7346  EncryptionZone getEZForPath(final String srcArg)
7347    throws AccessControlException, UnresolvedLinkException, IOException {
7348    final String operationName = "getEZForPath";
7349    HdfsFileStatus resultingStat = null;
7350    boolean success = false;
7351    final FSPermissionChecker pc = getPermissionChecker();
7352    checkOperation(OperationCategory.READ);
7353    readLock();
7354    try {
7355      checkOperation(OperationCategory.READ);
7356      Entry<EncryptionZone, HdfsFileStatus> ezForPath = FSDirEncryptionZoneOp
7357          .getEZForPath(dir, srcArg, pc);
7358      success = true;
7359      resultingStat = ezForPath.getValue();
7360      return ezForPath.getKey();
7361    } finally {
7362      readUnlock(operationName);
7363      logAuditEvent(success, operationName, srcArg, null, resultingStat);
7364    }
7365  }
7366
7367  BatchedListEntries<EncryptionZone> listEncryptionZones(long prevId)
7368      throws IOException {
7369    final String operationName = "listEncryptionZones";
7370    boolean success = false;
7371    checkSuperuserPrivilege();
7372    checkOperation(OperationCategory.READ);
7373    readLock();
7374    try {
7375      checkSuperuserPrivilege();
7376      checkOperation(OperationCategory.READ);
7377      final BatchedListEntries<EncryptionZone> ret =
7378          FSDirEncryptionZoneOp.listEncryptionZones(dir, prevId);
7379      success = true;
7380      return ret;
7381    } finally {
7382      readUnlock(operationName);
7383      logAuditEvent(success, operationName, null);
7384    }
7385  }
7386
7387  void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
7388                boolean logRetryCache)
7389      throws IOException {
7390    final String operationName = "setXAttr";
7391    HdfsFileStatus auditStat = null;
7392    writeLock();
7393    try {
7394      checkOperation(OperationCategory.WRITE);
7395      checkNameNodeSafeMode("Cannot set XAttr on " + src);
7396      auditStat = FSDirXAttrOp.setXAttr(dir, src, xAttr, flag, logRetryCache);
7397    } catch (AccessControlException e) {
7398      logAuditEvent(false, operationName, src);
7399      throw e;
7400    } finally {
7401      writeUnlock(operationName);
7402    }
7403    getEditLog().logSync();
7404    logAuditEvent(true, operationName, src, null, auditStat);
7405  }
7406
7407  List<XAttr> getXAttrs(final String src, List<XAttr> xAttrs)
7408      throws IOException {
7409    final String operationName = "getXAttrs";
7410    checkOperation(OperationCategory.READ);
7411    readLock();
7412    try {
7413      checkOperation(OperationCategory.READ);
7414      return FSDirXAttrOp.getXAttrs(dir, src, xAttrs);
7415    } catch (AccessControlException e) {
7416      logAuditEvent(false, operationName, src);
7417      throw e;
7418    } finally {
7419      readUnlock(operationName);
7420    }
7421  }
7422
7423  List<XAttr> listXAttrs(String src) throws IOException {
7424    final String operationName = "listXAttrs";
7425    checkOperation(OperationCategory.READ);
7426    readLock();
7427    try {
7428      checkOperation(OperationCategory.READ);
7429      return FSDirXAttrOp.listXAttrs(dir, src);
7430    } catch (AccessControlException e) {
7431      logAuditEvent(false, operationName, src);
7432      throw e;
7433    } finally {
7434      readUnlock(operationName);
7435    }
7436  }
7437
7438  void removeXAttr(String src, XAttr xAttr, boolean logRetryCache)
7439      throws IOException {
7440    final String operationName = "removeXAttr";
7441    HdfsFileStatus auditStat = null;
7442    writeLock();
7443    try {
7444      checkOperation(OperationCategory.WRITE);
7445      checkNameNodeSafeMode("Cannot remove XAttr entry on " + src);
7446      auditStat = FSDirXAttrOp.removeXAttr(dir, src, xAttr, logRetryCache);
7447    } catch (AccessControlException e) {
7448      logAuditEvent(false, operationName, src);
7449      throw e;
7450    } finally {
7451      writeUnlock(operationName);
7452    }
7453    getEditLog().logSync();
7454    logAuditEvent(true, operationName, src, null, auditStat);
7455  }
7456
7457  void checkAccess(String src, FsAction mode) throws IOException {
7458    final String operationName = "checkAccess";
7459    checkOperation(OperationCategory.READ);
7460    FSPermissionChecker pc = getPermissionChecker();
7461    readLock();
7462    try {
7463      checkOperation(OperationCategory.READ);
7464      final INodesInPath iip = dir.resolvePath(pc, src, DirOp.READ);
7465      src = iip.getPath();
7466      INode inode = iip.getLastINode();
7467      if (inode == null) {
7468        throw new FileNotFoundException("Path not found");
7469      }
7470      if (isPermissionEnabled) {
7471        dir.checkPathAccess(pc, iip, mode);
7472      }
7473    } catch (AccessControlException e) {
7474      logAuditEvent(false, operationName, src);
7475      throw e;
7476    } finally {
7477      readUnlock(operationName);
7478    }
7479  }
7480
7481  /**
7482   * Default AuditLogger implementation; used when no access logger is
7483   * defined in the config file. It can also be explicitly listed in the
7484   * config file.
7485   */
7486  @VisibleForTesting
7487  static class DefaultAuditLogger extends HdfsAuditLogger {
7488    private static final ThreadLocal<StringBuilder> STRING_BUILDER =
7489        new ThreadLocal<StringBuilder>() {
7490          @Override
7491          protected StringBuilder initialValue() {
7492            return new StringBuilder();
7493          }
7494        };
7495
7496    private boolean isCallerContextEnabled;
7497    private int callerContextMaxLen;
7498    private int callerSignatureMaxLen;
7499
7500    private boolean logTokenTrackingId;
7501    private Set<String> debugCmdSet = new HashSet<String>();
7502
7503    @Override
7504    public void initialize(Configuration conf) {
7505      isCallerContextEnabled = conf.getBoolean(
7506          HADOOP_CALLER_CONTEXT_ENABLED_KEY,
7507          HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT);
7508      callerContextMaxLen = conf.getInt(
7509          HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY,
7510          HADOOP_CALLER_CONTEXT_MAX_SIZE_DEFAULT);
7511      callerSignatureMaxLen = conf.getInt(
7512          HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_KEY,
7513          HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_DEFAULT);
7514      logTokenTrackingId = conf.getBoolean(
7515          DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY,
7516          DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT);
7517
7518      debugCmdSet.addAll(Arrays.asList(conf.getTrimmedStrings(
7519          DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_DEBUG_CMDLIST)));
7520    }
7521
7522    @Override
7523    public void logAuditEvent(boolean succeeded, String userName,
7524        InetAddress addr, String cmd, String src, String dst,
7525        FileStatus status, CallerContext callerContext, UserGroupInformation ugi,
7526        DelegationTokenSecretManager dtSecretManager) {
7527
7528      if (auditLog.isDebugEnabled() ||
7529          (auditLog.isInfoEnabled() && !debugCmdSet.contains(cmd))) {
7530        final StringBuilder sb = STRING_BUILDER.get();
7531        src = escapeJava(src);
7532        dst = escapeJava(dst);
7533        sb.setLength(0);
7534        sb.append("allowed=").append(succeeded).append("\t");
7535        sb.append("ugi=").append(userName).append("\t");
7536        sb.append("ip=").append(addr).append("\t");
7537        sb.append("cmd=").append(cmd).append("\t");
7538        sb.append("src=").append(src).append("\t");
7539        sb.append("dst=").append(dst).append("\t");
7540        if (null == status) {
7541          sb.append("perm=null");
7542        } else {
7543          sb.append("perm=");
7544          sb.append(status.getOwner()).append(":");
7545          sb.append(status.getGroup()).append(":");
7546          sb.append(status.getPermission());
7547        }
7548        if (logTokenTrackingId) {
7549          sb.append("\t").append("trackingId=");
7550          String trackingId = null;
7551          if (ugi != null && dtSecretManager != null
7552              && ugi.getAuthenticationMethod() == AuthenticationMethod.TOKEN) {
7553            for (TokenIdentifier tid: ugi.getTokenIdentifiers()) {
7554              if (tid instanceof DelegationTokenIdentifier) {
7555                DelegationTokenIdentifier dtid =
7556                    (DelegationTokenIdentifier)tid;
7557                trackingId = dtSecretManager.getTokenTrackingId(dtid);
7558                break;
7559              }
7560            }
7561          }
7562          sb.append(trackingId);
7563        }
7564        sb.append("\t").append("proto=");
7565        sb.append(Server.getProtocol());
7566        if (isCallerContextEnabled &&
7567            callerContext != null &&
7568            callerContext.isContextValid()) {
7569          sb.append("\t").append("callerContext=");
7570          if (callerContext.getContext().length() > callerContextMaxLen) {
7571            sb.append(callerContext.getContext().substring(0,
7572                callerContextMaxLen));
7573          } else {
7574            sb.append(callerContext.getContext());
7575          }
7576          if (callerContext.getSignature() != null &&
7577              callerContext.getSignature().length > 0 &&
7578              callerContext.getSignature().length <= callerSignatureMaxLen) {
7579            sb.append(":");
7580            sb.append(new String(callerContext.getSignature(),
7581                CallerContext.SIGNATURE_ENCODING));
7582          }
7583        }
7584        logAuditMessage(sb.toString());
7585      }
7586    }
7587
7588    @Override
7589    public void logAuditEvent(boolean succeeded, String userName,
7590        InetAddress addr, String cmd, String src, String dst,
7591        FileStatus status, UserGroupInformation ugi,
7592        DelegationTokenSecretManager dtSecretManager) {
7593      this.logAuditEvent(succeeded, userName, addr, cmd, src, dst, status,
7594              null /*CallerContext*/, ugi, dtSecretManager);
7595    }
7596
7597    public void logAuditMessage(String message) {
7598      auditLog.info(message);
7599    }
7600  }
7601
7602  private static void enableAsyncAuditLog() {
7603    if (!(auditLog instanceof Log4JLogger)) {
7604      LOG.warn("Log4j is required to enable async auditlog");
7605      return;
7606    }
7607    Logger logger = ((Log4JLogger)auditLog).getLogger();
7608    @SuppressWarnings("unchecked")
7609    List<Appender> appenders = Collections.list(logger.getAllAppenders());
7610    // failsafe against trying to async it more than once
7611    if (!appenders.isEmpty() && !(appenders.get(0) instanceof AsyncAppender)) {
7612      AsyncAppender asyncAppender = new AsyncAppender();
7613      // change logger to have an async appender containing all the
7614      // previously configured appenders
7615      for (Appender appender : appenders) {
7616        logger.removeAppender(appender);
7617        asyncAppender.addAppender(appender);
7618      }
7619      logger.addAppender(asyncAppender);        
7620    }
7621  }
7622
7623  /**
7624   * Return total number of Sync Operations on FSEditLog.
7625   */
7626  @Override
7627  @Metric({"TotalSyncCount",
7628              "Total number of sync operations performed on edit logs"})
7629  public long getTotalSyncCount() {
7630    return fsImage.editLog.getTotalSyncCount();
7631  }
7632
7633  /**
7634   * Return total time spent doing sync operations on FSEditLog.
7635   */
7636  @Override
7637  @Metric({"TotalSyncTimes",
7638              "Total time spend in sync operation on various edit logs"})
7639  public String getTotalSyncTimes() {
7640    JournalSet journalSet = fsImage.editLog.getJournalSet();
7641    if (journalSet != null) {
7642      return journalSet.getSyncTimes();
7643    } else {
7644      return "";
7645    }
7646  }
7647
7648  /**
7649   * Gets number of bytes in the blocks in future generation stamps.
7650   *
7651   * @return number of bytes that can be deleted if exited from safe mode.
7652   */
7653  public long getBytesInFuture() {
7654    return blockManager.getBytesInFuture();
7655  }
7656
7657  @VisibleForTesting
7658  synchronized void enableSafeModeForTesting(Configuration conf) {
7659    SafeModeInfo newSafemode = new SafeModeInfo(conf);
7660    newSafemode.enter();
7661    this.safeMode = newSafemode;
7662  }
7663
7664}
7665