001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.ha; 019 020import java.io.IOException; 021import java.net.InetSocketAddress; 022import java.util.Collection; 023import java.util.regex.Matcher; 024import java.util.regex.Pattern; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.apache.hadoop.conf.Configured; 029 030import com.google.common.annotations.VisibleForTesting; 031import com.jcraft.jsch.ChannelExec; 032import com.jcraft.jsch.JSch; 033import com.jcraft.jsch.JSchException; 034import com.jcraft.jsch.Session; 035 036/** 037 * This fencing implementation sshes to the target node and uses 038 * <code>fuser</code> to kill the process listening on the service's 039 * TCP port. This is more accurate than using "jps" since it doesn't 040 * require parsing, and will work even if there are multiple service 041 * processes running on the same machine.<p> 042 * It returns a successful status code if: 043 * <ul> 044 * <li><code>fuser</code> indicates it successfully killed a process, <em>or</em> 045 * <li><code>nc -z</code> indicates that nothing is listening on the target port 046 * </ul> 047 * <p> 048 * This fencing mechanism is configured as following in the fencing method 049 * list: 050 * <code>sshfence([[username][:ssh-port]])</code> 051 * where the optional argument specifies the username and port to use 052 * with ssh. 053 * <p> 054 * In order to achieve passwordless SSH, the operator must also configure 055 * <code>dfs.ha.fencing.ssh.private-key-files<code> to point to an 056 * SSH key that has passphrase-less access to the given username and host. 057 */ 058public class SshFenceByTcpPort extends Configured 059 implements FenceMethod { 060 061 static final Log LOG = LogFactory.getLog( 062 SshFenceByTcpPort.class); 063 064 static final String CONF_CONNECT_TIMEOUT_KEY = 065 "dfs.ha.fencing.ssh.connect-timeout"; 066 private static final int CONF_CONNECT_TIMEOUT_DEFAULT = 067 30*1000; 068 static final String CONF_IDENTITIES_KEY = 069 "dfs.ha.fencing.ssh.private-key-files"; 070 071 /** 072 * Verify that the argument, if given, in the conf is parseable. 073 */ 074 @Override 075 public void checkArgs(String argStr) throws BadFencingConfigurationException { 076 if (argStr != null) { 077 new Args(argStr); 078 } 079 } 080 081 @Override 082 public boolean tryFence(HAServiceTarget target, String argsStr) 083 throws BadFencingConfigurationException { 084 085 Args args = new Args(argsStr); 086 InetSocketAddress serviceAddr = target.getAddress(); 087 String host = serviceAddr.getHostName(); 088 089 Session session; 090 try { 091 session = createSession(serviceAddr.getHostName(), args); 092 } catch (JSchException e) { 093 LOG.warn("Unable to create SSH session", e); 094 return false; 095 } 096 097 LOG.info("Connecting to " + host + "..."); 098 099 try { 100 session.connect(getSshConnectTimeout()); 101 } catch (JSchException e) { 102 LOG.warn("Unable to connect to " + host 103 + " as user " + args.user, e); 104 return false; 105 } 106 LOG.info("Connected to " + host); 107 108 try { 109 return doFence(session, serviceAddr); 110 } catch (JSchException e) { 111 LOG.warn("Unable to achieve fencing on remote host", e); 112 return false; 113 } finally { 114 session.disconnect(); 115 } 116 } 117 118 119 private Session createSession(String host, Args args) throws JSchException { 120 JSch jsch = new JSch(); 121 for (String keyFile : getKeyFiles()) { 122 jsch.addIdentity(keyFile); 123 } 124 JSch.setLogger(new LogAdapter()); 125 126 Session session = jsch.getSession(args.user, host, args.sshPort); 127 session.setConfig("StrictHostKeyChecking", "no"); 128 return session; 129 } 130 131 private boolean doFence(Session session, InetSocketAddress serviceAddr) 132 throws JSchException { 133 int port = serviceAddr.getPort(); 134 try { 135 LOG.info("Looking for process running on port " + port); 136 int rc = execCommand(session, 137 "PATH=$PATH:/sbin:/usr/sbin fuser -v -k -n tcp " + port); 138 if (rc == 0) { 139 LOG.info("Successfully killed process that was " + 140 "listening on port " + port); 141 // exit code 0 indicates the process was successfully killed. 142 return true; 143 } else if (rc == 1) { 144 // exit code 1 indicates either that the process was not running 145 // or that fuser didn't have root privileges in order to find it 146 // (eg running as a different user) 147 LOG.info( 148 "Indeterminate response from trying to kill service. " + 149 "Verifying whether it is running using nc..."); 150 rc = execCommand(session, "nc -z " + serviceAddr.getHostName() + 151 " " + serviceAddr.getPort()); 152 if (rc == 0) { 153 // the service is still listening - we are unable to fence 154 LOG.warn("Unable to fence - it is running but we cannot kill it"); 155 return false; 156 } else { 157 LOG.info("Verified that the service is down."); 158 return true; 159 } 160 } else { 161 // other 162 } 163 LOG.info("rc: " + rc); 164 return rc == 0; 165 } catch (InterruptedException e) { 166 LOG.warn("Interrupted while trying to fence via ssh", e); 167 return false; 168 } catch (IOException e) { 169 LOG.warn("Unknown failure while trying to fence via ssh", e); 170 return false; 171 } 172 } 173 174 /** 175 * Execute a command through the ssh session, pumping its 176 * stderr and stdout to our own logs. 177 */ 178 private int execCommand(Session session, String cmd) 179 throws JSchException, InterruptedException, IOException { 180 LOG.debug("Running cmd: " + cmd); 181 ChannelExec exec = null; 182 try { 183 exec = (ChannelExec)session.openChannel("exec"); 184 exec.setCommand(cmd); 185 exec.setInputStream(null); 186 exec.connect(); 187 188 // Pump stdout of the command to our WARN logs 189 StreamPumper outPumper = new StreamPumper(LOG, cmd + " via ssh", 190 exec.getInputStream(), StreamPumper.StreamType.STDOUT); 191 outPumper.start(); 192 193 // Pump stderr of the command to our WARN logs 194 StreamPumper errPumper = new StreamPumper(LOG, cmd + " via ssh", 195 exec.getErrStream(), StreamPumper.StreamType.STDERR); 196 errPumper.start(); 197 198 outPumper.join(); 199 errPumper.join(); 200 return exec.getExitStatus(); 201 } finally { 202 cleanup(exec); 203 } 204 } 205 206 private static void cleanup(ChannelExec exec) { 207 if (exec != null) { 208 try { 209 exec.disconnect(); 210 } catch (Throwable t) { 211 LOG.warn("Couldn't disconnect ssh channel", t); 212 } 213 } 214 } 215 216 private int getSshConnectTimeout() { 217 return getConf().getInt( 218 CONF_CONNECT_TIMEOUT_KEY, CONF_CONNECT_TIMEOUT_DEFAULT); 219 } 220 221 private Collection<String> getKeyFiles() { 222 return getConf().getTrimmedStringCollection(CONF_IDENTITIES_KEY); 223 } 224 225 /** 226 * Container for the parsed arg line for this fencing method. 227 */ 228 @VisibleForTesting 229 static class Args { 230 private static final Pattern USER_PORT_RE = Pattern.compile( 231 "([^:]+?)?(?:\\:(\\d+))?"); 232 233 private static final int DEFAULT_SSH_PORT = 22; 234 235 String user; 236 int sshPort; 237 238 public Args(String arg) 239 throws BadFencingConfigurationException { 240 user = System.getProperty("user.name"); 241 sshPort = DEFAULT_SSH_PORT; 242 243 // Parse optional user and ssh port 244 if (arg != null && !arg.isEmpty()) { 245 Matcher m = USER_PORT_RE.matcher(arg); 246 if (!m.matches()) { 247 throw new BadFencingConfigurationException( 248 "Unable to parse user and SSH port: "+ arg); 249 } 250 if (m.group(1) != null) { 251 user = m.group(1); 252 } 253 if (m.group(2) != null) { 254 sshPort = parseConfiggedPort(m.group(2)); 255 } 256 } 257 } 258 259 private int parseConfiggedPort(String portStr) 260 throws BadFencingConfigurationException { 261 try { 262 return Integer.parseInt(portStr); 263 } catch (NumberFormatException nfe) { 264 throw new BadFencingConfigurationException( 265 "Port number '" + portStr + "' invalid"); 266 } 267 } 268 } 269 270 /** 271 * Adapter from JSch's logger interface to our log4j 272 */ 273 private static class LogAdapter implements com.jcraft.jsch.Logger { 274 static final Log LOG = LogFactory.getLog( 275 SshFenceByTcpPort.class.getName() + ".jsch"); 276 277 @Override 278 public boolean isEnabled(int level) { 279 switch (level) { 280 case com.jcraft.jsch.Logger.DEBUG: 281 return LOG.isDebugEnabled(); 282 case com.jcraft.jsch.Logger.INFO: 283 return LOG.isInfoEnabled(); 284 case com.jcraft.jsch.Logger.WARN: 285 return LOG.isWarnEnabled(); 286 case com.jcraft.jsch.Logger.ERROR: 287 return LOG.isErrorEnabled(); 288 case com.jcraft.jsch.Logger.FATAL: 289 return LOG.isFatalEnabled(); 290 default: 291 return false; 292 } 293 } 294 295 @Override 296 public void log(int level, String message) { 297 switch (level) { 298 case com.jcraft.jsch.Logger.DEBUG: 299 LOG.debug(message); 300 break; 301 case com.jcraft.jsch.Logger.INFO: 302 LOG.info(message); 303 break; 304 case com.jcraft.jsch.Logger.WARN: 305 LOG.warn(message); 306 break; 307 case com.jcraft.jsch.Logger.ERROR: 308 LOG.error(message); 309 break; 310 case com.jcraft.jsch.Logger.FATAL: 311 LOG.fatal(message); 312 break; 313 default: 314 break; 315 } 316 } 317 } 318}