001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.ha; 020 021import java.io.IOException; 022import java.util.Arrays; 023import java.util.List; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.locks.Lock; 027import java.util.concurrent.locks.ReentrantLock; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.apache.hadoop.HadoopIllegalArgumentException; 032import org.apache.hadoop.classification.InterfaceAudience; 033import org.apache.hadoop.classification.InterfaceStability; 034import org.apache.hadoop.util.ZKUtil.ZKAuthInfo; 035import org.apache.hadoop.util.StringUtils; 036import org.apache.zookeeper.data.ACL; 037import org.apache.zookeeper.KeeperException; 038import org.apache.zookeeper.Watcher; 039import org.apache.zookeeper.WatchedEvent; 040import org.apache.zookeeper.Watcher.Event; 041import org.apache.zookeeper.ZKUtil; 042import org.apache.zookeeper.ZooKeeper; 043import org.apache.zookeeper.CreateMode; 044import org.apache.zookeeper.AsyncCallback.*; 045import org.apache.zookeeper.data.Stat; 046import org.apache.zookeeper.KeeperException.Code; 047 048import com.google.common.annotations.VisibleForTesting; 049import com.google.common.base.Preconditions; 050 051/** 052 * 053 * This class implements a simple library to perform leader election on top of 054 * Apache Zookeeper. Using Zookeeper as a coordination service, leader election 055 * can be performed by atomically creating an ephemeral lock file (znode) on 056 * Zookeeper. The service instance that successfully creates the znode becomes 057 * active and the rest become standbys. <br/> 058 * This election mechanism is only efficient for small number of election 059 * candidates (order of 10's) because contention on single znode by a large 060 * number of candidates can result in Zookeeper overload. <br/> 061 * The elector does not guarantee fencing (protection of shared resources) among 062 * service instances. After it has notified an instance about becoming a leader, 063 * then that instance must ensure that it meets the service consistency 064 * requirements. If it cannot do so, then it is recommended to quit the 065 * election. The application implements the {@link ActiveStandbyElectorCallback} 066 * to interact with the elector 067 */ 068@InterfaceAudience.Private 069@InterfaceStability.Evolving 070public class ActiveStandbyElector implements StatCallback, StringCallback { 071 072 /** 073 * Callback interface to interact with the ActiveStandbyElector object. <br/> 074 * The application will be notified with a callback only on state changes 075 * (i.e. there will never be successive calls to becomeActive without an 076 * intermediate call to enterNeutralMode). <br/> 077 * The callbacks will be running on Zookeeper client library threads. The 078 * application should return from these callbacks quickly so as not to impede 079 * Zookeeper client library performance and notifications. The app will 080 * typically remember the state change and return from the callback. It will 081 * then proceed with implementing actions around that state change. It is 082 * possible to be called back again while these actions are in flight and the 083 * app should handle this scenario. 084 */ 085 public interface ActiveStandbyElectorCallback { 086 /** 087 * This method is called when the app becomes the active leader. 088 * If the service fails to become active, it should throw 089 * ServiceFailedException. This will cause the elector to 090 * sleep for a short period, then re-join the election. 091 * 092 * Callback implementations are expected to manage their own 093 * timeouts (e.g. when making an RPC to a remote node). 094 */ 095 void becomeActive() throws ServiceFailedException; 096 097 /** 098 * This method is called when the app becomes a standby 099 */ 100 void becomeStandby(); 101 102 /** 103 * If the elector gets disconnected from Zookeeper and does not know about 104 * the lock state, then it will notify the service via the enterNeutralMode 105 * interface. The service may choose to ignore this or stop doing state 106 * changing operations. Upon reconnection, the elector verifies the leader 107 * status and calls back on the becomeActive and becomeStandby app 108 * interfaces. <br/> 109 * Zookeeper disconnects can happen due to network issues or loss of 110 * Zookeeper quorum. Thus enterNeutralMode can be used to guard against 111 * split-brain issues. In such situations it might be prudent to call 112 * becomeStandby too. However, such state change operations might be 113 * expensive and enterNeutralMode can help guard against doing that for 114 * transient issues. 115 */ 116 void enterNeutralMode(); 117 118 /** 119 * If there is any fatal error (e.g. wrong ACL's, unexpected Zookeeper 120 * errors or Zookeeper persistent unavailability) then notifyFatalError is 121 * called to notify the app about it. 122 */ 123 void notifyFatalError(String errorMessage); 124 125 /** 126 * If an old active has failed, rather than exited gracefully, then 127 * the new active may need to take some fencing actions against it 128 * before proceeding with failover. 129 * 130 * @param oldActiveData the application data provided by the prior active 131 */ 132 void fenceOldActive(byte[] oldActiveData); 133 } 134 135 /** 136 * Name of the lock znode used by the library. Protected for access in test 137 * classes 138 */ 139 @VisibleForTesting 140 protected static final String LOCK_FILENAME = "ActiveStandbyElectorLock"; 141 @VisibleForTesting 142 protected static final String BREADCRUMB_FILENAME = "ActiveBreadCrumb"; 143 144 public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class); 145 146 private static final int SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE = 1000; 147 148 private static enum ConnectionState { 149 DISCONNECTED, CONNECTED, TERMINATED 150 }; 151 152 static enum State { 153 INIT, ACTIVE, STANDBY, NEUTRAL 154 }; 155 156 private State state = State.INIT; 157 private int createRetryCount = 0; 158 private int statRetryCount = 0; 159 private ZooKeeper zkClient; 160 private WatcherWithClientRef watcher; 161 private ConnectionState zkConnectionState = ConnectionState.TERMINATED; 162 163 private final ActiveStandbyElectorCallback appClient; 164 private final String zkHostPort; 165 private final int zkSessionTimeout; 166 private final List<ACL> zkAcl; 167 private final List<ZKAuthInfo> zkAuthInfo; 168 private byte[] appData; 169 private final String zkLockFilePath; 170 private final String zkBreadCrumbPath; 171 private final String znodeWorkingDir; 172 private final int maxRetryNum; 173 174 private Lock sessionReestablishLockForTests = new ReentrantLock(); 175 private boolean wantToBeInElection; 176 private boolean monitorLockNodePending = false; 177 private ZooKeeper monitorLockNodeClient; 178 179 /** 180 * Create a new ActiveStandbyElector object <br/> 181 * The elector is created by providing to it the Zookeeper configuration, the 182 * parent znode under which to create the znode and a reference to the 183 * callback interface. <br/> 184 * The parent znode name must be the same for all service instances and 185 * different across services. <br/> 186 * After the leader has been lost, a new leader will be elected after the 187 * session timeout expires. Hence, the app must set this parameter based on 188 * its needs for failure response time. The session timeout must be greater 189 * than the Zookeeper disconnect timeout and is recommended to be 3X that 190 * value to enable Zookeeper to retry transient disconnections. Setting a very 191 * short session timeout may result in frequent transitions between active and 192 * standby states during issues like network outages/GS pauses. 193 * 194 * @param zookeeperHostPorts 195 * ZooKeeper hostPort for all ZooKeeper servers 196 * @param zookeeperSessionTimeout 197 * ZooKeeper session timeout 198 * @param parentZnodeName 199 * znode under which to create the lock 200 * @param acl 201 * ZooKeeper ACL's 202 * @param authInfo a list of authentication credentials to add to the 203 * ZK connection 204 * @param app 205 * reference to callback interface object 206 * @throws IOException 207 * @throws HadoopIllegalArgumentException 208 */ 209 public ActiveStandbyElector(String zookeeperHostPorts, 210 int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl, 211 List<ZKAuthInfo> authInfo, ActiveStandbyElectorCallback app, 212 int maxRetryNum) throws IOException, HadoopIllegalArgumentException, 213 KeeperException { 214 this(zookeeperHostPorts, zookeeperSessionTimeout, parentZnodeName, acl, 215 authInfo, app, maxRetryNum, true); 216 } 217 218 /** 219 * Create a new ActiveStandbyElector object <br/> 220 * The elector is created by providing to it the Zookeeper configuration, the 221 * parent znode under which to create the znode and a reference to the 222 * callback interface. <br/> 223 * The parent znode name must be the same for all service instances and 224 * different across services. <br/> 225 * After the leader has been lost, a new leader will be elected after the 226 * session timeout expires. Hence, the app must set this parameter based on 227 * its needs for failure response time. The session timeout must be greater 228 * than the Zookeeper disconnect timeout and is recommended to be 3X that 229 * value to enable Zookeeper to retry transient disconnections. Setting a very 230 * short session timeout may result in frequent transitions between active and 231 * standby states during issues like network outages/GS pauses. 232 * 233 * @param zookeeperHostPorts 234 * ZooKeeper hostPort for all ZooKeeper servers 235 * @param zookeeperSessionTimeout 236 * ZooKeeper session timeout 237 * @param parentZnodeName 238 * znode under which to create the lock 239 * @param acl 240 * ZooKeeper ACL's 241 * @param authInfo a list of authentication credentials to add to the 242 * ZK connection 243 * @param app 244 * reference to callback interface object 245 * @param failFast 246 * whether need to add the retry when establishing ZK connection. 247 * @throws IOException 248 * @throws HadoopIllegalArgumentException 249 */ 250 public ActiveStandbyElector(String zookeeperHostPorts, 251 int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl, 252 List<ZKAuthInfo> authInfo, ActiveStandbyElectorCallback app, 253 int maxRetryNum, boolean failFast) throws IOException, 254 HadoopIllegalArgumentException, KeeperException { 255 if (app == null || acl == null || parentZnodeName == null 256 || zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) { 257 throw new HadoopIllegalArgumentException("Invalid argument"); 258 } 259 zkHostPort = zookeeperHostPorts; 260 zkSessionTimeout = zookeeperSessionTimeout; 261 zkAcl = acl; 262 zkAuthInfo = authInfo; 263 appClient = app; 264 znodeWorkingDir = parentZnodeName; 265 zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME; 266 zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME; 267 this.maxRetryNum = maxRetryNum; 268 269 // establish the ZK Connection for future API calls 270 if (failFast) { 271 createConnection(); 272 } else { 273 reEstablishSession(); 274 } 275 } 276 277 /** 278 * To participate in election, the app will call joinElection. The result will 279 * be notified by a callback on either the becomeActive or becomeStandby app 280 * interfaces. <br/> 281 * After this the elector will automatically monitor the leader status and 282 * perform re-election if necessary<br/> 283 * The app could potentially start off in standby mode and ignore the 284 * becomeStandby call. 285 * 286 * @param data 287 * to be set by the app. non-null data must be set. 288 * @throws HadoopIllegalArgumentException 289 * if valid data is not supplied 290 */ 291 public synchronized void joinElection(byte[] data) 292 throws HadoopIllegalArgumentException { 293 294 if (data == null) { 295 throw new HadoopIllegalArgumentException("data cannot be null"); 296 } 297 298 if (wantToBeInElection) { 299 LOG.info("Already in election. Not re-connecting."); 300 return; 301 } 302 303 appData = new byte[data.length]; 304 System.arraycopy(data, 0, appData, 0, data.length); 305 306 if (LOG.isDebugEnabled()) { 307 LOG.debug("Attempting active election for " + this); 308 } 309 joinElectionInternal(); 310 } 311 312 /** 313 * @return true if the configured parent znode exists 314 */ 315 public synchronized boolean parentZNodeExists() 316 throws IOException, InterruptedException { 317 Preconditions.checkState(zkClient != null); 318 try { 319 return zkClient.exists(znodeWorkingDir, false) != null; 320 } catch (KeeperException e) { 321 throw new IOException("Couldn't determine existence of znode '" + 322 znodeWorkingDir + "'", e); 323 } 324 } 325 326 /** 327 * Utility function to ensure that the configured base znode exists. 328 * This recursively creates the znode as well as all of its parents. 329 */ 330 public synchronized void ensureParentZNode() 331 throws IOException, InterruptedException { 332 Preconditions.checkState(!wantToBeInElection, 333 "ensureParentZNode() may not be called while in the election"); 334 335 String pathParts[] = znodeWorkingDir.split("/"); 336 Preconditions.checkArgument(pathParts.length >= 1 && 337 pathParts[0].isEmpty(), 338 "Invalid path: %s", znodeWorkingDir); 339 340 StringBuilder sb = new StringBuilder(); 341 for (int i = 1; i < pathParts.length; i++) { 342 sb.append("/").append(pathParts[i]); 343 String prefixPath = sb.toString(); 344 LOG.debug("Ensuring existence of " + prefixPath); 345 try { 346 createWithRetries(prefixPath, new byte[]{}, zkAcl, CreateMode.PERSISTENT); 347 } catch (KeeperException e) { 348 if (isNodeExists(e.code())) { 349 // This is OK - just ensuring existence. 350 continue; 351 } else { 352 throw new IOException("Couldn't create " + prefixPath, e); 353 } 354 } 355 } 356 357 LOG.info("Successfully created " + znodeWorkingDir + " in ZK."); 358 } 359 360 /** 361 * Clear all of the state held within the parent ZNode. 362 * This recursively deletes everything within the znode as well as the 363 * parent znode itself. It should only be used when it's certain that 364 * no electors are currently participating in the election. 365 */ 366 public synchronized void clearParentZNode() 367 throws IOException, InterruptedException { 368 Preconditions.checkState(!wantToBeInElection, 369 "clearParentZNode() may not be called while in the election"); 370 371 try { 372 LOG.info("Recursively deleting " + znodeWorkingDir + " from ZK..."); 373 374 zkDoWithRetries(new ZKAction<Void>() { 375 @Override 376 public Void run() throws KeeperException, InterruptedException { 377 ZKUtil.deleteRecursive(zkClient, znodeWorkingDir); 378 return null; 379 } 380 }); 381 } catch (KeeperException e) { 382 throw new IOException("Couldn't clear parent znode " + znodeWorkingDir, 383 e); 384 } 385 LOG.info("Successfully deleted " + znodeWorkingDir + " from ZK."); 386 } 387 388 389 /** 390 * Any service instance can drop out of the election by calling quitElection. 391 * <br/> 392 * This will lose any leader status, if held, and stop monitoring of the lock 393 * node. <br/> 394 * If the instance wants to participate in election again, then it needs to 395 * call joinElection(). <br/> 396 * This allows service instances to take themselves out of rotation for known 397 * impending unavailable states (e.g. long GC pause or software upgrade). 398 * 399 * @param needFence true if the underlying daemon may need to be fenced 400 * if a failover occurs due to dropping out of the election. 401 */ 402 public synchronized void quitElection(boolean needFence) { 403 LOG.info("Yielding from election"); 404 if (!needFence && state == State.ACTIVE) { 405 // If active is gracefully going back to standby mode, remove 406 // our permanent znode so no one fences us. 407 tryDeleteOwnBreadCrumbNode(); 408 } 409 reset(); 410 wantToBeInElection = false; 411 } 412 413 /** 414 * Exception thrown when there is no active leader 415 */ 416 public static class ActiveNotFoundException extends Exception { 417 private static final long serialVersionUID = 3505396722342846462L; 418 } 419 420 /** 421 * get data set by the active leader 422 * 423 * @return data set by the active instance 424 * @throws ActiveNotFoundException 425 * when there is no active leader 426 * @throws KeeperException 427 * other zookeeper operation errors 428 * @throws InterruptedException 429 * @throws IOException 430 * when ZooKeeper connection could not be established 431 */ 432 public synchronized byte[] getActiveData() throws ActiveNotFoundException, 433 KeeperException, InterruptedException, IOException { 434 try { 435 if (zkClient == null) { 436 createConnection(); 437 } 438 Stat stat = new Stat(); 439 return getDataWithRetries(zkLockFilePath, false, stat); 440 } catch(KeeperException e) { 441 Code code = e.code(); 442 if (isNodeDoesNotExist(code)) { 443 // handle the commonly expected cases that make sense for us 444 throw new ActiveNotFoundException(); 445 } else { 446 throw e; 447 } 448 } 449 } 450 451 /** 452 * interface implementation of Zookeeper callback for create 453 */ 454 @Override 455 public synchronized void processResult(int rc, String path, Object ctx, 456 String name) { 457 if (isStaleClient(ctx)) return; 458 if (LOG.isDebugEnabled()) { 459 LOG.debug("CreateNode result: " + rc + " for path: " + path 460 + " connectionState: " + zkConnectionState + 461 " for " + this); 462 } 463 464 Code code = Code.get(rc); 465 if (isSuccess(code)) { 466 // we successfully created the znode. we are the leader. start monitoring 467 if (becomeActive()) { 468 monitorActiveStatus(); 469 } else { 470 reJoinElectionAfterFailureToBecomeActive(); 471 } 472 return; 473 } 474 475 if (isNodeExists(code)) { 476 if (createRetryCount == 0) { 477 // znode exists and we did not retry the operation. so a different 478 // instance has created it. become standby and monitor lock. 479 becomeStandby(); 480 } 481 // if we had retried then the znode could have been created by our first 482 // attempt to the server (that we lost) and this node exists response is 483 // for the second attempt. verify this case via ephemeral node owner. this 484 // will happen on the callback for monitoring the lock. 485 monitorActiveStatus(); 486 return; 487 } 488 489 String errorMessage = "Received create error from Zookeeper. code:" 490 + code.toString() + " for path " + path; 491 LOG.debug(errorMessage); 492 493 if (shouldRetry(code)) { 494 if (createRetryCount < maxRetryNum) { 495 LOG.debug("Retrying createNode createRetryCount: " + createRetryCount); 496 ++createRetryCount; 497 createLockNodeAsync(); 498 return; 499 } 500 errorMessage = errorMessage 501 + ". Not retrying further znode create connection errors."; 502 } else if (isSessionExpired(code)) { 503 // This isn't fatal - the client Watcher will re-join the election 504 LOG.warn("Lock acquisition failed because session was lost"); 505 return; 506 } 507 508 fatalError(errorMessage); 509 } 510 511 /** 512 * interface implementation of Zookeeper callback for monitor (exists) 513 */ 514 @Override 515 public synchronized void processResult(int rc, String path, Object ctx, 516 Stat stat) { 517 if (isStaleClient(ctx)) return; 518 monitorLockNodePending = false; 519 520 assert wantToBeInElection : 521 "Got a StatNode result after quitting election"; 522 523 if (LOG.isDebugEnabled()) { 524 LOG.debug("StatNode result: " + rc + " for path: " + path 525 + " connectionState: " + zkConnectionState + " for " + this); 526 } 527 528 Code code = Code.get(rc); 529 if (isSuccess(code)) { 530 // the following owner check completes verification in case the lock znode 531 // creation was retried 532 if (stat.getEphemeralOwner() == zkClient.getSessionId()) { 533 // we own the lock znode. so we are the leader 534 if (!becomeActive()) { 535 reJoinElectionAfterFailureToBecomeActive(); 536 } 537 } else { 538 // we dont own the lock znode. so we are a standby. 539 becomeStandby(); 540 } 541 // the watch set by us will notify about changes 542 return; 543 } 544 545 if (isNodeDoesNotExist(code)) { 546 // the lock znode disappeared before we started monitoring it 547 enterNeutralMode(); 548 joinElectionInternal(); 549 return; 550 } 551 552 String errorMessage = "Received stat error from Zookeeper. code:" 553 + code.toString(); 554 LOG.debug(errorMessage); 555 556 if (shouldRetry(code)) { 557 if (statRetryCount < maxRetryNum) { 558 ++statRetryCount; 559 monitorLockNodeAsync(); 560 return; 561 } 562 errorMessage = errorMessage 563 + ". Not retrying further znode monitoring connection errors."; 564 } else if (isSessionExpired(code)) { 565 // This isn't fatal - the client Watcher will re-join the election 566 LOG.warn("Lock monitoring failed because session was lost"); 567 return; 568 } 569 570 fatalError(errorMessage); 571 } 572 573 /** 574 * We failed to become active. Re-join the election, but 575 * sleep for a few seconds after terminating our existing 576 * session, so that other nodes have a chance to become active. 577 * The failure to become active is already logged inside 578 * becomeActive(). 579 */ 580 private void reJoinElectionAfterFailureToBecomeActive() { 581 reJoinElection(SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE); 582 } 583 584 /** 585 * interface implementation of Zookeeper watch events (connection and node), 586 * proxied by {@link WatcherWithClientRef}. 587 */ 588 synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) { 589 Event.EventType eventType = event.getType(); 590 if (isStaleClient(zk)) return; 591 if (LOG.isDebugEnabled()) { 592 LOG.debug("Watcher event type: " + eventType + " with state:" 593 + event.getState() + " for path:" + event.getPath() 594 + " connectionState: " + zkConnectionState 595 + " for " + this); 596 } 597 598 if (eventType == Event.EventType.None) { 599 // the connection state has changed 600 switch (event.getState()) { 601 case SyncConnected: 602 LOG.info("Session connected."); 603 // if the listener was asked to move to safe state then it needs to 604 // be undone 605 ConnectionState prevConnectionState = zkConnectionState; 606 zkConnectionState = ConnectionState.CONNECTED; 607 if (prevConnectionState == ConnectionState.DISCONNECTED && 608 wantToBeInElection) { 609 monitorActiveStatus(); 610 } 611 break; 612 case Disconnected: 613 LOG.info("Session disconnected. Entering neutral mode..."); 614 615 // ask the app to move to safe state because zookeeper connection 616 // is not active and we dont know our state 617 zkConnectionState = ConnectionState.DISCONNECTED; 618 enterNeutralMode(); 619 break; 620 case Expired: 621 // the connection got terminated because of session timeout 622 // call listener to reconnect 623 LOG.info("Session expired. Entering neutral mode and rejoining..."); 624 enterNeutralMode(); 625 reJoinElection(0); 626 break; 627 case SaslAuthenticated: 628 LOG.info("Successfully authenticated to ZooKeeper using SASL."); 629 break; 630 default: 631 fatalError("Unexpected Zookeeper watch event state: " 632 + event.getState()); 633 break; 634 } 635 636 return; 637 } 638 639 // a watch on lock path in zookeeper has fired. so something has changed on 640 // the lock. ideally we should check that the path is the same as the lock 641 // path but trusting zookeeper for now 642 String path = event.getPath(); 643 if (path != null) { 644 switch (eventType) { 645 case NodeDeleted: 646 if (state == State.ACTIVE) { 647 enterNeutralMode(); 648 } 649 joinElectionInternal(); 650 break; 651 case NodeDataChanged: 652 monitorActiveStatus(); 653 break; 654 default: 655 if (LOG.isDebugEnabled()) { 656 LOG.debug("Unexpected node event: " + eventType + " for path: " + path); 657 } 658 monitorActiveStatus(); 659 } 660 661 return; 662 } 663 664 // some unexpected error has occurred 665 fatalError("Unexpected watch error from Zookeeper"); 666 } 667 668 /** 669 * Get a new zookeeper client instance. protected so that test class can 670 * inherit and mock out the zookeeper instance 671 * 672 * @return new zookeeper client instance 673 * @throws IOException 674 * @throws KeeperException zookeeper connectionloss exception 675 */ 676 protected synchronized ZooKeeper connectToZooKeeper() throws IOException, 677 KeeperException { 678 679 // Unfortunately, the ZooKeeper constructor connects to ZooKeeper and 680 // may trigger the Connected event immediately. So, if we register the 681 // watcher after constructing ZooKeeper, we may miss that event. Instead, 682 // we construct the watcher first, and have it block any events it receives 683 // before we can set its ZooKeeper reference. 684 watcher = new WatcherWithClientRef(); 685 ZooKeeper zk = createZooKeeper(); 686 watcher.setZooKeeperRef(zk); 687 688 // Wait for the asynchronous success/failure. This may throw an exception 689 // if we don't connect within the session timeout. 690 watcher.waitForZKConnectionEvent(zkSessionTimeout); 691 692 for (ZKAuthInfo auth : zkAuthInfo) { 693 zk.addAuthInfo(auth.getScheme(), auth.getAuth()); 694 } 695 return zk; 696 } 697 698 /** 699 * Get a new zookeeper client instance. protected so that test class can 700 * inherit and pass in a mock object for zookeeper 701 * 702 * @return new zookeeper client instance 703 * @throws IOException 704 */ 705 protected ZooKeeper createZooKeeper() throws IOException { 706 return new ZooKeeper(zkHostPort, zkSessionTimeout, watcher); 707 } 708 709 private void fatalError(String errorMessage) { 710 LOG.fatal(errorMessage); 711 reset(); 712 appClient.notifyFatalError(errorMessage); 713 } 714 715 private void monitorActiveStatus() { 716 assert wantToBeInElection; 717 if (LOG.isDebugEnabled()) { 718 LOG.debug("Monitoring active leader for " + this); 719 } 720 statRetryCount = 0; 721 monitorLockNodeAsync(); 722 } 723 724 private void joinElectionInternal() { 725 Preconditions.checkState(appData != null, 726 "trying to join election without any app data"); 727 if (zkClient == null) { 728 if (!reEstablishSession()) { 729 fatalError("Failed to reEstablish connection with ZooKeeper"); 730 return; 731 } 732 } 733 734 createRetryCount = 0; 735 wantToBeInElection = true; 736 createLockNodeAsync(); 737 } 738 739 private void reJoinElection(int sleepTime) { 740 LOG.info("Trying to re-establish ZK session"); 741 742 // Some of the test cases rely on expiring the ZK sessions and 743 // ensuring that the other node takes over. But, there's a race 744 // where the original lease holder could reconnect faster than the other 745 // thread manages to take the lock itself. This lock allows the 746 // tests to block the reconnection. It's a shame that this leaked 747 // into non-test code, but the lock is only acquired here so will never 748 // be contended. 749 sessionReestablishLockForTests.lock(); 750 try { 751 terminateConnection(); 752 sleepFor(sleepTime); 753 // Should not join election even before the SERVICE is reported 754 // as HEALTHY from ZKFC monitoring. 755 if (appData != null) { 756 joinElectionInternal(); 757 } else { 758 LOG.info("Not joining election since service has not yet been " + 759 "reported as healthy."); 760 } 761 } finally { 762 sessionReestablishLockForTests.unlock(); 763 } 764 } 765 766 /** 767 * Sleep for the given number of milliseconds. 768 * This is non-static, and separated out, so that unit tests 769 * can override the behavior not to sleep. 770 */ 771 @VisibleForTesting 772 protected void sleepFor(int sleepMs) { 773 if (sleepMs > 0) { 774 try { 775 Thread.sleep(sleepMs); 776 } catch (InterruptedException e) { 777 Thread.currentThread().interrupt(); 778 } 779 } 780 } 781 782 @VisibleForTesting 783 void preventSessionReestablishmentForTests() { 784 sessionReestablishLockForTests.lock(); 785 } 786 787 @VisibleForTesting 788 void allowSessionReestablishmentForTests() { 789 sessionReestablishLockForTests.unlock(); 790 } 791 792 @VisibleForTesting 793 synchronized long getZKSessionIdForTests() { 794 if (zkClient != null) { 795 return zkClient.getSessionId(); 796 } else { 797 return -1; 798 } 799 } 800 801 @VisibleForTesting 802 synchronized State getStateForTests() { 803 return state; 804 } 805 806 @VisibleForTesting 807 synchronized boolean isMonitorLockNodePending() { 808 return monitorLockNodePending; 809 } 810 811 private boolean reEstablishSession() { 812 int connectionRetryCount = 0; 813 boolean success = false; 814 while(!success && connectionRetryCount < maxRetryNum) { 815 if (LOG.isDebugEnabled()) { 816 LOG.debug("Establishing zookeeper connection for " + this); 817 } 818 try { 819 createConnection(); 820 success = true; 821 } catch(IOException e) { 822 LOG.warn(e); 823 sleepFor(5000); 824 } catch(KeeperException e) { 825 LOG.warn(e); 826 sleepFor(5000); 827 } 828 ++connectionRetryCount; 829 } 830 return success; 831 } 832 833 private void createConnection() throws IOException, KeeperException { 834 if (zkClient != null) { 835 try { 836 zkClient.close(); 837 } catch (InterruptedException e) { 838 throw new IOException("Interrupted while closing ZK", 839 e); 840 } 841 zkClient = null; 842 watcher = null; 843 } 844 zkClient = connectToZooKeeper(); 845 if (LOG.isDebugEnabled()) { 846 LOG.debug("Created new connection for " + this); 847 } 848 } 849 850 @InterfaceAudience.Private 851 public synchronized void terminateConnection() { 852 if (zkClient == null) { 853 return; 854 } 855 if (LOG.isDebugEnabled()) { 856 LOG.debug("Terminating ZK connection for " + this); 857 } 858 ZooKeeper tempZk = zkClient; 859 zkClient = null; 860 watcher = null; 861 try { 862 tempZk.close(); 863 } catch(InterruptedException e) { 864 LOG.warn(e); 865 } 866 zkConnectionState = ConnectionState.TERMINATED; 867 wantToBeInElection = false; 868 } 869 870 private void reset() { 871 state = State.INIT; 872 terminateConnection(); 873 } 874 875 private boolean becomeActive() { 876 assert wantToBeInElection; 877 if (state == State.ACTIVE) { 878 // already active 879 return true; 880 } 881 try { 882 Stat oldBreadcrumbStat = fenceOldActive(); 883 writeBreadCrumbNode(oldBreadcrumbStat); 884 885 if (LOG.isDebugEnabled()) { 886 LOG.debug("Becoming active for " + this); 887 } 888 appClient.becomeActive(); 889 state = State.ACTIVE; 890 return true; 891 } catch (Exception e) { 892 LOG.warn("Exception handling the winning of election", e); 893 // Caller will handle quitting and rejoining the election. 894 return false; 895 } 896 } 897 898 /** 899 * Write the "ActiveBreadCrumb" node, indicating that this node may need 900 * to be fenced on failover. 901 * @param oldBreadcrumbStat 902 */ 903 private void writeBreadCrumbNode(Stat oldBreadcrumbStat) 904 throws KeeperException, InterruptedException { 905 Preconditions.checkState(appData != null, "no appdata"); 906 907 LOG.info("Writing znode " + zkBreadCrumbPath + 908 " to indicate that the local node is the most recent active..."); 909 if (oldBreadcrumbStat == null) { 910 // No previous active, just create the node 911 createWithRetries(zkBreadCrumbPath, appData, zkAcl, 912 CreateMode.PERSISTENT); 913 } else { 914 // There was a previous active, update the node 915 setDataWithRetries(zkBreadCrumbPath, appData, oldBreadcrumbStat.getVersion()); 916 } 917 } 918 919 /** 920 * Try to delete the "ActiveBreadCrumb" node when gracefully giving up 921 * active status. 922 * If this fails, it will simply warn, since the graceful release behavior 923 * is only an optimization. 924 */ 925 private void tryDeleteOwnBreadCrumbNode() { 926 assert state == State.ACTIVE; 927 LOG.info("Deleting bread-crumb of active node..."); 928 929 // Sanity check the data. This shouldn't be strictly necessary, 930 // but better to play it safe. 931 Stat stat = new Stat(); 932 byte[] data = null; 933 try { 934 data = zkClient.getData(zkBreadCrumbPath, false, stat); 935 936 if (!Arrays.equals(data, appData)) { 937 throw new IllegalStateException( 938 "We thought we were active, but in fact " + 939 "the active znode had the wrong data: " + 940 StringUtils.byteToHexString(data) + " (stat=" + stat + ")"); 941 } 942 943 deleteWithRetries(zkBreadCrumbPath, stat.getVersion()); 944 } catch (Exception e) { 945 LOG.warn("Unable to delete our own bread-crumb of being active at " + 946 zkBreadCrumbPath + ": " + e.getLocalizedMessage() + ". " + 947 "Expecting to be fenced by the next active."); 948 } 949 } 950 951 /** 952 * If there is a breadcrumb node indicating that another node may need 953 * fencing, try to fence that node. 954 * @return the Stat of the breadcrumb node that was read, or null 955 * if no breadcrumb node existed 956 */ 957 private Stat fenceOldActive() throws InterruptedException, KeeperException { 958 final Stat stat = new Stat(); 959 byte[] data; 960 LOG.info("Checking for any old active which needs to be fenced..."); 961 try { 962 data = zkDoWithRetries(new ZKAction<byte[]>() { 963 @Override 964 public byte[] run() throws KeeperException, InterruptedException { 965 return zkClient.getData(zkBreadCrumbPath, false, stat); 966 } 967 }); 968 } catch (KeeperException ke) { 969 if (isNodeDoesNotExist(ke.code())) { 970 LOG.info("No old node to fence"); 971 return null; 972 } 973 974 // If we failed to read for any other reason, then likely we lost 975 // our session, or we don't have permissions, etc. In any case, 976 // we probably shouldn't become active, and failing the whole 977 // thing is the best bet. 978 throw ke; 979 } 980 981 LOG.info("Old node exists: " + StringUtils.byteToHexString(data)); 982 if (Arrays.equals(data, appData)) { 983 LOG.info("But old node has our own data, so don't need to fence it."); 984 } else { 985 appClient.fenceOldActive(data); 986 } 987 return stat; 988 } 989 990 private void becomeStandby() { 991 if (state != State.STANDBY) { 992 if (LOG.isDebugEnabled()) { 993 LOG.debug("Becoming standby for " + this); 994 } 995 state = State.STANDBY; 996 appClient.becomeStandby(); 997 } 998 } 999 1000 private void enterNeutralMode() { 1001 if (state != State.NEUTRAL) { 1002 if (LOG.isDebugEnabled()) { 1003 LOG.debug("Entering neutral mode for " + this); 1004 } 1005 state = State.NEUTRAL; 1006 appClient.enterNeutralMode(); 1007 } 1008 } 1009 1010 private void createLockNodeAsync() { 1011 zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, 1012 this, zkClient); 1013 } 1014 1015 private void monitorLockNodeAsync() { 1016 if (monitorLockNodePending && monitorLockNodeClient == zkClient) { 1017 LOG.info("Ignore duplicate monitor lock-node request."); 1018 return; 1019 } 1020 monitorLockNodePending = true; 1021 monitorLockNodeClient = zkClient; 1022 zkClient.exists(zkLockFilePath, 1023 watcher, this, 1024 zkClient); 1025 } 1026 1027 private String createWithRetries(final String path, final byte[] data, 1028 final List<ACL> acl, final CreateMode mode) 1029 throws InterruptedException, KeeperException { 1030 return zkDoWithRetries(new ZKAction<String>() { 1031 @Override 1032 public String run() throws KeeperException, InterruptedException { 1033 return zkClient.create(path, data, acl, mode); 1034 } 1035 }); 1036 } 1037 1038 private byte[] getDataWithRetries(final String path, final boolean watch, 1039 final Stat stat) throws InterruptedException, KeeperException { 1040 return zkDoWithRetries(new ZKAction<byte[]>() { 1041 @Override 1042 public byte[] run() throws KeeperException, InterruptedException { 1043 return zkClient.getData(path, watch, stat); 1044 } 1045 }); 1046 } 1047 1048 private Stat setDataWithRetries(final String path, final byte[] data, 1049 final int version) throws InterruptedException, KeeperException { 1050 return zkDoWithRetries(new ZKAction<Stat>() { 1051 @Override 1052 public Stat run() throws KeeperException, InterruptedException { 1053 return zkClient.setData(path, data, version); 1054 } 1055 }); 1056 } 1057 1058 private void deleteWithRetries(final String path, final int version) 1059 throws KeeperException, InterruptedException { 1060 zkDoWithRetries(new ZKAction<Void>() { 1061 @Override 1062 public Void run() throws KeeperException, InterruptedException { 1063 zkClient.delete(path, version); 1064 return null; 1065 } 1066 }); 1067 } 1068 1069 private <T> T zkDoWithRetries(ZKAction<T> action) throws KeeperException, 1070 InterruptedException { 1071 int retry = 0; 1072 while (true) { 1073 try { 1074 return action.run(); 1075 } catch (KeeperException ke) { 1076 if (shouldRetry(ke.code()) && ++retry < maxRetryNum) { 1077 continue; 1078 } 1079 throw ke; 1080 } 1081 } 1082 } 1083 1084 private interface ZKAction<T> { 1085 T run() throws KeeperException, InterruptedException; 1086 } 1087 1088 /** 1089 * The callbacks and watchers pass a reference to the ZK client 1090 * which made the original call. We don't want to take action 1091 * based on any callbacks from prior clients after we quit 1092 * the election. 1093 * @param ctx the ZK client passed into the watcher 1094 * @return true if it matches the current client 1095 */ 1096 private synchronized boolean isStaleClient(Object ctx) { 1097 Preconditions.checkNotNull(ctx); 1098 if (zkClient != (ZooKeeper)ctx) { 1099 LOG.warn("Ignoring stale result from old client with sessionId " + 1100 String.format("0x%08x", ((ZooKeeper)ctx).getSessionId())); 1101 return true; 1102 } 1103 return false; 1104 } 1105 1106 /** 1107 * Watcher implementation which keeps a reference around to the 1108 * original ZK connection, and passes it back along with any 1109 * events. 1110 */ 1111 private final class WatcherWithClientRef implements Watcher { 1112 private ZooKeeper zk; 1113 1114 /** 1115 * Latch fired whenever any event arrives. This is used in order 1116 * to wait for the Connected event when the client is first created. 1117 */ 1118 private CountDownLatch hasReceivedEvent = new CountDownLatch(1); 1119 1120 /** 1121 * Latch used to wait until the reference to ZooKeeper is set. 1122 */ 1123 private CountDownLatch hasSetZooKeeper = new CountDownLatch(1); 1124 1125 /** 1126 * Waits for the next event from ZooKeeper to arrive. 1127 * 1128 * @param connectionTimeoutMs zookeeper connection timeout in milliseconds 1129 * @throws KeeperException if the connection attempt times out. This will 1130 * be a ZooKeeper ConnectionLoss exception code. 1131 * @throws IOException if interrupted while connecting to ZooKeeper 1132 */ 1133 private void waitForZKConnectionEvent(int connectionTimeoutMs) 1134 throws KeeperException, IOException { 1135 try { 1136 if (!hasReceivedEvent.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)) { 1137 LOG.error("Connection timed out: couldn't connect to ZooKeeper in " 1138 + connectionTimeoutMs + " milliseconds"); 1139 zk.close(); 1140 throw KeeperException.create(Code.CONNECTIONLOSS); 1141 } 1142 } catch (InterruptedException e) { 1143 Thread.currentThread().interrupt(); 1144 throw new IOException( 1145 "Interrupted when connecting to zookeeper server", e); 1146 } 1147 } 1148 1149 private void setZooKeeperRef(ZooKeeper zk) { 1150 Preconditions.checkState(this.zk == null, 1151 "zk already set -- must be set exactly once"); 1152 this.zk = zk; 1153 hasSetZooKeeper.countDown(); 1154 } 1155 1156 @Override 1157 public void process(WatchedEvent event) { 1158 hasReceivedEvent.countDown(); 1159 try { 1160 if (!hasSetZooKeeper.await(zkSessionTimeout, TimeUnit.MILLISECONDS)) { 1161 LOG.debug("Event received with stale zk"); 1162 } 1163 ActiveStandbyElector.this.processWatchEvent( 1164 zk, event); 1165 } catch (Throwable t) { 1166 fatalError( 1167 "Failed to process watcher event " + event + ": " + 1168 StringUtils.stringifyException(t)); 1169 } 1170 } 1171 } 1172 1173 private static boolean isSuccess(Code code) { 1174 return (code == Code.OK); 1175 } 1176 1177 private static boolean isNodeExists(Code code) { 1178 return (code == Code.NODEEXISTS); 1179 } 1180 1181 private static boolean isNodeDoesNotExist(Code code) { 1182 return (code == Code.NONODE); 1183 } 1184 1185 private static boolean isSessionExpired(Code code) { 1186 return (code == Code.SESSIONEXPIRED); 1187 } 1188 1189 private static boolean shouldRetry(Code code) { 1190 return code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT; 1191 } 1192 1193 @Override 1194 public String toString() { 1195 return "elector id=" + System.identityHashCode(this) + 1196 " appData=" + 1197 ((appData == null) ? "null" : StringUtils.byteToHexString(appData)) + 1198 " cb=" + appClient; 1199 } 1200 1201 public String getHAZookeeperConnectionState() { 1202 return this.zkConnectionState.name(); 1203 } 1204}