001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.ha;
020
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.List;
024import java.util.concurrent.CountDownLatch;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.locks.Lock;
027import java.util.concurrent.locks.ReentrantLock;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.apache.hadoop.HadoopIllegalArgumentException;
032import org.apache.hadoop.classification.InterfaceAudience;
033import org.apache.hadoop.classification.InterfaceStability;
034import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
035import org.apache.hadoop.util.StringUtils;
036import org.apache.zookeeper.data.ACL;
037import org.apache.zookeeper.KeeperException;
038import org.apache.zookeeper.Watcher;
039import org.apache.zookeeper.WatchedEvent;
040import org.apache.zookeeper.Watcher.Event;
041import org.apache.zookeeper.ZKUtil;
042import org.apache.zookeeper.ZooKeeper;
043import org.apache.zookeeper.CreateMode;
044import org.apache.zookeeper.AsyncCallback.*;
045import org.apache.zookeeper.data.Stat;
046import org.apache.zookeeper.KeeperException.Code;
047
048import com.google.common.annotations.VisibleForTesting;
049import com.google.common.base.Preconditions;
050
051/**
052 * 
053 * This class implements a simple library to perform leader election on top of
054 * Apache Zookeeper. Using Zookeeper as a coordination service, leader election
055 * can be performed by atomically creating an ephemeral lock file (znode) on
056 * Zookeeper. The service instance that successfully creates the znode becomes
057 * active and the rest become standbys. <br/>
058 * This election mechanism is only efficient for small number of election
059 * candidates (order of 10's) because contention on single znode by a large
060 * number of candidates can result in Zookeeper overload. <br/>
061 * The elector does not guarantee fencing (protection of shared resources) among
062 * service instances. After it has notified an instance about becoming a leader,
063 * then that instance must ensure that it meets the service consistency
064 * requirements. If it cannot do so, then it is recommended to quit the
065 * election. The application implements the {@link ActiveStandbyElectorCallback}
066 * to interact with the elector
067 */
068@InterfaceAudience.Private
069@InterfaceStability.Evolving
070public class ActiveStandbyElector implements StatCallback, StringCallback {
071
072  /**
073   * Callback interface to interact with the ActiveStandbyElector object. <br/>
074   * The application will be notified with a callback only on state changes
075   * (i.e. there will never be successive calls to becomeActive without an
076   * intermediate call to enterNeutralMode). <br/>
077   * The callbacks will be running on Zookeeper client library threads. The
078   * application should return from these callbacks quickly so as not to impede
079   * Zookeeper client library performance and notifications. The app will
080   * typically remember the state change and return from the callback. It will
081   * then proceed with implementing actions around that state change. It is
082   * possible to be called back again while these actions are in flight and the
083   * app should handle this scenario.
084   */
  public interface ActiveStandbyElectorCallback {
    /**
     * This method is called when the app becomes the active leader.
     * If the service fails to become active, it should throw
     * ServiceFailedException. This will cause the elector to
     * sleep for a short period, then re-join the election.
     *
     * Callback implementations are expected to manage their own
     * timeouts (e.g. when making an RPC to a remote node).
     *
     * @throws ServiceFailedException
     *           if the service could not transition to the active state
     */
    void becomeActive() throws ServiceFailedException;

    /**
     * This method is called when the app becomes a standby
     */
    void becomeStandby();

    /**
     * If the elector gets disconnected from Zookeeper and does not know about
     * the lock state, then it will notify the service via the enterNeutralMode
     * interface. The service may choose to ignore this or stop doing state
     * changing operations. Upon reconnection, the elector verifies the leader
     * status and calls back on the becomeActive and becomeStandby app
     * interfaces. <br/>
     * Zookeeper disconnects can happen due to network issues or loss of
     * Zookeeper quorum. Thus enterNeutralMode can be used to guard against
     * split-brain issues. In such situations it might be prudent to call
     * becomeStandby too. However, such state change operations might be
     * expensive and enterNeutralMode can help guard against doing that for
     * transient issues.
     */
    void enterNeutralMode();

    /**
     * If there is any fatal error (e.g. wrong ACL's, unexpected Zookeeper
     * errors or Zookeeper persistent unavailability) then notifyFatalError is
     * called to notify the app about it.
     *
     * @param errorMessage human-readable description of the fatal error
     */
    void notifyFatalError(String errorMessage);

    /**
     * If an old active has failed, rather than exited gracefully, then
     * the new active may need to take some fencing actions against it
     * before proceeding with failover.
     *
     * @param oldActiveData the application data provided by the prior active
     */
    void fenceOldActive(byte[] oldActiveData);
  }
134
135  /**
136   * Name of the lock znode used by the library. Protected for access in test
137   * classes
138   */
  @VisibleForTesting
  protected static final String LOCK_FILENAME = "ActiveStandbyElectorLock";
  /**
   * Name of the breadcrumb znode. Its full path is built directly under the
   * parent znode, next to the lock znode. Protected for access in test
   * classes
   */
  @VisibleForTesting
  protected static final String BREADCRUMB_FILENAME = "ActiveBreadCrumb";

  /** Logger used by the elector. */
  public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class);

  /**
   * Time, in milliseconds, to sleep before re-joining the election after an
   * attempt to become active has failed.
   */
  private static final int SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE = 1000;
147
  /**
   * The elector's view of its ZooKeeper connection. The field holding it
   * starts out as TERMINATED and is moved by the watch-event handler
   * (CONNECTED / DISCONNECTED) and by terminateConnection() (TERMINATED).
   */
  private static enum ConnectionState {
    DISCONNECTED, CONNECTED, TERMINATED
  };

  /**
   * Election state of this elector. The elector starts in INIT and is put
   * back into INIT whenever it is reset.
   */
  static enum State {
    INIT, ACTIVE, STANDBY, NEUTRAL
  };
155
  // Current election state; reset() puts it back to INIT.
  private State state = State.INIT;
  // Number of retries issued so far for the asynchronous lock-znode create
  // and stat operations respectively; both are bounded by maxRetryNum.
  private int createRetryCount = 0;
  private int statRetryCount = 0;
  // Current ZooKeeper client; null whenever no connection is held.
  private ZooKeeper zkClient;
  // Watcher registered with the current client; replaced on every connect.
  private WatcherWithClientRef watcher;
  private ConnectionState zkConnectionState = ConnectionState.TERMINATED;

  // Immutable configuration captured by the constructor.
  private final ActiveStandbyElectorCallback appClient;
  private final String zkHostPort;
  private final int zkSessionTimeout;
  private final List<ACL> zkAcl;
  private final List<ZKAuthInfo> zkAuthInfo;
  // Private copy of the data handed to joinElection(); null until then.
  private byte[] appData;
  private final String zkLockFilePath;
  private final String zkBreadCrumbPath;
  private final String znodeWorkingDir;
  // Upper bound for connection attempts and for create/stat retries.
  private final int maxRetryNum;

  // Held while a session is being re-established so that tests can block
  // the reconnection (see reJoinElection()).
  private Lock sessionReestablishLockForTests = new ReentrantLock();
  // Set by joinElectionInternal(); cleared by terminateConnection() and
  // quitElection().
  private boolean wantToBeInElection;
  // Cleared when a stat (monitor) callback is processed.
  // NOTE(review): presumably set when the async stat is issued - confirm in
  // monitorLockNodeAsync().
  private boolean monitorLockNodePending = false;
  private ZooKeeper monitorLockNodeClient;
178
  /**
   * Create a new ActiveStandbyElector object <br/>
   * The elector is created by providing to it the Zookeeper configuration, the
   * parent znode under which to create the znode and a reference to the
   * callback interface. <br/>
   * The parent znode name must be the same for all service instances and
   * different across services. <br/>
   * After the leader has been lost, a new leader will be elected after the
   * session timeout expires. Hence, the app must set this parameter based on
   * its needs for failure response time. The session timeout must be greater
   * than the Zookeeper disconnect timeout and is recommended to be 3X that
   * value to enable Zookeeper to retry transient disconnections. Setting a very
   * short session timeout may result in frequent transitions between active and
   * standby states during issues like network outages/GC pauses. <br/>
   * This constructor delegates to the eight-argument constructor with
   * failFast set to true.
   *
   * @param zookeeperHostPorts
   *          ZooKeeper hostPort for all ZooKeeper servers
   * @param zookeeperSessionTimeout
   *          ZooKeeper session timeout
   * @param parentZnodeName
   *          znode under which to create the lock
   * @param acl
   *          ZooKeeper ACL's
   * @param authInfo a list of authentication credentials to add to the
   *                 ZK connection
   * @param app
   *          reference to callback interface object
   * @param maxRetryNum
   *          upper bound used for ZooKeeper connection attempts and for
   *          retries of the asynchronous lock-znode create and stat calls
   * @throws IOException
   * @throws HadoopIllegalArgumentException
   * @throws KeeperException
   */
  public ActiveStandbyElector(String zookeeperHostPorts,
      int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
      List<ZKAuthInfo> authInfo, ActiveStandbyElectorCallback app,
      int maxRetryNum) throws IOException, HadoopIllegalArgumentException,
      KeeperException {
    this(zookeeperHostPorts, zookeeperSessionTimeout, parentZnodeName, acl,
      authInfo, app, maxRetryNum, true);
  }
217
218  /**
219   * Create a new ActiveStandbyElector object <br/>
220   * The elector is created by providing to it the Zookeeper configuration, the
221   * parent znode under which to create the znode and a reference to the
222   * callback interface. <br/>
223   * The parent znode name must be the same for all service instances and
224   * different across services. <br/>
225   * After the leader has been lost, a new leader will be elected after the
226   * session timeout expires. Hence, the app must set this parameter based on
227   * its needs for failure response time. The session timeout must be greater
228   * than the Zookeeper disconnect timeout and is recommended to be 3X that
229   * value to enable Zookeeper to retry transient disconnections. Setting a very
230   * short session timeout may result in frequent transitions between active and
231   * standby states during issues like network outages/GS pauses.
232   * 
233   * @param zookeeperHostPorts
234   *          ZooKeeper hostPort for all ZooKeeper servers
235   * @param zookeeperSessionTimeout
236   *          ZooKeeper session timeout
237   * @param parentZnodeName
238   *          znode under which to create the lock
239   * @param acl
240   *          ZooKeeper ACL's
241   * @param authInfo a list of authentication credentials to add to the
242   *                 ZK connection
243   * @param app
244   *          reference to callback interface object
245   * @param failFast
246   *          whether need to add the retry when establishing ZK connection.
247   * @throws IOException
248   * @throws HadoopIllegalArgumentException
249   */
250  public ActiveStandbyElector(String zookeeperHostPorts,
251      int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
252      List<ZKAuthInfo> authInfo, ActiveStandbyElectorCallback app,
253      int maxRetryNum, boolean failFast) throws IOException,
254      HadoopIllegalArgumentException, KeeperException {
255    if (app == null || acl == null || parentZnodeName == null
256        || zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
257      throw new HadoopIllegalArgumentException("Invalid argument");
258    }
259    zkHostPort = zookeeperHostPorts;
260    zkSessionTimeout = zookeeperSessionTimeout;
261    zkAcl = acl;
262    zkAuthInfo = authInfo;
263    appClient = app;
264    znodeWorkingDir = parentZnodeName;
265    zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
266    zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
267    this.maxRetryNum = maxRetryNum;
268
269    // establish the ZK Connection for future API calls
270    if (failFast) {
271      createConnection();
272    } else {
273      reEstablishSession();
274    }
275  }
276
277  /**
278   * To participate in election, the app will call joinElection. The result will
279   * be notified by a callback on either the becomeActive or becomeStandby app
280   * interfaces. <br/>
281   * After this the elector will automatically monitor the leader status and
282   * perform re-election if necessary<br/>
283   * The app could potentially start off in standby mode and ignore the
284   * becomeStandby call.
285   * 
286   * @param data
287   *          to be set by the app. non-null data must be set.
288   * @throws HadoopIllegalArgumentException
289   *           if valid data is not supplied
290   */
291  public synchronized void joinElection(byte[] data)
292      throws HadoopIllegalArgumentException {
293    
294    if (data == null) {
295      throw new HadoopIllegalArgumentException("data cannot be null");
296    }
297    
298    if (wantToBeInElection) {
299      LOG.info("Already in election. Not re-connecting.");
300      return;
301    }
302
303    appData = new byte[data.length];
304    System.arraycopy(data, 0, appData, 0, data.length);
305
306    if (LOG.isDebugEnabled()) {
307      LOG.debug("Attempting active election for " + this);
308    }
309    joinElectionInternal();
310  }
311  
312  /**
313   * @return true if the configured parent znode exists
314   */
315  public synchronized boolean parentZNodeExists()
316      throws IOException, InterruptedException {
317    Preconditions.checkState(zkClient != null);
318    try {
319      return zkClient.exists(znodeWorkingDir, false) != null;
320    } catch (KeeperException e) {
321      throw new IOException("Couldn't determine existence of znode '" +
322          znodeWorkingDir + "'", e);
323    }
324  }
325
326  /**
327   * Utility function to ensure that the configured base znode exists.
328   * This recursively creates the znode as well as all of its parents.
329   */
330  public synchronized void ensureParentZNode()
331      throws IOException, InterruptedException {
332    Preconditions.checkState(!wantToBeInElection,
333        "ensureParentZNode() may not be called while in the election");
334
335    String pathParts[] = znodeWorkingDir.split("/");
336    Preconditions.checkArgument(pathParts.length >= 1 &&
337        pathParts[0].isEmpty(),
338        "Invalid path: %s", znodeWorkingDir);
339    
340    StringBuilder sb = new StringBuilder();
341    for (int i = 1; i < pathParts.length; i++) {
342      sb.append("/").append(pathParts[i]);
343      String prefixPath = sb.toString();
344      LOG.debug("Ensuring existence of " + prefixPath);
345      try {
346        createWithRetries(prefixPath, new byte[]{}, zkAcl, CreateMode.PERSISTENT);
347      } catch (KeeperException e) {
348        if (isNodeExists(e.code())) {
349          // This is OK - just ensuring existence.
350          continue;
351        } else {
352          throw new IOException("Couldn't create " + prefixPath, e);
353        }
354      }
355    }
356    
357    LOG.info("Successfully created " + znodeWorkingDir + " in ZK.");
358  }
359  
360  /**
361   * Clear all of the state held within the parent ZNode.
362   * This recursively deletes everything within the znode as well as the
363   * parent znode itself. It should only be used when it's certain that
364   * no electors are currently participating in the election.
365   */
366  public synchronized void clearParentZNode()
367      throws IOException, InterruptedException {
368    Preconditions.checkState(!wantToBeInElection,
369        "clearParentZNode() may not be called while in the election");
370
371    try {
372      LOG.info("Recursively deleting " + znodeWorkingDir + " from ZK...");
373
374      zkDoWithRetries(new ZKAction<Void>() {
375        @Override
376        public Void run() throws KeeperException, InterruptedException {
377          ZKUtil.deleteRecursive(zkClient, znodeWorkingDir);
378          return null;
379        }
380      });
381    } catch (KeeperException e) {
382      throw new IOException("Couldn't clear parent znode " + znodeWorkingDir,
383          e);
384    }
385    LOG.info("Successfully deleted " + znodeWorkingDir + " from ZK.");
386  }
387
388
389  /**
390   * Any service instance can drop out of the election by calling quitElection. 
391   * <br/>
392   * This will lose any leader status, if held, and stop monitoring of the lock
393   * node. <br/>
394   * If the instance wants to participate in election again, then it needs to
395   * call joinElection(). <br/>
396   * This allows service instances to take themselves out of rotation for known
397   * impending unavailable states (e.g. long GC pause or software upgrade).
398   * 
399   * @param needFence true if the underlying daemon may need to be fenced
400   * if a failover occurs due to dropping out of the election.
401   */
402  public synchronized void quitElection(boolean needFence) {
403    LOG.info("Yielding from election");
404    if (!needFence && state == State.ACTIVE) {
405      // If active is gracefully going back to standby mode, remove
406      // our permanent znode so no one fences us.
407      tryDeleteOwnBreadCrumbNode();
408    }
409    reset();
410    wantToBeInElection = false;
411  }
412
  /**
   * Exception thrown when there is no active leader, i.e. when the lock
   * znode does not exist at the time the active data is requested.
   */
  public static class ActiveNotFoundException extends Exception {
    private static final long serialVersionUID = 3505396722342846462L;
  }
419
420  /**
421   * get data set by the active leader
422   * 
423   * @return data set by the active instance
424   * @throws ActiveNotFoundException
425   *           when there is no active leader
426   * @throws KeeperException
427   *           other zookeeper operation errors
428   * @throws InterruptedException
429   * @throws IOException
430   *           when ZooKeeper connection could not be established
431   */
432  public synchronized byte[] getActiveData() throws ActiveNotFoundException,
433      KeeperException, InterruptedException, IOException {
434    try {
435      if (zkClient == null) {
436        createConnection();
437      }
438      Stat stat = new Stat();
439      return getDataWithRetries(zkLockFilePath, false, stat);
440    } catch(KeeperException e) {
441      Code code = e.code();
442      if (isNodeDoesNotExist(code)) {
443        // handle the commonly expected cases that make sense for us
444        throw new ActiveNotFoundException();
445      } else {
446        throw e;
447      }
448    }
449  }
450
451  /**
452   * interface implementation of Zookeeper callback for create
453   */
454  @Override
455  public synchronized void processResult(int rc, String path, Object ctx,
456      String name) {
457    if (isStaleClient(ctx)) return;
458    if (LOG.isDebugEnabled()) {
459      LOG.debug("CreateNode result: " + rc + " for path: " + path
460          + " connectionState: " + zkConnectionState +
461          "  for " + this);
462    }
463
464    Code code = Code.get(rc);
465    if (isSuccess(code)) {
466      // we successfully created the znode. we are the leader. start monitoring
467      if (becomeActive()) {
468        monitorActiveStatus();
469      } else {
470        reJoinElectionAfterFailureToBecomeActive();
471      }
472      return;
473    }
474
475    if (isNodeExists(code)) {
476      if (createRetryCount == 0) {
477        // znode exists and we did not retry the operation. so a different
478        // instance has created it. become standby and monitor lock.
479        becomeStandby();
480      }
481      // if we had retried then the znode could have been created by our first
482      // attempt to the server (that we lost) and this node exists response is
483      // for the second attempt. verify this case via ephemeral node owner. this
484      // will happen on the callback for monitoring the lock.
485      monitorActiveStatus();
486      return;
487    }
488
489    String errorMessage = "Received create error from Zookeeper. code:"
490        + code.toString() + " for path " + path;
491    LOG.debug(errorMessage);
492
493    if (shouldRetry(code)) {
494      if (createRetryCount < maxRetryNum) {
495        LOG.debug("Retrying createNode createRetryCount: " + createRetryCount);
496        ++createRetryCount;
497        createLockNodeAsync();
498        return;
499      }
500      errorMessage = errorMessage
501          + ". Not retrying further znode create connection errors.";
502    } else if (isSessionExpired(code)) {
503      // This isn't fatal - the client Watcher will re-join the election
504      LOG.warn("Lock acquisition failed because session was lost");
505      return;
506    }
507
508    fatalError(errorMessage);
509  }
510
511  /**
512   * interface implementation of Zookeeper callback for monitor (exists)
513   */
514  @Override
515  public synchronized void processResult(int rc, String path, Object ctx,
516      Stat stat) {
517    if (isStaleClient(ctx)) return;
518    monitorLockNodePending = false;
519
520    assert wantToBeInElection :
521        "Got a StatNode result after quitting election";
522
523    if (LOG.isDebugEnabled()) {
524      LOG.debug("StatNode result: " + rc + " for path: " + path
525          + " connectionState: " + zkConnectionState + " for " + this);
526    }
527
528    Code code = Code.get(rc);
529    if (isSuccess(code)) {
530      // the following owner check completes verification in case the lock znode
531      // creation was retried
532      if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
533        // we own the lock znode. so we are the leader
534        if (!becomeActive()) {
535          reJoinElectionAfterFailureToBecomeActive();
536        }
537      } else {
538        // we dont own the lock znode. so we are a standby.
539        becomeStandby();
540      }
541      // the watch set by us will notify about changes
542      return;
543    }
544
545    if (isNodeDoesNotExist(code)) {
546      // the lock znode disappeared before we started monitoring it
547      enterNeutralMode();
548      joinElectionInternal();
549      return;
550    }
551
552    String errorMessage = "Received stat error from Zookeeper. code:"
553        + code.toString();
554    LOG.debug(errorMessage);
555
556    if (shouldRetry(code)) {
557      if (statRetryCount < maxRetryNum) {
558        ++statRetryCount;
559        monitorLockNodeAsync();
560        return;
561      }
562      errorMessage = errorMessage
563          + ". Not retrying further znode monitoring connection errors.";
564    } else if (isSessionExpired(code)) {
565      // This isn't fatal - the client Watcher will re-join the election
566      LOG.warn("Lock monitoring failed because session was lost");
567      return;
568    }
569
570    fatalError(errorMessage);
571  }
572
  /**
   * We failed to become active. Re-join the election, but
   * sleep for SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE milliseconds after
   * terminating our existing session, so that other nodes have a chance
   * to become active.
   * The failure to become active is already logged inside
   * becomeActive().
   */
  private void reJoinElectionAfterFailureToBecomeActive() {
    reJoinElection(SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE);
  }
583
  /**
   * interface implementation of Zookeeper watch events (connection and node),
   * proxied by {@link WatcherWithClientRef}.
   * <br/>
   * Events of type None carry a connection state change; any other event
   * type with a non-null path is a change on a watched znode. Events that
   * belong to a stale client are ignored.
   *
   * @param zk    the client the event was delivered for
   * @param event the ZooKeeper watch event
   */
  synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
    Event.EventType eventType = event.getType();
    if (isStaleClient(zk)) return;
    if (LOG.isDebugEnabled()) {
      LOG.debug("Watcher event type: " + eventType + " with state:"
          + event.getState() + " for path:" + event.getPath()
          + " connectionState: " + zkConnectionState
          + " for " + this);
    }

    if (eventType == Event.EventType.None) {
      // the connection state has changed
      switch (event.getState()) {
      case SyncConnected:
        LOG.info("Session connected.");
        // if the listener was asked to move to safe state then it needs to
        // be undone
        ConnectionState prevConnectionState = zkConnectionState;
        zkConnectionState = ConnectionState.CONNECTED;
        // only a reconnect after a disconnect needs the leader status to be
        // re-verified, and only while we are taking part in the election
        if (prevConnectionState == ConnectionState.DISCONNECTED &&
            wantToBeInElection) {
          monitorActiveStatus();
        }
        break;
      case Disconnected:
        LOG.info("Session disconnected. Entering neutral mode...");

        // ask the app to move to safe state because zookeeper connection
        // is not active and we dont know our state
        zkConnectionState = ConnectionState.DISCONNECTED;
        enterNeutralMode();
        break;
      case Expired:
        // the connection got terminated because of session timeout
        // call listener to reconnect
        LOG.info("Session expired. Entering neutral mode and rejoining...");
        enterNeutralMode();
        // re-join immediately (no sleep) on a fresh session
        reJoinElection(0);
        break;
      case SaslAuthenticated:
        LOG.info("Successfully authenticated to ZooKeeper using SASL.");
        break;
      default:
        fatalError("Unexpected Zookeeper watch event state: "
            + event.getState());
        break;
      }

      return;
    }

    // a watch on lock path in zookeeper has fired. so something has changed on
    // the lock. ideally we should check that the path is the same as the lock
    // path but trusting zookeeper for now
    String path = event.getPath();
    if (path != null) {
      switch (eventType) {
      case NodeDeleted:
        // the lock is gone: give up leadership if we held it, then run for
        // election again
        if (state == State.ACTIVE) {
          enterNeutralMode();
        }
        joinElectionInternal();
        break;
      case NodeDataChanged:
        monitorActiveStatus();
        break;
      default:
        // any other node event is unexpected; re-check the lock node
        if (LOG.isDebugEnabled()) {
          LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
        }
        monitorActiveStatus();
      }

      return;
    }

    // some unexpected error has occurred
    fatalError("Unexpected watch error from Zookeeper");
  }
667
668  /**
669   * Get a new zookeeper client instance. protected so that test class can
670   * inherit and mock out the zookeeper instance
671   * 
672   * @return new zookeeper client instance
673   * @throws IOException
674   * @throws KeeperException zookeeper connectionloss exception
675   */
676  protected synchronized ZooKeeper connectToZooKeeper() throws IOException,
677      KeeperException {
678    
679    // Unfortunately, the ZooKeeper constructor connects to ZooKeeper and
680    // may trigger the Connected event immediately. So, if we register the
681    // watcher after constructing ZooKeeper, we may miss that event. Instead,
682    // we construct the watcher first, and have it block any events it receives
683    // before we can set its ZooKeeper reference.
684    watcher = new WatcherWithClientRef();
685    ZooKeeper zk = createZooKeeper();
686    watcher.setZooKeeperRef(zk);
687
688    // Wait for the asynchronous success/failure. This may throw an exception
689    // if we don't connect within the session timeout.
690    watcher.waitForZKConnectionEvent(zkSessionTimeout);
691    
692    for (ZKAuthInfo auth : zkAuthInfo) {
693      zk.addAuthInfo(auth.getScheme(), auth.getAuth());
694    }
695    return zk;
696  }
697
  /**
   * Get a new zookeeper client instance. protected so that test class can
   * inherit and pass in a mock object for zookeeper
   * <br/>
   * The client is created with the configured host/port string and session
   * timeout, and with the elector's current watcher.
   *
   * @return new zookeeper client instance
   * @throws IOException
   */
  protected ZooKeeper createZooKeeper() throws IOException {
    return new ZooKeeper(zkHostPort, zkSessionTimeout, watcher);
  }
708
  /**
   * Log a fatal error, reset the elector (which also terminates the
   * ZooKeeper connection) and then notify the application.
   *
   * @param errorMessage description of the error, logged and passed on to
   *          the application callback
   */
  private void fatalError(String errorMessage) {
    LOG.fatal(errorMessage);
    reset();
    appClient.notifyFatalError(errorMessage);
  }
714
  /**
   * Start a fresh round of monitoring of the lock znode: the stat retry
   * counter is cleared and an asynchronous stat is issued.
   */
  private void monitorActiveStatus() {
    assert wantToBeInElection;
    if (LOG.isDebugEnabled()) {
      LOG.debug("Monitoring active leader for " + this);
    }
    statRetryCount = 0;
    monitorLockNodeAsync();
  }
723
724  private void joinElectionInternal() {
725    Preconditions.checkState(appData != null,
726        "trying to join election without any app data");
727    if (zkClient == null) {
728      if (!reEstablishSession()) {
729        fatalError("Failed to reEstablish connection with ZooKeeper");
730        return;
731      }
732    }
733
734    createRetryCount = 0;
735    wantToBeInElection = true;
736    createLockNodeAsync();
737  }
738
739  private void reJoinElection(int sleepTime) {
740    LOG.info("Trying to re-establish ZK session");
741    
742    // Some of the test cases rely on expiring the ZK sessions and
743    // ensuring that the other node takes over. But, there's a race
744    // where the original lease holder could reconnect faster than the other
745    // thread manages to take the lock itself. This lock allows the
746    // tests to block the reconnection. It's a shame that this leaked
747    // into non-test code, but the lock is only acquired here so will never
748    // be contended.
749    sessionReestablishLockForTests.lock();
750    try {
751      terminateConnection();
752      sleepFor(sleepTime);
753      // Should not join election even before the SERVICE is reported
754      // as HEALTHY from ZKFC monitoring.
755      if (appData != null) {
756        joinElectionInternal();
757      } else {
758        LOG.info("Not joining election since service has not yet been " +
759            "reported as healthy.");
760      }
761    } finally {
762      sessionReestablishLockForTests.unlock();
763    }
764  }
765
766  /**
767   * Sleep for the given number of milliseconds.
768   * This is non-static, and separated out, so that unit tests
769   * can override the behavior not to sleep.
770   */
771  @VisibleForTesting
772  protected void sleepFor(int sleepMs) {
773    if (sleepMs > 0) {
774      try {
775        Thread.sleep(sleepMs);
776      } catch (InterruptedException e) {
777        Thread.currentThread().interrupt();
778      }
779    }
780  }
781
  /**
   * Test hook: acquire the session re-establishment lock so that
   * reJoinElection() blocks until
   * {@link #allowSessionReestablishmentForTests()} is called.
   */
  @VisibleForTesting
  void preventSessionReestablishmentForTests() {
    sessionReestablishLockForTests.lock();
  }
786  
  /**
   * Test hook: release the session re-establishment lock taken by
   * {@link #preventSessionReestablishmentForTests()}.
   */
  @VisibleForTesting
  void allowSessionReestablishmentForTests() {
    sessionReestablishLockForTests.unlock();
  }
791  
792  @VisibleForTesting
793  synchronized long getZKSessionIdForTests() {
794    if (zkClient != null) {
795      return zkClient.getSessionId();
796    } else {
797      return -1;
798    }
799  }
800  
  /**
   * Test hook.
   *
   * @return the current election state of this elector
   */
  @VisibleForTesting
  synchronized State getStateForTests() {
    return state;
  }
805
  /**
   * Test hook.
   *
   * @return the current value of the monitorLockNodePending flag
   */
  @VisibleForTesting
  synchronized boolean isMonitorLockNodePending() {
    return monitorLockNodePending;
  }
810
811  private boolean reEstablishSession() {
812    int connectionRetryCount = 0;
813    boolean success = false;
814    while(!success && connectionRetryCount < maxRetryNum) {
815      if (LOG.isDebugEnabled()) {
816        LOG.debug("Establishing zookeeper connection for " + this);
817      }
818      try {
819        createConnection();
820        success = true;
821      } catch(IOException e) {
822        LOG.warn(e);
823        sleepFor(5000);
824      } catch(KeeperException e) {
825        LOG.warn(e);
826        sleepFor(5000);
827      }
828      ++connectionRetryCount;
829    }
830    return success;
831  }
832
833  private void createConnection() throws IOException, KeeperException {
834    if (zkClient != null) {
835      try {
836        zkClient.close();
837      } catch (InterruptedException e) {
838        throw new IOException("Interrupted while closing ZK",
839            e);
840      }
841      zkClient = null;
842      watcher = null;
843    }
844    zkClient = connectToZooKeeper();
845    if (LOG.isDebugEnabled()) {
846      LOG.debug("Created new connection for " + this);
847    }
848  }
849
850  @InterfaceAudience.Private
851  public synchronized void terminateConnection() {
852    if (zkClient == null) {
853      return;
854    }
855    if (LOG.isDebugEnabled()) {
856      LOG.debug("Terminating ZK connection for " + this);
857    }
858    ZooKeeper tempZk = zkClient;
859    zkClient = null;
860    watcher = null;
861    try {
862      tempZk.close();
863    } catch(InterruptedException e) {
864      LOG.warn(e);
865    }
866    zkConnectionState = ConnectionState.TERMINATED;
867    wantToBeInElection = false;
868  }
869
870  private void reset() {
871    state = State.INIT;
872    terminateConnection();
873  }
874
875  private boolean becomeActive() {
876    assert wantToBeInElection;
877    if (state == State.ACTIVE) {
878      // already active
879      return true;
880    }
881    try {
882      Stat oldBreadcrumbStat = fenceOldActive();
883      writeBreadCrumbNode(oldBreadcrumbStat);
884
885      if (LOG.isDebugEnabled()) {
886        LOG.debug("Becoming active for " + this);
887      }
888      appClient.becomeActive();
889      state = State.ACTIVE;
890      return true;
891    } catch (Exception e) {
892      LOG.warn("Exception handling the winning of election", e);
893      // Caller will handle quitting and rejoining the election.
894      return false;
895    }
896  }
897
898  /**
899   * Write the "ActiveBreadCrumb" node, indicating that this node may need
900   * to be fenced on failover.
901   * @param oldBreadcrumbStat 
902   */
903  private void writeBreadCrumbNode(Stat oldBreadcrumbStat)
904      throws KeeperException, InterruptedException {
905    Preconditions.checkState(appData != null, "no appdata");
906    
907    LOG.info("Writing znode " + zkBreadCrumbPath +
908        " to indicate that the local node is the most recent active...");
909    if (oldBreadcrumbStat == null) {
910      // No previous active, just create the node
911      createWithRetries(zkBreadCrumbPath, appData, zkAcl,
912        CreateMode.PERSISTENT);
913    } else {
914      // There was a previous active, update the node
915      setDataWithRetries(zkBreadCrumbPath, appData, oldBreadcrumbStat.getVersion());
916    }
917  }
918  
919  /**
920   * Try to delete the "ActiveBreadCrumb" node when gracefully giving up
921   * active status.
922   * If this fails, it will simply warn, since the graceful release behavior
923   * is only an optimization.
924   */
925  private void tryDeleteOwnBreadCrumbNode() {
926    assert state == State.ACTIVE;
927    LOG.info("Deleting bread-crumb of active node...");
928    
929    // Sanity check the data. This shouldn't be strictly necessary,
930    // but better to play it safe.
931    Stat stat = new Stat();
932    byte[] data = null;
933    try {
934      data = zkClient.getData(zkBreadCrumbPath, false, stat);
935
936      if (!Arrays.equals(data, appData)) {
937        throw new IllegalStateException(
938            "We thought we were active, but in fact " +
939            "the active znode had the wrong data: " +
940            StringUtils.byteToHexString(data) + " (stat=" + stat + ")");
941      }
942      
943      deleteWithRetries(zkBreadCrumbPath, stat.getVersion());
944    } catch (Exception e) {
945      LOG.warn("Unable to delete our own bread-crumb of being active at " +
946          zkBreadCrumbPath + ": " + e.getLocalizedMessage() + ". " +
947          "Expecting to be fenced by the next active.");
948    }
949  }
950
951  /**
952   * If there is a breadcrumb node indicating that another node may need
953   * fencing, try to fence that node.
954   * @return the Stat of the breadcrumb node that was read, or null
955   * if no breadcrumb node existed
956   */
957  private Stat fenceOldActive() throws InterruptedException, KeeperException {
958    final Stat stat = new Stat();
959    byte[] data;
960    LOG.info("Checking for any old active which needs to be fenced...");
961    try {
962      data = zkDoWithRetries(new ZKAction<byte[]>() {
963        @Override
964        public byte[] run() throws KeeperException, InterruptedException {
965          return zkClient.getData(zkBreadCrumbPath, false, stat);
966        }
967      });
968    } catch (KeeperException ke) {
969      if (isNodeDoesNotExist(ke.code())) {
970        LOG.info("No old node to fence");
971        return null;
972      }
973      
974      // If we failed to read for any other reason, then likely we lost
975      // our session, or we don't have permissions, etc. In any case,
976      // we probably shouldn't become active, and failing the whole
977      // thing is the best bet.
978      throw ke;
979    }
980
981    LOG.info("Old node exists: " + StringUtils.byteToHexString(data));
982    if (Arrays.equals(data, appData)) {
983      LOG.info("But old node has our own data, so don't need to fence it.");
984    } else {
985      appClient.fenceOldActive(data);
986    }
987    return stat;
988  }
989
990  private void becomeStandby() {
991    if (state != State.STANDBY) {
992      if (LOG.isDebugEnabled()) {
993        LOG.debug("Becoming standby for " + this);
994      }
995      state = State.STANDBY;
996      appClient.becomeStandby();
997    }
998  }
999
1000  private void enterNeutralMode() {
1001    if (state != State.NEUTRAL) {
1002      if (LOG.isDebugEnabled()) {
1003        LOG.debug("Entering neutral mode for " + this);
1004      }
1005      state = State.NEUTRAL;
1006      appClient.enterNeutralMode();
1007    }
1008  }
1009
1010  private void createLockNodeAsync() {
1011    zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,
1012        this, zkClient);
1013  }
1014
1015  private void monitorLockNodeAsync() {
1016    if (monitorLockNodePending && monitorLockNodeClient == zkClient) {
1017      LOG.info("Ignore duplicate monitor lock-node request.");
1018      return;
1019    }
1020    monitorLockNodePending = true;
1021    monitorLockNodeClient = zkClient;
1022    zkClient.exists(zkLockFilePath,
1023        watcher, this,
1024        zkClient);
1025  }
1026
1027  private String createWithRetries(final String path, final byte[] data,
1028      final List<ACL> acl, final CreateMode mode)
1029      throws InterruptedException, KeeperException {
1030    return zkDoWithRetries(new ZKAction<String>() {
1031      @Override
1032      public String run() throws KeeperException, InterruptedException {
1033        return zkClient.create(path, data, acl, mode);
1034      }
1035    });
1036  }
1037
1038  private byte[] getDataWithRetries(final String path, final boolean watch,
1039      final Stat stat) throws InterruptedException, KeeperException {
1040    return zkDoWithRetries(new ZKAction<byte[]>() {
1041      @Override
1042      public byte[] run() throws KeeperException, InterruptedException {
1043        return zkClient.getData(path, watch, stat);
1044      }
1045    });
1046  }
1047
1048  private Stat setDataWithRetries(final String path, final byte[] data,
1049      final int version) throws InterruptedException, KeeperException {
1050    return zkDoWithRetries(new ZKAction<Stat>() {
1051      @Override
1052      public Stat run() throws KeeperException, InterruptedException {
1053        return zkClient.setData(path, data, version);
1054      }
1055    });
1056  }
1057  
1058  private void deleteWithRetries(final String path, final int version)
1059      throws KeeperException, InterruptedException {
1060    zkDoWithRetries(new ZKAction<Void>() {
1061      @Override
1062      public Void run() throws KeeperException, InterruptedException {
1063        zkClient.delete(path, version);
1064        return null;
1065      }
1066    });
1067  }
1068
1069  private <T> T zkDoWithRetries(ZKAction<T> action) throws KeeperException,
1070      InterruptedException {
1071    int retry = 0;
1072    while (true) {
1073      try {
1074        return action.run();
1075      } catch (KeeperException ke) {
1076        if (shouldRetry(ke.code()) && ++retry < maxRetryNum) {
1077          continue;
1078        }
1079        throw ke;
1080      }
1081    }
1082  }
1083
1084  private interface ZKAction<T> {
1085    T run() throws KeeperException, InterruptedException; 
1086  }
1087  
1088  /**
1089   * The callbacks and watchers pass a reference to the ZK client
1090   * which made the original call. We don't want to take action
1091   * based on any callbacks from prior clients after we quit
1092   * the election.
1093   * @param ctx the ZK client passed into the watcher
1094   * @return true if it matches the current client
1095   */
1096  private synchronized boolean isStaleClient(Object ctx) {
1097    Preconditions.checkNotNull(ctx);
1098    if (zkClient != (ZooKeeper)ctx) {
1099      LOG.warn("Ignoring stale result from old client with sessionId " +
1100          String.format("0x%08x", ((ZooKeeper)ctx).getSessionId()));
1101      return true;
1102    }
1103    return false;
1104  }
1105
1106  /**
1107   * Watcher implementation which keeps a reference around to the
1108   * original ZK connection, and passes it back along with any
1109   * events.
1110   */
1111  private final class WatcherWithClientRef implements Watcher {
1112    private ZooKeeper zk;
1113    
1114    /**
1115     * Latch fired whenever any event arrives. This is used in order
1116     * to wait for the Connected event when the client is first created.
1117     */
1118    private CountDownLatch hasReceivedEvent = new CountDownLatch(1);
1119
1120    /**
1121     * Latch used to wait until the reference to ZooKeeper is set.
1122     */
1123    private CountDownLatch hasSetZooKeeper = new CountDownLatch(1);
1124
1125    /**
1126     * Waits for the next event from ZooKeeper to arrive.
1127     * 
1128     * @param connectionTimeoutMs zookeeper connection timeout in milliseconds
1129     * @throws KeeperException if the connection attempt times out. This will
1130     * be a ZooKeeper ConnectionLoss exception code.
1131     * @throws IOException if interrupted while connecting to ZooKeeper
1132     */
1133    private void waitForZKConnectionEvent(int connectionTimeoutMs)
1134        throws KeeperException, IOException {
1135      try {
1136        if (!hasReceivedEvent.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)) {
1137          LOG.error("Connection timed out: couldn't connect to ZooKeeper in "
1138              + connectionTimeoutMs + " milliseconds");
1139          zk.close();
1140          throw KeeperException.create(Code.CONNECTIONLOSS);
1141        }
1142      } catch (InterruptedException e) {
1143        Thread.currentThread().interrupt();
1144        throw new IOException(
1145            "Interrupted when connecting to zookeeper server", e);
1146      }
1147    }
1148
1149    private void setZooKeeperRef(ZooKeeper zk) {
1150      Preconditions.checkState(this.zk == null,
1151          "zk already set -- must be set exactly once");
1152      this.zk = zk;
1153      hasSetZooKeeper.countDown();
1154    }
1155
1156    @Override
1157    public void process(WatchedEvent event) {
1158      hasReceivedEvent.countDown();
1159      try {
1160        if (!hasSetZooKeeper.await(zkSessionTimeout, TimeUnit.MILLISECONDS)) {
1161          LOG.debug("Event received with stale zk");
1162        }
1163        ActiveStandbyElector.this.processWatchEvent(
1164            zk, event);
1165      } catch (Throwable t) {
1166        fatalError(
1167            "Failed to process watcher event " + event + ": " +
1168            StringUtils.stringifyException(t));
1169      }
1170    }
1171  }
1172
1173  private static boolean isSuccess(Code code) {
1174    return (code == Code.OK);
1175  }
1176
1177  private static boolean isNodeExists(Code code) {
1178    return (code == Code.NODEEXISTS);
1179  }
1180
1181  private static boolean isNodeDoesNotExist(Code code) {
1182    return (code == Code.NONODE);
1183  }
1184  
1185  private static boolean isSessionExpired(Code code) {
1186    return (code == Code.SESSIONEXPIRED);
1187  }
1188
1189  private static boolean shouldRetry(Code code) {
1190    return code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT;
1191  }
1192  
1193  @Override
1194  public String toString() {
1195    return "elector id=" + System.identityHashCode(this) +
1196      " appData=" +
1197      ((appData == null) ? "null" : StringUtils.byteToHexString(appData)) + 
1198      " cb=" + appClient;
1199  }
1200
1201  public String getHAZookeeperConnectionState() {
1202    return this.zkConnectionState.name();
1203  }
1204}