/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.MetricsLoggerTask;
import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgressMetrics;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.ipc.ExternalCall;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.tracing.TraceAdminProtocol;
import org.apache.hadoop.tracing.TraceUtils;
import org.apache.hadoop.tracing.TracerConfigurationManager;
import org.apache.hadoop.util.ExitUtil.ExitException;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.ObjectName;

import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PLUGINS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS;
import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;

/**********************************************************
 * NameNode serves as both directory namespace manager and
 * "inode table" for the Hadoop DFS.  There is a single NameNode
 * running in any DFS deployment.  (Well, except when there
 * is a second backup/failover NameNode, or when using federated NameNodes.)
 *
 * The NameNode controls two critical tables:
 *   1)  filename->blocksequence (namespace)
 *   2)  block->machinelist ("inodes")
 *
 * The first table is stored on disk and is very precious.
 * The second table is rebuilt every time the NameNode comes up.
 *
 * 'NameNode' refers to both this class as well as the 'NameNode server'.
 * The 'FSNamesystem' class actually performs most of the filesystem
 * management.  The majority of the 'NameNode' class itself is concerned
 * with exposing the IPC interface and the HTTP server to the outside world,
 * plus some configuration management.
 *
 * NameNode implements the
 * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} interface, which
 * allows clients to ask for DFS services.
 * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} is not designed for
 * direct use by authors of DFS client code.  End-users should instead use the
 * {@link org.apache.hadoop.fs.FileSystem} class.
 *
 * NameNode also implements the
 * {@link org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol} interface,
 * used by DataNodes that actually store DFS data blocks.  These
 * methods are invoked repeatedly and automatically by all the
 * DataNodes in a DFS deployment.
 *
 * NameNode also implements the
 * {@link org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol} interface,
 * used by secondary namenodes or rebalancing processes to get partial
 * NameNode state, for example a partial blocksMap.
 **********************************************************/
@InterfaceAudience.Private
public class NameNode implements NameNodeStatusMXBean {
  static {
    HdfsConfiguration.init();
  }

  /**
   * Categories of operations supported by the namenode.
   */
  public enum OperationCategory {
    /** Operations that are state agnostic */
    UNCHECKED,
    /** Read operation that does not change the namespace state */
    READ,
    /** Write operation that changes the namespace state */
    WRITE,
    /** Operations related to checkpointing */
    CHECKPOINT,
    /** Operations related to {@link JournalProtocol} */
    JOURNAL
  }

  /**
   * HDFS configuration can have three types of parameters:
   * <ol>
   * <li>Parameters that are common for all the name services in the cluster.</li>
   * <li>Parameters that are specific to a name service. These keys are suffixed
   * with nameserviceId in the configuration. For example,
   * "dfs.namenode.rpc-address.nameservice1".</li>
   * <li>Parameters that are specific to a single name node. These keys are suffixed
   * with nameserviceId and namenodeId in the configuration. For example,
   * "dfs.namenode.rpc-address.nameservice1.namenode1".</li>
   * </ol>
   *
   * The keys in this array are namenode-specific: operators may configure
   * them without any suffix (globally), with a nameservice suffix, or with
   * a nameservice and namenode suffix. The most specific suffix present
   * takes precedence.
   */
  public static final String[] NAMENODE_SPECIFIC_KEYS = {
    DFS_NAMENODE_RPC_ADDRESS_KEY,
    DFS_NAMENODE_RPC_BIND_HOST_KEY,
    DFS_NAMENODE_NAME_DIR_KEY,
    DFS_NAMENODE_EDITS_DIR_KEY,
    DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
    DFS_NAMENODE_CHECKPOINT_DIR_KEY,
    DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY,
    DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY,
    DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY,
    DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
    DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY,
    DFS_NAMENODE_HTTP_ADDRESS_KEY,
    DFS_NAMENODE_HTTPS_ADDRESS_KEY,
    DFS_NAMENODE_HTTP_BIND_HOST_KEY,
    DFS_NAMENODE_HTTPS_BIND_HOST_KEY,
    DFS_NAMENODE_KEYTAB_FILE_KEY,
    DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
    DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_KEY,
    DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY,
    DFS_NAMENODE_BACKUP_ADDRESS_KEY,
    DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY,
    DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY,
    DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
    DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
    DFS_HA_FENCE_METHODS_KEY,
    DFS_HA_ZKFC_PORT_KEY
  };
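
  // Illustration (hypothetical IDs "ns1" and "nn1"): initializeGenericKeys(
  // conf, "ns1", "nn1") copies the most specific value found for each key
  // above into its unsuffixed generic form, preferring
  //   dfs.namenode.rpc-address.ns1.nn1   over
  //   dfs.namenode.rpc-address.ns1       over
  //   dfs.namenode.rpc-address
  // so the rest of the code only ever reads the generic key.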

  /**
   * These keys are specific to a nameservice, but may not be overridden
   * for a specific namenode.
   * @see #NAMENODE_SPECIFIC_KEYS
   */
  public static final String[] NAMESERVICE_SPECIFIC_KEYS = {
    DFS_HA_AUTO_FAILOVER_ENABLED_KEY
  };

  private static final String USAGE = "Usage: hdfs namenode ["
      + StartupOption.BACKUP.getName() + "] | \n\t["
      + StartupOption.CHECKPOINT.getName() + "] | \n\t["
      + StartupOption.FORMAT.getName() + " ["
      + StartupOption.CLUSTERID.getName() + " cid ] ["
      + StartupOption.FORCE.getName() + "] ["
      + StartupOption.NONINTERACTIVE.getName() + "] ] | \n\t["
      + StartupOption.UPGRADE.getName() +
        " [" + StartupOption.CLUSTERID.getName() + " cid]" +
        " [" + StartupOption.RENAMERESERVED.getName() + "<k-v pairs>] ] | \n\t["
      + StartupOption.UPGRADEONLY.getName() +
        " [" + StartupOption.CLUSTERID.getName() + " cid]" +
        " [" + StartupOption.RENAMERESERVED.getName() + "<k-v pairs>] ] | \n\t["
      + StartupOption.ROLLBACK.getName() + "] | \n\t["
      + StartupOption.ROLLINGUPGRADE.getName() + " "
      + RollingUpgradeStartupOption.getAllOptionString() + " ] | \n\t["
      + StartupOption.FINALIZE.getName() + "] | \n\t["
      + StartupOption.IMPORT.getName() + "] | \n\t["
      + StartupOption.INITIALIZESHAREDEDITS.getName() + "] | \n\t["
      + StartupOption.BOOTSTRAPSTANDBY.getName() + " ["
      + StartupOption.FORCE.getName() + "] ["
      + StartupOption.NONINTERACTIVE.getName() + "] ["
      + StartupOption.SKIPSHAREDEDITSCHECK.getName() + "] ] | \n\t["
      + StartupOption.RECOVER.getName() + " [ "
      + StartupOption.FORCE.getName() + "] ] | \n\t["
      + StartupOption.METADATAVERSION.getName() + " ]";


  public long getProtocolVersion(String protocol,
                                 long clientVersion) throws IOException {
    if (protocol.equals(ClientProtocol.class.getName())) {
      return ClientProtocol.versionID;
    } else if (protocol.equals(DatanodeProtocol.class.getName())) {
      return DatanodeProtocol.versionID;
    } else if (protocol.equals(NamenodeProtocol.class.getName())) {
      return NamenodeProtocol.versionID;
    } else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())) {
      return RefreshAuthorizationPolicyProtocol.versionID;
    } else if (protocol.equals(RefreshUserMappingsProtocol.class.getName())) {
      return RefreshUserMappingsProtocol.versionID;
    } else if (protocol.equals(RefreshCallQueueProtocol.class.getName())) {
      return RefreshCallQueueProtocol.versionID;
    } else if (protocol.equals(GetUserMappingsProtocol.class.getName())) {
      return GetUserMappingsProtocol.versionID;
    } else if (protocol.equals(TraceAdminProtocol.class.getName())) {
      return TraceAdminProtocol.versionID;
    } else {
      throw new IOException("Unknown protocol to name node: " + protocol);
    }
  }

  /**
   * @deprecated Use {@link HdfsClientConfigKeys#DFS_NAMENODE_RPC_PORT_DEFAULT}
   *             instead.
   */
  @Deprecated
  public static final int DEFAULT_PORT = DFS_NAMENODE_RPC_PORT_DEFAULT;
  public static final Logger LOG =
      LoggerFactory.getLogger(NameNode.class.getName());
  public static final Logger stateChangeLog =
      LoggerFactory.getLogger("org.apache.hadoop.hdfs.StateChange");
  public static final Logger blockStateChangeLog =
      LoggerFactory.getLogger("BlockStateChange");
  public static final HAState ACTIVE_STATE = new ActiveState();
  public static final HAState STANDBY_STATE = new StandbyState();

  private static final String NAMENODE_HTRACE_PREFIX = "namenode.htrace.";

  public static final Log MetricsLog =
      LogFactory.getLog("NameNodeMetricsLog");

  protected FSNamesystem namesystem;
  protected final Configuration conf;
  protected final NamenodeRole role;
  private volatile HAState state;
  private final boolean haEnabled;
  private final HAContext haContext;
  protected final boolean allowStaleStandbyReads;
  private AtomicBoolean started = new AtomicBoolean(false);

  /** HTTP server exposing the web UI and HTTP-based servlets. */
  protected NameNodeHttpServer httpServer;
  private Thread emptier;
  /** only used for testing purposes */
  protected boolean stopRequested = false;
  /** Registration information of this name-node */
  protected NamenodeRegistration nodeRegistration;
  /** Activated plug-ins. */
  private List<ServicePlugin> plugins;

  private NameNodeRpcServer rpcServer;

  private JvmPauseMonitor pauseMonitor;
  private ObjectName nameNodeStatusBeanName;
  protected final Tracer tracer;
  protected final TracerConfigurationManager tracerConfigurationManager;
  ScheduledThreadPoolExecutor metricsLoggerTimer;

  /**
   * The namenode address that clients will use to access this namenode
   * or the name service. For HA configurations using logical URI, it
   * will be the logical address.
   */
  private String clientNamenodeAddress;

  /** Format a new filesystem.  Destroys any filesystem that may already
   * exist at this location.  */
  public static void format(Configuration conf) throws IOException {
    format(conf, true, true);
  }

  static NameNodeMetrics metrics;
  private static final StartupProgress startupProgress = new StartupProgress();

  /** Return the {@link FSNamesystem} object.
   * @return {@link FSNamesystem} object.
   */
  public FSNamesystem getNamesystem() {
    return namesystem;
  }

  public NamenodeProtocols getRpcServer() {
    return rpcServer;
  }

  public void queueExternalCall(ExternalCall<?> extCall)
      throws IOException, InterruptedException {
    if (rpcServer == null) {
      throw new RetriableException("Namenode is in startup mode");
    }
    rpcServer.getClientRpcServer().queueCall(extCall);
  }

  static void initMetrics(Configuration conf, NamenodeRole role) {
    metrics = NameNodeMetrics.create(conf, role);
  }

  public static NameNodeMetrics getNameNodeMetrics() {
    return metrics;
  }

  /**
   * Returns object used for reporting namenode startup progress.
   *
   * @return StartupProgress for reporting namenode startup progress
   */
  public static StartupProgress getStartupProgress() {
    return startupProgress;
  }

  /**
   * Return the service name of the issued delegation token.
   *
   * @return The name service id in HA-mode, or the rpc address in non-HA mode
   */
  public String getTokenServiceName() {
    return getClientNamenodeAddress();
  }

  /**
   * Set the namenode address that will be used by clients to access this
   * namenode or name service. This needs to be called before the config
   * is overridden.
   */
  public void setClientNamenodeAddress(Configuration conf) {
    String nnAddr = conf.get(FS_DEFAULT_NAME_KEY);
    if (nnAddr == null) {
      // default fs is not set.
      clientNamenodeAddress = null;
      return;
    }

    LOG.info("{} is {}", FS_DEFAULT_NAME_KEY, nnAddr);
    URI nnUri = URI.create(nnAddr);

    String nnHost = nnUri.getHost();
    if (nnHost == null) {
      clientNamenodeAddress = null;
      return;
    }

    if (DFSUtilClient.getNameServiceIds(conf).contains(nnHost)) {
      // host name is logical
      clientNamenodeAddress = nnHost;
    } else if (nnUri.getPort() > 0) {
      // physical address with a valid port
      clientNamenodeAddress = nnUri.getAuthority();
    } else {
      // the port is missing or 0. Figure out real bind address later.
      clientNamenodeAddress = null;
      return;
    }
    LOG.info("Clients are to use {} to access"
        + " this namenode/service.", clientNamenodeAddress);
  }
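
  // Sketch of the resolution above (values are illustrative):
  //   fs.defaultFS = hdfs://ns1        -> "ns1" (logical nameservice name)
  //   fs.defaultFS = hdfs://host:8020  -> "host:8020" (URI authority)
  //   fs.defaultFS = hdfs://host       -> null; resolved later from the RPC
  //                                      server's actual bind address.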

  /**
   * Get the namenode address to be used by clients.
   * @return nn address
   */
  public String getClientNamenodeAddress() {
    return clientNamenodeAddress;
  }

  /**
   * Set the configuration property for the service RPC address
   * to the given address.
   */
  public static void setServiceAddress(Configuration conf,
                                           String address) {
    LOG.info("Setting ADDRESS {}", address);
    conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, address);
  }

  /**
   * Fetches the address that services (datanodes, the backup node, and any
   * other non-client callers) should use when connecting to the namenode.
   * If the service-specific RPC address is not configured, returns null
   * when {@code fallback} is false, or the default namenode address (shared
   * by clients and services) when {@code fallback} is true.
   */
  public static InetSocketAddress getServiceAddress(Configuration conf,
                                                        boolean fallback) {
    String addr = conf.getTrimmed(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY);
    if (addr == null || addr.isEmpty()) {
      return fallback ? DFSUtilClient.getNNAddress(conf) : null;
    }
    return DFSUtilClient.getNNAddress(addr);
  }
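
  // For example (a sketch; the address is illustrative): with
  // dfs.namenode.servicerpc-address set to host:8021, both calls below
  // return host:8021. With the key unset, the first falls back to the
  // client RPC address derived from fs.defaultFS, the second returns null.
  //   InetSocketAddress a = NameNode.getServiceAddress(conf, true);
  //   InetSocketAddress b = NameNode.getServiceAddress(conf, false);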

  //
  // Common NameNode methods implementation for the active name-node role.
  //
  public NamenodeRole getRole() {
    return role;
  }

  boolean isRole(NamenodeRole that) {
    return role.equals(that);
  }

  /**
   * Given a configuration, get the address of the lifeline RPC server.
   * If the lifeline RPC is not configured, returns null.
   *
   * @param conf configuration
   * @return address or null
   */
  InetSocketAddress getLifelineRpcServerAddress(Configuration conf) {
    String addr = getTrimmedOrNull(conf, DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY);
    if (addr == null) {
      return null;
    }
    return NetUtils.createSocketAddr(addr);
  }

  /**
   * Given a configuration, get the address of the service RPC server.
   * If the service RPC is not configured, returns null.
   */
  protected InetSocketAddress getServiceRpcServerAddress(Configuration conf) {
    return NameNode.getServiceAddress(conf, false);
  }

  protected InetSocketAddress getRpcServerAddress(Configuration conf) {
    return DFSUtilClient.getNNAddress(conf);
  }

  /**
   * Given a configuration, get the bind host of the lifeline RPC server.
   * If the bind host is not configured, returns null.
   *
   * @param conf configuration
   * @return bind host or null
   */
  String getLifelineRpcServerBindHost(Configuration conf) {
    return getTrimmedOrNull(conf, DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY);
  }

  /** Given a configuration, get the bind host of the service RPC server.
   *  If the bind host is not configured, returns null.
   */
  protected String getServiceRpcServerBindHost(Configuration conf) {
    return getTrimmedOrNull(conf, DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY);
  }

  /** Given a configuration, get the bind host of the client RPC server.
   *  If the bind host is not configured, returns null.
   */
  protected String getRpcServerBindHost(Configuration conf) {
    return getTrimmedOrNull(conf, DFS_NAMENODE_RPC_BIND_HOST_KEY);
  }

  /**
   * Gets a trimmed value from configuration, or null if no value is defined.
   *
   * @param conf configuration
   * @param key configuration key to get
   * @return trimmed value, or null if no value is defined
   */
  private static String getTrimmedOrNull(Configuration conf, String key) {
    String addr = conf.getTrimmed(key);
    if (addr == null || addr.isEmpty()) {
      return null;
    }
    return addr;
  }

  /**
   * Modifies the configuration to contain the lifeline RPC address setting.
   *
   * @param conf configuration to modify
   * @param lifelineRPCAddress lifeline RPC address
   */
  void setRpcLifelineServerAddress(Configuration conf,
      InetSocketAddress lifelineRPCAddress) {
    LOG.info("Setting lifeline RPC address {}", lifelineRPCAddress);
    conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY,
        NetUtils.getHostPortString(lifelineRPCAddress));
  }

  /**
   * Modifies the passed configuration to contain the service RPC address
   * setting.
   */
  protected void setRpcServiceServerAddress(Configuration conf,
      InetSocketAddress serviceRPCAddress) {
    setServiceAddress(conf, NetUtils.getHostPortString(serviceRPCAddress));
  }

  protected void setRpcServerAddress(Configuration conf,
      InetSocketAddress rpcAddress) {
    FileSystem.setDefaultUri(conf, DFSUtilClient.getNNUri(rpcAddress));
  }

  protected InetSocketAddress getHttpServerAddress(Configuration conf) {
    return getHttpAddress(conf);
  }

  /**
   * HTTP server address for binding the endpoint. This method is
   * for use by the NameNode and its derivatives. It may return
   * a different address than the one that should be used by clients to
   * connect to the NameNode. See
   * {@link DFSConfigKeys#DFS_NAMENODE_HTTP_BIND_HOST_KEY}.
   *
   * @param conf the configuration
   * @return the address to bind the HTTP endpoint to
   */
  protected InetSocketAddress getHttpServerBindAddress(Configuration conf) {
    InetSocketAddress bindAddress = getHttpServerAddress(conf);

    // If DFS_NAMENODE_HTTP_BIND_HOST_KEY exists then it overrides the
    // host name portion of DFS_NAMENODE_HTTP_ADDRESS_KEY.
    final String bindHost = conf.getTrimmed(DFS_NAMENODE_HTTP_BIND_HOST_KEY);
    if (bindHost != null && !bindHost.isEmpty()) {
      bindAddress = new InetSocketAddress(bindHost, bindAddress.getPort());
    }

    return bindAddress;
  }
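
  // Sketch of the override above (hypothetical values): with
  //   dfs.namenode.http-address   = nn1.example.com:9870
  //   dfs.namenode.http-bind-host = 0.0.0.0
  // the server binds 0.0.0.0:9870 (all interfaces), while clients still
  // reach the web UI through nn1.example.com:9870.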

  /** @return the NameNode HTTP address. */
  public static InetSocketAddress getHttpAddress(Configuration conf) {
    return NetUtils.createSocketAddr(
        conf.getTrimmed(DFS_NAMENODE_HTTP_ADDRESS_KEY, DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));
  }

  protected void loadNamesystem(Configuration conf) throws IOException {
    this.namesystem = FSNamesystem.loadFromDisk(conf);
  }

  NamenodeRegistration getRegistration() {
    return nodeRegistration;
  }

  NamenodeRegistration setRegistration() {
    nodeRegistration = new NamenodeRegistration(
        NetUtils.getHostPortString(getNameNodeAddress()),
        NetUtils.getHostPortString(getHttpAddress()),
        getFSImage().getStorage(), getRole());
    return nodeRegistration;
  }

  /* Optimize ugi lookup for RPC operations to avoid a trip through
   * UGI.getCurrentUser which is synch'ed.
   */
  public static UserGroupInformation getRemoteUser() throws IOException {
    UserGroupInformation ugi = Server.getRemoteUser();
    return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
  }

  /**
   * Login as the configured user for the NameNode.
   */
  void loginAsNameNodeUser(Configuration conf) throws IOException {
    InetSocketAddress socAddr = getRpcServerAddress(conf);
    SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
        DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
  }

  /**
   * Initialize name-node.
   *
   * @param conf the configuration
   */
  protected void initialize(Configuration conf) throws IOException {
    if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
      String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
      if (intervals != null) {
        conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
          intervals);
      }
    }

    UserGroupInformation.setConfiguration(conf);
    loginAsNameNodeUser(conf);

    NameNode.initMetrics(conf, this.getRole());
    StartupProgressMetrics.register(startupProgress);

    pauseMonitor = new JvmPauseMonitor(conf);
    pauseMonitor.start();
    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

    if (NamenodeRole.NAMENODE == role) {
      startHttpServer(conf);
    }

    loadNamesystem(conf);

    rpcServer = createRpcServer(conf);
    if (clientNamenodeAddress == null) {
      // This is expected for MiniDFSCluster. Set it now using
      // the RPC server's bind address.
      clientNamenodeAddress =
          NetUtils.getHostPortString(getNameNodeAddress());
      LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
          + " this namenode/service.");
    }
    if (NamenodeRole.NAMENODE == role) {
      httpServer.setNameNodeAddress(getNameNodeAddress());
      httpServer.setFSImage(getFSImage());
    }

    startCommonServices(conf);
    startMetricsLogger(conf);
  }

  /**
   * Start a timer to periodically write NameNode metrics to the log
   * file. This behavior can be disabled by configuration.
   *
   * @param conf the configuration
   */
  protected void startMetricsLogger(Configuration conf) {
    long metricsLoggerPeriodSec =
        conf.getInt(DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY,
            DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT);

    if (metricsLoggerPeriodSec <= 0) {
      return;
    }

    MetricsLoggerTask.makeMetricsLoggerAsync(MetricsLog);

    // Schedule the periodic logging.
    metricsLoggerTimer = new ScheduledThreadPoolExecutor(1);
    metricsLoggerTimer.setExecuteExistingDelayedTasksAfterShutdownPolicy(
        false);
    metricsLoggerTimer.scheduleWithFixedDelay(new MetricsLoggerTask(MetricsLog,
        "NameNode", (short) 128),
        metricsLoggerPeriodSec,
        metricsLoggerPeriodSec,
        TimeUnit.SECONDS);
  }
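
  // A note on the knob above: dfs.namenode.metrics.logger.period.seconds
  // sets the delay between metrics snapshots written to MetricsLog; any
  // value <= 0 disables periodic metrics logging entirely.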

  protected void stopMetricsLogger() {
    if (metricsLoggerTimer != null) {
      metricsLoggerTimer.shutdown();
      metricsLoggerTimer = null;
    }
  }

  /**
   * Create the RPC server implementation. Used as an extension point for the
   * BackupNode.
   */
  protected NameNodeRpcServer createRpcServer(Configuration conf)
      throws IOException {
    return new NameNodeRpcServer(conf, this);
  }

  /** Start the services common to active and standby states */
  private void startCommonServices(Configuration conf) throws IOException {
    namesystem.startCommonServices(conf, haContext);
    registerNNSMXBean();
    if (NamenodeRole.NAMENODE != role) {
      startHttpServer(conf);
      httpServer.setNameNodeAddress(getNameNodeAddress());
      httpServer.setFSImage(getFSImage());
    }
    rpcServer.start();
    plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
        ServicePlugin.class);
    for (ServicePlugin p : plugins) {
      try {
        p.start(this);
      } catch (Throwable t) {
        LOG.warn("ServicePlugin " + p + " could not be started", t);
      }
    }
    LOG.info(getRole() + " RPC up at: " + getNameNodeAddress());
    if (rpcServer.getServiceRpcAddress() != null) {
      LOG.info(getRole() + " service RPC up at: "
          + rpcServer.getServiceRpcAddress());
    }
  }

  private void stopCommonServices() {
    if (rpcServer != null) rpcServer.stop();
    if (namesystem != null) namesystem.close();
    if (pauseMonitor != null) pauseMonitor.stop();
    if (plugins != null) {
      for (ServicePlugin p : plugins) {
        try {
          p.stop();
        } catch (Throwable t) {
          LOG.warn("ServicePlugin " + p + " could not be stopped", t);
        }
      }
    }
    stopHttpServer();
  }

  private void startTrashEmptier(final Configuration conf) throws IOException {
    long trashInterval =
        conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT);
    if (trashInterval == 0) {
      return;
    } else if (trashInterval < 0) {
      throw new IOException("Cannot start trash emptier with negative interval."
          + " Set " + FS_TRASH_INTERVAL_KEY + " to a positive value.");
    }

    // This may be called from the transitionToActive code path, in which
    // case the current user is the administrator, not the NN. The trash
    // emptier needs to run as the NN. See HDFS-3972.
    FileSystem fs = SecurityUtil.doAsLoginUser(
        new PrivilegedExceptionAction<FileSystem>() {
          @Override
          public FileSystem run() throws IOException {
            return FileSystem.get(conf);
          }
        });
    this.emptier = new Thread(new Trash(fs, conf).getEmptier(), "Trash Emptier");
    this.emptier.setDaemon(true);
    this.emptier.start();
  }
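
  // The contract above in brief: fs.trash.interval (minutes) of 0 disables
  // the emptier, a negative value fails startup, and a positive value runs
  // the daemon thread that periodically purges expired trash checkpoints.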

  private void stopTrashEmptier() {
    if (this.emptier != null) {
      emptier.interrupt();
      emptier = null;
    }
  }

  private void startHttpServer(final Configuration conf) throws IOException {
    httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
    httpServer.start();
    httpServer.setStartupProgress(startupProgress);
  }

  private void stopHttpServer() {
    try {
      if (httpServer != null) httpServer.stop();
    } catch (Exception e) {
      LOG.error("Exception while stopping httpserver", e);
    }
  }

  /**
   * Start NameNode.
   * <p>
   * The name-node can be started with one of the following startup options:
   * <ul>
   * <li>{@link StartupOption#REGULAR REGULAR} - normal name node startup</li>
   * <li>{@link StartupOption#FORMAT FORMAT} - format name node</li>
   * <li>{@link StartupOption#BACKUP BACKUP} - start backup node</li>
   * <li>{@link StartupOption#CHECKPOINT CHECKPOINT} - start checkpoint node</li>
   * <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster
   *            upgrade and create a snapshot of the current file system
   *            state</li>
   * <li>{@link StartupOption#UPGRADEONLY UPGRADEONLY} - upgrade the cluster
   *            but do not start the name node</li>
   * <li>{@link StartupOption#RECOVER RECOVER} - recover name node
   *            metadata</li>
   * <li>{@link StartupOption#ROLLBACK ROLLBACK} - roll the
   *            cluster back to the previous state</li>
   * <li>{@link StartupOption#FINALIZE FINALIZE} - finalize
   *            previous upgrade</li>
   * <li>{@link StartupOption#IMPORT IMPORT} - import checkpoint</li>
   * </ul>
   * The option is passed via configuration field:
   * <tt>dfs.namenode.startup</tt>
   *
   * The conf will be modified to reflect the actual ports on which
   * the NameNode is up and running if the user passes the port as
   * <code>zero</code> in the conf.
   *
   * @param conf  configuration
   * @throws IOException
   */
  public NameNode(Configuration conf) throws IOException {
    this(conf, NamenodeRole.NAMENODE);
  }

  protected NameNode(Configuration conf, NamenodeRole role)
      throws IOException {
    this.tracer = new Tracer.Builder("NameNode").
        conf(TraceUtils.wrapHadoopConf(NAMENODE_HTRACE_PREFIX, conf)).
        build();
    this.tracerConfigurationManager =
        new TracerConfigurationManager(NAMENODE_HTRACE_PREFIX, conf);
    this.conf = conf;
    this.role = role;
    setClientNamenodeAddress(conf);
    String nsId = getNameServiceId(conf);
    String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
    state = createHAState(getStartupOption(conf));
    this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
    this.haContext = createHAContext();
    try {
      initializeGenericKeys(conf, nsId, namenodeId);
      initialize(conf);
      try {
        haContext.writeLock();
        state.prepareToEnterState(haContext);
        state.enterState(haContext);
      } finally {
        haContext.writeUnlock();
      }
    } catch (IOException e) {
      this.stopAtException(e);
      throw e;
    } catch (HadoopIllegalArgumentException e) {
      this.stopAtException(e);
      throw e;
    }
    this.started.set(true);
  }

  private void stopAtException(Exception e) {
    try {
      this.stop();
    } catch (Exception ex) {
      LOG.warn("Encountered exception when handling exception ("
          + e.getMessage() + "):", ex);
    }
  }

  protected HAState createHAState(StartupOption startOpt) {
    if (!haEnabled || startOpt == StartupOption.UPGRADE
        || startOpt == StartupOption.UPGRADEONLY) {
      return ACTIVE_STATE;
    } else {
      return STANDBY_STATE;
    }
  }

  protected HAContext createHAContext() {
    return new NameNodeHAContext();
  }

  /**
   * Wait for service to finish.
   * (Normally, it runs forever.)
   */
  public void join() {
    try {
      rpcServer.join();
    } catch (InterruptedException ie) {
      LOG.info("Caught interrupted exception ", ie);
    }
  }

  /**
   * Stop all NameNode threads and wait for all to finish.
   */
  public void stop() {
    synchronized (this) {
      if (stopRequested) {
        return;
      }
      stopRequested = true;
    }
    try {
      if (state != null) {
        state.exitState(haContext);
      }
    } catch (ServiceFailedException e) {
      LOG.warn("Encountered exception while exiting state ", e);
    } finally {
      stopMetricsLogger();
      stopCommonServices();
      if (metrics != null) {
        metrics.shutdown();
      }
      if (namesystem != null) {
        namesystem.shutdown();
      }
      if (nameNodeStatusBeanName != null) {
        MBeans.unregister(nameNodeStatusBeanName);
        nameNodeStatusBeanName = null;
      }
    }
    tracer.close();
  }

  synchronized boolean isStopRequested() {
    return stopRequested;
  }

  /**
   * Is the cluster currently in safe mode?
   */
  public boolean isInSafeMode() {
    return namesystem.isInSafeMode();
  }

  /** get FSImage */
  @VisibleForTesting
  public FSImage getFSImage() {
    return namesystem.getFSImage();
  }

  /**
   * @return NameNode RPC address
   */
  public InetSocketAddress getNameNodeAddress() {
    return rpcServer.getRpcAddress();
  }

  /**
   * @return NameNode RPC address in "host:port" string form
   */
  public String getNameNodeAddressHostPortString() {
    return NetUtils.getHostPortString(getNameNodeAddress());
  }

  /**
   * @return NameNode service RPC address if configured, the
   *    NameNode RPC address otherwise
   */
  public InetSocketAddress getServiceRpcAddress() {
    final InetSocketAddress serviceAddr = rpcServer.getServiceRpcAddress();
    return serviceAddr == null ? getNameNodeAddress() : serviceAddr;
  }

  /**
   * @return NameNode HTTP address, used by the Web UI, image transfer,
   *    and HTTP-based file system clients like Hftp and WebHDFS
   */
  public InetSocketAddress getHttpAddress() {
    return httpServer.getHttpAddress();
  }

  /**
   * @return NameNode HTTPS address, used by the Web UI, image transfer,
   *    and HTTP-based file system clients like Hftp and WebHDFS
   */
  public InetSocketAddress getHttpsAddress() {
    return httpServer.getHttpsAddress();
  }

  /**
   * Used by unit tests to wait for the NameNodeHttpServer to shut down
   * fully, so that no bind exception is thrown during restart.
   */
  @VisibleForTesting
  public void joinHttpServer() {
    if (httpServer != null) {
      try {
        httpServer.join();
      } catch (InterruptedException e) {
        LOG.info("Caught InterruptedException joining NameNodeHttpServer", e);
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Verify that configured directories exist, then format them,
   * confirming the format of each existing directory with the user
   * when running interactively.
   *
   * @param conf configuration to use
   * @param force if true, format regardless of whether dirs exist
   * @param isInteractive if true, prompt the user before formatting
   *          existing directories
   * @return true if formatting was aborted, false otherwise
   * @throws IOException
   */
  private static boolean format(Configuration conf, boolean force,
      boolean isInteractive) throws IOException {
    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
    String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    initializeGenericKeys(conf, nsId, namenodeId);
    checkAllowFormat(conf);

    if (UserGroupInformation.isSecurityEnabled()) {
      InetSocketAddress socAddr = DFSUtilClient.getNNAddress(conf);
      SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
          DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
    }

    Collection<URI> nameDirsToFormat = FSNamesystem.getNamespaceDirs(conf);
    List<URI> sharedDirs = FSNamesystem.getSharedEditsDirs(conf);
    List<URI> dirsToPrompt = new ArrayList<URI>();
    dirsToPrompt.addAll(nameDirsToFormat);
    dirsToPrompt.addAll(sharedDirs);
    List<URI> editDirsToFormat =
                 FSNamesystem.getNamespaceEditsDirs(conf);

    // If no cluster ID was provided, generate a new one.
    String clusterId = StartupOption.FORMAT.getClusterId();
    if (clusterId == null || clusterId.isEmpty()) {
      clusterId = NNStorage.newClusterID();
    }
    System.out.println("Formatting using clusterid: " + clusterId);

    FSImage fsImage = new FSImage(conf, nameDirsToFormat, editDirsToFormat);
    try {
      FSNamesystem fsn = new FSNamesystem(conf, fsImage);
      fsImage.getEditLog().initJournalsForWrite();

      if (!fsImage.confirmFormat(force, isInteractive)) {
        return true; // aborted
      }

      fsImage.format(fsn, clusterId);
    } catch (IOException ioe) {
      LOG.warn("Encountered exception during format: ", ioe);
      fsImage.close();
      throw ioe;
    }
    return false;
  }

  public static void checkAllowFormat(Configuration conf) throws IOException {
    if (!conf.getBoolean(DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY,
        DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_DEFAULT)) {
      throw new IOException("The option " + DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY
                + " is set to false for this filesystem, so it "
                + "cannot be formatted. You will need to set "
                + DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY + " parameter "
                + "to true in order to format this filesystem");
    }
  }
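
  // A common guard (illustrative): operators set
  // dfs.namenode.support.allow.format=false on production clusters, so an
  // accidental "hdfs namenode -format" fails fast with the exception above.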

  @VisibleForTesting
  public static boolean initializeSharedEdits(Configuration conf) throws IOException {
    return initializeSharedEdits(conf, true);
  }

  @VisibleForTesting
  public static boolean initializeSharedEdits(Configuration conf,
      boolean force) throws IOException {
    return initializeSharedEdits(conf, force, false);
  }

  /**
   * Clone the supplied configuration but remove the shared edits dirs.
   *
   * @param conf Supplies the original configuration.
   * @return Cloned configuration without the shared edit dirs.
   * @throws IOException on failure to generate the configuration.
   */
  private static Configuration getConfigurationWithoutSharedEdits(
      Configuration conf)
      throws IOException {
    List<URI> editsDirs = FSNamesystem.getNamespaceEditsDirs(conf, false);
    String editsDirsString = Joiner.on(",").join(editsDirs);

    Configuration confWithoutShared = new Configuration(conf);
    confWithoutShared.unset(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
    confWithoutShared.setStrings(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
        editsDirsString);
    return confWithoutShared;
  }

  /**
   * Format a new shared edits dir and copy in enough edit log segments so that
   * the standby NN can start up.
   *
   * @param conf configuration
   * @param force format regardless of whether or not the shared edits dir exists
   * @param interactive prompt the user when a dir exists
   * @return true if the command aborts, false otherwise
   */
  private static boolean initializeSharedEdits(Configuration conf,
      boolean force, boolean interactive) throws IOException {
    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
    String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    initializeGenericKeys(conf, nsId, namenodeId);

    if (conf.get(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY) == null) {
      LOG.error("No shared edits directory configured for namespace " +
          nsId + " namenode " + namenodeId);
      return false;
    }

    if (UserGroupInformation.isSecurityEnabled()) {
      InetSocketAddress socAddr = DFSUtilClient.getNNAddress(conf);
      SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
          DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
    }

    NNStorage existingStorage = null;
    FSImage sharedEditsImage = null;
    try {
      FSNamesystem fsns =
          FSNamesystem.loadFromDisk(getConfigurationWithoutSharedEdits(conf));

      existingStorage = fsns.getFSImage().getStorage();
      NamespaceInfo nsInfo = existingStorage.getNamespaceInfo();

      List<URI> sharedEditsDirs = FSNamesystem.getSharedEditsDirs(conf);

      sharedEditsImage = new FSImage(conf,
          Lists.<URI>newArrayList(),
          sharedEditsDirs);
      sharedEditsImage.getEditLog().initJournalsForWrite();

      if (!sharedEditsImage.confirmFormat(force, interactive)) {
        return true; // abort
      }

      NNStorage newSharedStorage = sharedEditsImage.getStorage();
      // Call Storage.format instead of FSImage.format here, since we don't
      // actually want to save a checkpoint - just prime the dirs with
      // the existing namespace info
      newSharedStorage.format(nsInfo);
      sharedEditsImage.getEditLog().formatNonFileJournals(nsInfo);

      // Need to make sure the edit log segments are in good shape to initialize
      // the shared edits dir.
      fsns.getFSImage().getEditLog().close();
      fsns.getFSImage().getEditLog().initJournalsForWrite();
      fsns.getFSImage().getEditLog().recoverUnclosedStreams();

      copyEditLogSegmentsToSharedDir(fsns, sharedEditsDirs, newSharedStorage,
          conf);
    } catch (IOException ioe) {
      LOG.error("Could not initialize shared edits dir", ioe);
      return true; // aborted
    } finally {
      if (sharedEditsImage != null) {
        try {
          sharedEditsImage.close();
        } catch (IOException ioe) {
          LOG.warn("Could not close sharedEditsImage", ioe);
        }
      }
      // Have to unlock storage explicitly for the case when we're running in a
      // unit test, which runs in the same JVM as NNs.
      if (existingStorage != null) {
        try {
          existingStorage.unlockAll();
        } catch (IOException ioe) {
          LOG.warn("Could not unlock storage directories", ioe);
          return true; // aborted
        }
      }
    }
    return false; // did not abort
  }

  private static void copyEditLogSegmentsToSharedDir(FSNamesystem fsns,
      Collection<URI> sharedEditsDirs, NNStorage newSharedStorage,
      Configuration conf) throws IOException {
    Preconditions.checkArgument(!sharedEditsDirs.isEmpty(),
        "No shared edits specified");
    // Copy edit log segments into the new shared edits dir.
    List<URI> sharedEditsUris = new ArrayList<URI>(sharedEditsDirs);
    FSEditLog newSharedEditLog = new FSEditLog(conf, newSharedStorage,
        sharedEditsUris);
    newSharedEditLog.initJournalsForWrite();
    newSharedEditLog.recoverUnclosedStreams();

    FSEditLog sourceEditLog = fsns.getFSImage().editLog;

    long fromTxId = fsns.getFSImage().getMostRecentCheckpointTxId();

    Collection<EditLogInputStream> streams = null;
    try {
      streams = sourceEditLog.selectInputStreams(fromTxId + 1, 0);

      // Set the nextTxid to the CheckpointTxId+1
      newSharedEditLog.setNextTxId(fromTxId + 1);

      // Copy all edits after last CheckpointTxId to shared edits dir
      for (EditLogInputStream stream : streams) {
        LOG.debug("Beginning to copy stream " + stream + " to shared edits");
        FSEditLogOp op;
        boolean segmentOpen = false;
        while ((op = stream.readOp()) != null) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("copying op: " + op);
          }
          if (!segmentOpen) {
            newSharedEditLog.startLogSegment(op.txid, false,
                fsns.getEffectiveLayoutVersion());
            segmentOpen = true;
          }

          newSharedEditLog.logEdit(op);

          if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) {
            newSharedEditLog.endCurrentLogSegment(false);
            LOG.debug("ending log segment because of END_LOG_SEGMENT op in "
                + stream);
            segmentOpen = false;
          }
        }

        if (segmentOpen) {
          LOG.debug("ending log segment because of end of stream in " + stream);
          newSharedEditLog.logSync();
          newSharedEditLog.endCurrentLogSegment(false);
          segmentOpen = false;
        }
      }
    } finally {
      if (streams != null) {
        FSEditLog.closeAllStreams(streams);
      }
    }
  }

  @VisibleForTesting
  public static boolean doRollback(Configuration conf,
      boolean isConfirmationNeeded) throws IOException {
    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
    String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    initializeGenericKeys(conf, nsId, namenodeId);

    FSNamesystem nsys = new FSNamesystem(conf, new FSImage(conf));
    System.err.print(
        "\"rollBack\" will remove the current state of the file system,\n"
        + "returning you to the state prior to initiating your recent\n"
        + "upgrade. This action is permanent and cannot be undone. If you\n"
        + "are performing a rollback in an HA environment, you should be\n"
        + "certain that no NameNode process is running on any host.");
    if (isConfirmationNeeded) {
      if (!confirmPrompt("Roll back file system state?")) {
        System.err.println("Rollback aborted.");
        return true;
      }
    }
    nsys.getFSImage().doRollback(nsys);
    return false;
  }
1325
1326  private static void printUsage(PrintStream out) {
1327    out.println(USAGE + "\n");
1328  }
1329
  @VisibleForTesting
  static StartupOption parseArguments(String args[]) {
    int argsLen = (args == null) ? 0 : args.length;
    StartupOption startOpt = StartupOption.REGULAR;
    for(int i=0; i < argsLen; i++) {
      String cmd = args[i];
      if (StartupOption.FORMAT.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.FORMAT;
        for (i = i + 1; i < argsLen; i++) {
          if (args[i].equalsIgnoreCase(StartupOption.CLUSTERID.getName())) {
            i++;
            if (i >= argsLen) {
              // if no cluster id specified, return null
              LOG.error("Must specify a valid cluster ID after the "
                  + StartupOption.CLUSTERID.getName() + " flag");
              return null;
            }
            String clusterId = args[i];
            // Make sure an id is specified and not another flag
            if (clusterId.isEmpty() ||
                clusterId.equalsIgnoreCase(StartupOption.FORCE.getName()) ||
                clusterId.equalsIgnoreCase(
                    StartupOption.NONINTERACTIVE.getName())) {
              LOG.error("Must specify a valid cluster ID after the "
                  + StartupOption.CLUSTERID.getName() + " flag");
              return null;
            }
            startOpt.setClusterId(clusterId);
          }

          if (args[i].equalsIgnoreCase(StartupOption.FORCE.getName())) {
            startOpt.setForceFormat(true);
          }

          if (args[i].equalsIgnoreCase(StartupOption.NONINTERACTIVE.getName())) {
            startOpt.setInteractiveFormat(false);
          }
        }
      } else if (StartupOption.GENCLUSTERID.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.GENCLUSTERID;
      } else if (StartupOption.REGULAR.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.REGULAR;
      } else if (StartupOption.BACKUP.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.BACKUP;
      } else if (StartupOption.CHECKPOINT.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.CHECKPOINT;
      } else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)
          || StartupOption.UPGRADEONLY.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd) ?
            StartupOption.UPGRADE : StartupOption.UPGRADEONLY;
        /* Can be followed by CLUSTERID with a required parameter or
         * RENAMERESERVED with an optional parameter
         */
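        // For illustration only (hypothetical values), an upgrade request
        // combining both flags might look like:
        //   hdfs namenode -upgrade -clusterId CID-example \
        //       -renameReserved .snapshot=.user-snapshot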
        while (i + 1 < argsLen) {
          String flag = args[i + 1];
          if (flag.equalsIgnoreCase(StartupOption.CLUSTERID.getName())) {
            if (i + 2 < argsLen) {
              i += 2;
              startOpt.setClusterId(args[i]);
            } else {
              LOG.error("Must specify a valid cluster ID after the "
                  + StartupOption.CLUSTERID.getName() + " flag");
              return null;
            }
          } else if (flag.equalsIgnoreCase(StartupOption.RENAMERESERVED
              .getName())) {
            if (i + 2 < argsLen) {
              FSImageFormat.setRenameReservedPairs(args[i + 2]);
              i += 2;
            } else {
              FSImageFormat.useDefaultRenameReservedPairs();
              i += 1;
            }
          } else {
            LOG.error("Unknown upgrade flag " + flag);
            return null;
          }
        }
      } else if (StartupOption.ROLLINGUPGRADE.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.ROLLINGUPGRADE;
        ++i;
        if (i >= argsLen) {
          LOG.error("Must specify a rolling upgrade startup option "
              + RollingUpgradeStartupOption.getAllOptionString());
          return null;
        }
        startOpt.setRollingUpgradeStartupOption(args[i]);
      } else if (StartupOption.ROLLBACK.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.ROLLBACK;
      } else if (StartupOption.FINALIZE.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.FINALIZE;
      } else if (StartupOption.IMPORT.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.IMPORT;
      } else if (StartupOption.BOOTSTRAPSTANDBY.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.BOOTSTRAPSTANDBY;
        return startOpt;
      } else if (StartupOption.INITIALIZESHAREDEDITS.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.INITIALIZESHAREDEDITS;
        for (i = i + 1 ; i < argsLen; i++) {
          if (StartupOption.NONINTERACTIVE.getName().equals(args[i])) {
            startOpt.setInteractiveFormat(false);
          } else if (StartupOption.FORCE.getName().equals(args[i])) {
            startOpt.setForceFormat(true);
          } else {
            LOG.error("Invalid argument: " + args[i]);
            return null;
          }
        }
        return startOpt;
      } else if (StartupOption.RECOVER.getName().equalsIgnoreCase(cmd)) {
        if (startOpt != StartupOption.REGULAR) {
          throw new RuntimeException("Can't combine -recover with " +
              "other startup options.");
        }
        startOpt = StartupOption.RECOVER;
        while (++i < argsLen) {
          if (args[i].equalsIgnoreCase(
                StartupOption.FORCE.getName())) {
            startOpt.setForce(MetaRecoveryContext.FORCE_FIRST_CHOICE);
          } else {
            throw new RuntimeException("Error parsing recovery options: " +
              "can't understand option \"" + args[i] + "\"");
          }
        }
      } else if (StartupOption.METADATAVERSION.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.METADATAVERSION;
      } else {
        return null;
      }
    }
    return startOpt;
  }

  private static void setStartupOption(Configuration conf, StartupOption opt) {
    conf.set(DFS_NAMENODE_STARTUP_KEY, opt.name());
  }

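  /**
   * Read back the startup option recorded in the configuration by
   * {@link #setStartupOption(Configuration, StartupOption)}, defaulting to
   * {@link StartupOption#REGULAR}. A minimal sketch of the round trip:
   * <pre>{@code
   * Configuration conf = new HdfsConfiguration();
   * // nothing recorded yet, so the default applies
   * assert NameNode.getStartupOption(conf) == StartupOption.REGULAR;
   * }</pre>
   */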
  public static StartupOption getStartupOption(Configuration conf) {
    return StartupOption.valueOf(conf.get(DFS_NAMENODE_STARTUP_KEY,
                                          StartupOption.REGULAR.toString()));
  }

  private static void doRecovery(StartupOption startOpt, Configuration conf)
      throws IOException {
    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
    String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    initializeGenericKeys(conf, nsId, namenodeId);
    if (startOpt.getForce() < MetaRecoveryContext.FORCE_ALL) {
      if (!confirmPrompt("You have selected Metadata Recovery mode.  " +
          "This mode is intended to recover lost metadata on a corrupt " +
          "filesystem.  Metadata recovery mode often permanently deletes " +
          "data from your HDFS filesystem.  Please back up your edit log " +
          "and fsimage before trying this!\n\n" +
          "Are you ready to proceed? (Y/N)\n")) {
        System.err.println("Recovery aborted at user request.\n");
        return;
      }
    }
    MetaRecoveryContext.LOG.info("starting recovery...");
    UserGroupInformation.setConfiguration(conf);
    NameNode.initMetrics(conf, startOpt.toNodeRole());
    FSNamesystem fsn = null;
    try {
      fsn = FSNamesystem.loadFromDisk(conf);
      fsn.getFSImage().saveNamespace(fsn);
      MetaRecoveryContext.LOG.info("RECOVERY COMPLETE");
    } catch (IOException | RuntimeException e) {
      MetaRecoveryContext.LOG.info("RECOVERY FAILED: caught exception", e);
      throw e;
    } finally {
      if (fsn != null) {
        fsn.close();
      }
    }
  }

  /**
   * Verify that configured directories exist, then print the metadata
   * versions of the software and the image.
   *
   * @param conf configuration to use
   * @return the result of the underlying storage transition check; the
   *         caller ignores this value
   * @throws IOException if the configured storage directories cannot be
   *         accessed
   */
  private static boolean printMetadataVersion(Configuration conf)
    throws IOException {
    final String nsId = DFSUtil.getNamenodeNameServiceId(conf);
    final String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    NameNode.initializeGenericKeys(conf, nsId, namenodeId);
    final FSImage fsImage = new FSImage(conf);
    final FSNamesystem fs = new FSNamesystem(conf, fsImage, false);
    return fsImage.recoverTransitionRead(
      StartupOption.METADATAVERSION, fs, null);
  }

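  /**
   * Create a NameNode from command-line arguments, or carry out a one-shot
   * administrative action (format, rollback, etc.) and terminate. Returns
   * {@code null} when the arguments do not result in a running NameNode.
   *
   * A minimal sketch (no startup option, so a regular NameNode is built):
   * <pre>{@code
   * NameNode nn = NameNode.createNameNode(new String[] {},
   *     new HdfsConfiguration());
   * }</pre>
   */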
  public static NameNode createNameNode(String argv[], Configuration conf)
      throws IOException {
    LOG.info("createNameNode " + Arrays.asList(argv));
    if (conf == null)
      conf = new HdfsConfiguration();
    // Parse out some generic args into Configuration.
    GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
    argv = hParser.getRemainingArgs();
    // Parse the rest, NN specific args.
    StartupOption startOpt = parseArguments(argv);
    if (startOpt == null) {
      printUsage(System.err);
      return null;
    }
    setStartupOption(conf, startOpt);

    switch (startOpt) {
      case FORMAT: {
        boolean aborted = format(conf, startOpt.getForceFormat(),
            startOpt.getInteractiveFormat());
        terminate(aborted ? 1 : 0);
        return null; // avoid javac warning
      }
      case GENCLUSTERID: {
        System.err.println("Generating new cluster id:");
        System.out.println(NNStorage.newClusterID());
        terminate(0);
        return null;
      }
      case FINALIZE: {
        System.err.println("Use of the argument '" + StartupOption.FINALIZE +
            "' is no longer supported. To finalize an upgrade, start the NN " +
            "and then run 'hdfs dfsadmin -finalizeUpgrade'");
        terminate(1);
        return null; // avoid javac warning
      }
      case ROLLBACK: {
        boolean aborted = doRollback(conf, true);
        terminate(aborted ? 1 : 0);
        return null; // avoid warning
      }
      case BOOTSTRAPSTANDBY: {
        String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
        int rc = BootstrapStandby.run(toolArgs, conf);
        terminate(rc);
        return null; // avoid warning
      }
      case INITIALIZESHAREDEDITS: {
        boolean aborted = initializeSharedEdits(conf,
            startOpt.getForceFormat(),
            startOpt.getInteractiveFormat());
        terminate(aborted ? 1 : 0);
        return null; // avoid warning
      }
      case BACKUP:
      case CHECKPOINT: {
        NamenodeRole role = startOpt.toNodeRole();
        DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
        return new BackupNode(conf, role);
      }
      case RECOVER: {
        NameNode.doRecovery(startOpt, conf);
        return null;
      }
      case METADATAVERSION: {
        printMetadataVersion(conf);
        terminate(0);
        return null; // avoid javac warning
      }
      case UPGRADEONLY: {
        DefaultMetricsSystem.initialize("NameNode");
        new NameNode(conf);
        terminate(0);
        return null;
      }
      default: {
        DefaultMetricsSystem.initialize("NameNode");
        return new NameNode(conf);
      }
    }
  }

  /**
   * In a federated setup, the configuration covers a set of namenodes and
   * secondary namenode/backup/checkpointer nodes that are grouped under a
   * logical nameservice ID. The configuration keys specific to each of
   * them carry the configured nameserviceId as a suffix.
   *
   * This method copies the value from the specific key of the form
   * key.nameserviceId to the plain key, to set up the generic
   * configuration. Once this is done, only the generic version of the
   * configuration is read in the rest of the code, for backward
   * compatibility and simpler code changes.
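   *
   * For illustration only (hypothetical values): with nameserviceId "ns1"
   * and namenodeId "nn1", the specific setting
   * <pre>{@code
   * dfs.namenode.rpc-address.ns1.nn1 = host1.example.com:8020
   * }</pre>
   * is copied to the generic key {@code dfs.namenode.rpc-address}.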
   *
   * @param conf
   *          Configuration object in which to look up the specific keys and
   *          on which to set the generic keys; note that it is modified in
   *          place
   * @param nameserviceId name service Id (to distinguish federated NNs)
   * @param namenodeId the namenode ID (to distinguish HA NNs)
   * @see DFSUtil#setGenericConf(Configuration, String, String, String...)
   */
  public static void initializeGenericKeys(Configuration conf,
      String nameserviceId, String namenodeId) {
    if ((nameserviceId != null && !nameserviceId.isEmpty()) ||
        (namenodeId != null && !namenodeId.isEmpty())) {
      if (nameserviceId != null) {
        conf.set(DFS_NAMESERVICE_ID, nameserviceId);
      }
      if (namenodeId != null) {
        conf.set(DFS_HA_NAMENODE_ID_KEY, namenodeId);
      }

      DFSUtil.setGenericConf(conf, nameserviceId, namenodeId,
          NAMENODE_SPECIFIC_KEYS);
      DFSUtil.setGenericConf(conf, nameserviceId, null,
          NAMESERVICE_SPECIFIC_KEYS);
    }

    // If the RPC address is set, use it to (re-)configure the default FS
    if (conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY) != null) {
      URI defaultUri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://"
          + conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY));
      conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
      if (LOG.isDebugEnabled()) {
        LOG.debug("Setting " + FS_DEFAULT_NAME_KEY + " to " + defaultUri.toString());
      }
    }
  }

  /**
   * Get the name service ID for this node.
   * @param conf configuration in which to look up the nameservice ID
   * @return name service Id or null if federation is not configured
   */
  protected String getNameServiceId(Configuration conf) {
    return DFSUtil.getNamenodeNameServiceId(conf);
  }

  /**
   * Main entry point: parse the arguments, create the NameNode, and wait
   * for it to finish.
   */
  public static void main(String argv[]) throws Exception {
    if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
      System.exit(0);
    }

    try {
      StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
      NameNode namenode = createNameNode(argv, null);
      if (namenode != null) {
        namenode.join();
      }
    } catch (Throwable e) {
      LOG.error("Failed to start namenode.", e);
      terminate(1, e);
    }
  }

  /**
   * Health check used by the HA framework (typically invoked by the ZK
   * failover controller). Fails if HA is enabled and the NameNode reports
   * that it has no resources available.
   */
  synchronized void monitorHealth()
      throws HealthCheckFailedException, AccessControlException {
    namesystem.checkSuperuserPrivilege();
    if (!haEnabled) {
      return; // no-op, if HA is not enabled
    }
    getNamesystem().checkAvailableResources();
    if (!getNamesystem().nameNodeHasResourcesAvailable()) {
      throw new HealthCheckFailedException(
          "The NameNode has no resources available");
    }
  }

  synchronized void transitionToActive()
      throws ServiceFailedException, AccessControlException {
    namesystem.checkSuperuserPrivilege();
    if (!haEnabled) {
      throw new ServiceFailedException("HA for namenode is not enabled");
    }
    state.setState(haContext, ACTIVE_STATE);
  }

  synchronized void transitionToStandby()
      throws ServiceFailedException, AccessControlException {
    namesystem.checkSuperuserPrivilege();
    if (!haEnabled) {
      throw new ServiceFailedException("HA for namenode is not enabled");
    }
    state.setState(haContext, STANDBY_STATE);
  }

  /**
   * Report the HA status of this NameNode; a standby that is still in
   * safe mode is reported as not yet ready to become active.
   */
  synchronized HAServiceStatus getServiceStatus()
      throws ServiceFailedException, AccessControlException {
    namesystem.checkSuperuserPrivilege();
    if (!haEnabled) {
      throw new ServiceFailedException("HA for namenode is not enabled");
    }
    if (state == null) {
      return new HAServiceStatus(HAServiceState.INITIALIZING);
    }
    HAServiceState retState = state.getServiceState();
    HAServiceStatus ret = new HAServiceStatus(retState);
    if (retState == HAServiceState.STANDBY) {
      String safemodeTip = namesystem.getSafeModeTip();
      if (!safemodeTip.isEmpty()) {
        ret.setNotReadyToBecomeActive(
            "The NameNode is in safemode. " +
            safemodeTip);
      } else {
        ret.setReadyToBecomeActive();
      }
    } else if (retState == HAServiceState.ACTIVE) {
      ret.setReadyToBecomeActive();
    } else {
      ret.setNotReadyToBecomeActive("State is " + state);
    }
    return ret;
  }

  synchronized HAServiceState getServiceState() {
    if (state == null) {
      return HAServiceState.INITIALIZING;
    }
    return state.getServiceState();
  }

  /**
   * Register NameNodeStatusMXBean
   */
  private void registerNNSMXBean() {
    nameNodeStatusBeanName = MBeans.register("NameNode", "NameNodeStatus", this);
  }

  @Override // NameNodeStatusMXBean
  public String getNNRole() {
    String roleStr = "";
    NamenodeRole role = getRole();
    if (null != role) {
      roleStr = role.toString();
    }
    return roleStr;
  }

  @Override // NameNodeStatusMXBean
  public String getState() {
    String servStateStr = "";
    HAServiceState servState = getServiceState();
    if (null != servState) {
      servStateStr = servState.toString();
    }
    return servStateStr;
  }

  @Override // NameNodeStatusMXBean
  public String getHostAndPort() {
    return getNameNodeAddressHostPortString();
  }

  @Override // NameNodeStatusMXBean
  public boolean isSecurityEnabled() {
    return UserGroupInformation.isSecurityEnabled();
  }

  @Override // NameNodeStatusMXBean
  public long getLastHATransitionTime() {
    return state.getLastHATransitionTime();
  }

  @Override // NameNodeStatusMXBean
  public long getBytesWithFutureGenerationStamps() {
    return getNamesystem().getBytesInFuture();
  }

  /**
   * Shutdown the NN immediately in an ungraceful way. Used when it would be
   * unsafe for the NN to continue operating, e.g. during a failed HA state
   * transition.
   *
   * @param t exception which warrants the shutdown. Printed to the NN log
   *          before exit.
   * @throws ExitException thrown only for testing.
   */
  protected synchronized void doImmediateShutdown(Throwable t)
      throws ExitException {
    String message = "Error encountered requiring NN shutdown. " +
        "Shutting down immediately.";
    try {
      LOG.error(message, t);
    } catch (Throwable ignored) {
      // This is unlikely to happen, but there's nothing we can do if it does.
    }
    terminate(1, t);
  }

  /**
   * Class used to expose {@link NameNode} as context to {@link HAState}
   */
  protected class NameNodeHAContext implements HAContext {
    @Override
    public void setState(HAState s) {
      state = s;
    }

    @Override
    public HAState getState() {
      return state;
    }

    @Override
    public void startActiveServices() throws IOException {
      try {
        namesystem.startActiveServices();
        startTrashEmptier(conf);
      } catch (Throwable t) {
        doImmediateShutdown(t);
      }
    }

    @Override
    public void stopActiveServices() throws IOException {
      try {
        if (namesystem != null) {
          namesystem.stopActiveServices();
        }
        stopTrashEmptier();
      } catch (Throwable t) {
        doImmediateShutdown(t);
      }
    }

    @Override
    public void startStandbyServices() throws IOException {
      try {
        namesystem.startStandbyServices(conf);
      } catch (Throwable t) {
        doImmediateShutdown(t);
      }
    }

    @Override
    public void prepareToStopStandbyServices() throws ServiceFailedException {
      try {
        namesystem.prepareToStopStandbyServices();
      } catch (Throwable t) {
        doImmediateShutdown(t);
      }
    }

    @Override
    public void stopStandbyServices() throws IOException {
      try {
        if (namesystem != null) {
          namesystem.stopStandbyServices();
        }
      } catch (Throwable t) {
        doImmediateShutdown(t);
      }
    }

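    /**
     * Lock ordering note: the namesystem write lock is taken before the
     * retry cache lock, and {@link #writeUnlock()} releases the two in the
     * reverse order of acquisition.
     */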
    @Override
    public void writeLock() {
      namesystem.writeLock();
      namesystem.lockRetryCache();
    }

    @Override
    public void writeUnlock() {
      namesystem.unlockRetryCache();
      namesystem.writeUnlock();
    }

    /** Check if an operation of given category is allowed */
    @Override
    public void checkOperation(final OperationCategory op)
        throws StandbyException {
      state.checkOperation(haContext, op);
    }

    @Override
    public boolean allowStaleReads() {
      return allowStaleStandbyReads;
    }

  }

  public boolean isStandbyState() {
    return (state.equals(STANDBY_STATE));
  }

  public boolean isActiveState() {
    return (state.equals(ACTIVE_STATE));
  }

  /**
   * Returns whether the NameNode is completely started.
   */
  boolean isStarted() {
    return this.started.get();
  }

  /**
   * Check that a request to change this node's HA state is valid.
   * In particular, verifies that, if auto failover is enabled, non-forced
   * requests from the HAAdmin CLI are rejected, and vice versa.
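   *
   * For illustration only (hypothetical command): with
   * {@code dfs.ha.automatic-failover.enabled} set to true, a plain
   * {@code hdfs haadmin -transitionToActive nn1} is refused, while the
   * same command with the {@code --forcemanual} flag is allowed with a
   * warning.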
   *
   * @param req the request to check
   * @throws AccessControlException if the request is disallowed
   */
  void checkHaStateChange(StateChangeRequestInfo req)
      throws AccessControlException {
    boolean autoHaEnabled = conf.getBoolean(DFS_HA_AUTO_FAILOVER_ENABLED_KEY,
        DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT);
    switch (req.getSource()) {
    case REQUEST_BY_USER:
      if (autoHaEnabled) {
        throw new AccessControlException(
            "Manual HA control for this NameNode is disallowed, because " +
            "automatic HA is enabled.");
      }
      break;
    case REQUEST_BY_USER_FORCED:
      if (autoHaEnabled) {
        LOG.warn("Allowing manual HA control from " +
            Server.getRemoteAddress() +
            " even though automatic HA is enabled, because the user " +
            "specified the force flag");
      }
      break;
    case REQUEST_BY_ZKFC:
      if (!autoHaEnabled) {
        throw new AccessControlException(
            "Request from ZK failover controller at " +
            Server.getRemoteAddress() + " denied since automatic HA " +
            "is not enabled");
      }
      break;
    }
  }
}