001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.crypto.key.kms; 019 020import org.apache.commons.codec.binary.Base64; 021import org.apache.hadoop.classification.InterfaceAudience; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.crypto.key.KeyProvider; 024import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; 025import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension; 026import org.apache.hadoop.crypto.key.KeyProviderFactory; 027import org.apache.hadoop.fs.CommonConfigurationKeysPublic; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.io.IOUtils; 030import org.apache.hadoop.io.Text; 031import org.apache.hadoop.security.Credentials; 032import org.apache.hadoop.security.ProviderUtils; 033import org.apache.hadoop.security.SecurityUtil; 034import org.apache.hadoop.security.UserGroupInformation; 035import org.apache.hadoop.security.authentication.client.AuthenticatedURL; 036import org.apache.hadoop.security.authentication.client.AuthenticationException; 037import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; 038import org.apache.hadoop.security.ssl.SSLFactory; 039import org.apache.hadoop.security.token.Token; 040import org.apache.hadoop.security.token.TokenIdentifier; 041import org.apache.hadoop.security.token.TokenRenewer; 042import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; 043import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; 044import org.apache.hadoop.util.HttpExceptionUtils; 045import org.apache.hadoop.util.KMSUtil; 046import org.apache.http.client.utils.URIBuilder; 047import org.codehaus.jackson.map.ObjectMapper; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import javax.net.ssl.HttpsURLConnection; 052 053import java.io.IOException; 054import java.io.InputStream; 055import java.io.OutputStream; 056import java.io.OutputStreamWriter; 057import java.io.Writer; 058import java.lang.reflect.UndeclaredThrowableException; 059import java.net.HttpURLConnection; 060import java.net.InetSocketAddress; 061import java.net.MalformedURLException; 062import java.net.SocketTimeoutException; 063import java.net.URI; 064import java.net.URISyntaxException; 065import java.net.URL; 066import java.net.URLEncoder; 067import java.nio.charset.StandardCharsets; 068import java.security.GeneralSecurityException; 069import java.security.NoSuchAlgorithmException; 070import java.security.PrivilegedExceptionAction; 071import java.util.ArrayList; 072import java.util.Date; 073import java.util.HashMap; 074import java.util.LinkedList; 075import java.util.List; 076import java.util.Map; 077import java.util.Queue; 078import java.util.concurrent.ExecutionException; 079 080import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; 081import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.CryptoExtension; 082 083import com.google.common.annotations.VisibleForTesting; 084import com.google.common.base.Preconditions; 085import com.google.common.base.Strings; 086 087/** 088 * KMS client <code>KeyProvider</code> implementation. 089 */ 090@InterfaceAudience.Private 091public class KMSClientProvider extends KeyProvider implements CryptoExtension, 092 KeyProviderDelegationTokenExtension.DelegationTokenExtension { 093 094 private static final Logger LOG = 095 LoggerFactory.getLogger(KMSClientProvider.class); 096 097 private static final String INVALID_SIGNATURE = "Invalid signature"; 098 099 private static final String ANONYMOUS_REQUESTS_DISALLOWED = "Anonymous requests are disallowed"; 100 101 public static final String TOKEN_KIND_STR = KMSDelegationToken.TOKEN_KIND_STR; 102 public static final Text TOKEN_KIND = KMSDelegationToken.TOKEN_KIND; 103 104 public static final String SCHEME_NAME = "kms"; 105 106 private static final String UTF8 = "UTF-8"; 107 108 private static final String CONTENT_TYPE = "Content-Type"; 109 private static final String APPLICATION_JSON_MIME = "application/json"; 110 111 private static final String HTTP_GET = "GET"; 112 private static final String HTTP_POST = "POST"; 113 private static final String HTTP_PUT = "PUT"; 114 private static final String HTTP_DELETE = "DELETE"; 115 116 117 private static final String CONFIG_PREFIX = "hadoop.security.kms.client."; 118 119 /* It's possible to specify a timeout, in seconds, in the config file */ 120 public static final String TIMEOUT_ATTR = CONFIG_PREFIX + "timeout"; 121 public static final int DEFAULT_TIMEOUT = 60; 122 123 /* Number of times to retry authentication in the event of auth failure 124 * (normally happens due to stale authToken) 125 */ 126 public static final String AUTH_RETRY = CONFIG_PREFIX 127 + "authentication.retry-count"; 128 public static final int DEFAULT_AUTH_RETRY = 1; 129 130 private final ValueQueue<EncryptedKeyVersion> encKeyVersionQueue; 131 132 private class EncryptedQueueRefiller implements 133 ValueQueue.QueueRefiller<EncryptedKeyVersion> { 134 135 @Override 136 public void fillQueueForKey(String keyName, 137 Queue<EncryptedKeyVersion> keyQueue, int numEKVs) throws IOException { 138 checkNotNull(keyName, "keyName"); 139 Map<String, String> params = new HashMap<String, String>(); 140 params.put(KMSRESTConstants.EEK_OP, KMSRESTConstants.EEK_GENERATE); 141 params.put(KMSRESTConstants.EEK_NUM_KEYS, "" + numEKVs); 142 URL url = createURL(KMSRESTConstants.KEY_RESOURCE, keyName, 143 KMSRESTConstants.EEK_SUB_RESOURCE, params); 144 HttpURLConnection conn = createConnection(url, HTTP_GET); 145 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON_MIME); 146 List response = call(conn, null, 147 HttpURLConnection.HTTP_OK, List.class); 148 List<EncryptedKeyVersion> ekvs = 149 parseJSONEncKeyVersion(keyName, response); 150 keyQueue.addAll(ekvs); 151 } 152 } 153 154 /** 155 * The KMS implementation of {@link TokenRenewer}. 156 */ 157 public static class KMSTokenRenewer extends TokenRenewer { 158 private static final Logger LOG = 159 LoggerFactory.getLogger(KMSTokenRenewer.class); 160 161 @Override 162 public boolean handleKind(Text kind) { 163 return kind.equals(TOKEN_KIND); 164 } 165 166 @Override 167 public boolean isManaged(Token<?> token) throws IOException { 168 return true; 169 } 170 171 @Override 172 public long renew(Token<?> token, Configuration conf) throws IOException { 173 LOG.debug("Renewing delegation token {}", token); 174 KeyProvider keyProvider = KMSUtil.createKeyProvider(conf, 175 KeyProviderFactory.KEY_PROVIDER_PATH); 176 try { 177 if (!(keyProvider instanceof 178 KeyProviderDelegationTokenExtension.DelegationTokenExtension)) { 179 LOG.warn("keyProvider {} cannot renew dt.", keyProvider == null ? 180 "null" : keyProvider.getClass()); 181 return 0; 182 } 183 return ((KeyProviderDelegationTokenExtension.DelegationTokenExtension) 184 keyProvider).renewDelegationToken(token); 185 } finally { 186 if (keyProvider != null) { 187 keyProvider.close(); 188 } 189 } 190 } 191 192 @Override 193 public void cancel(Token<?> token, Configuration conf) throws IOException { 194 LOG.debug("Canceling delegation token {}", token); 195 KeyProvider keyProvider = KMSUtil.createKeyProvider(conf, 196 KeyProviderFactory.KEY_PROVIDER_PATH); 197 try { 198 if (!(keyProvider instanceof 199 KeyProviderDelegationTokenExtension.DelegationTokenExtension)) { 200 LOG.warn("keyProvider {} cannot cancel dt.", keyProvider == null ? 201 "null" : keyProvider.getClass()); 202 return; 203 } 204 ((KeyProviderDelegationTokenExtension.DelegationTokenExtension) 205 keyProvider).cancelDelegationToken(token); 206 } finally { 207 if (keyProvider != null) { 208 keyProvider.close(); 209 } 210 } 211 } 212 } 213 214 public static class KMSEncryptedKeyVersion extends EncryptedKeyVersion { 215 public KMSEncryptedKeyVersion(String keyName, String keyVersionName, 216 byte[] iv, String encryptedVersionName, byte[] keyMaterial) { 217 super(keyName, keyVersionName, iv, new KMSKeyVersion(null, 218 encryptedVersionName, keyMaterial)); 219 } 220 } 221 222 @SuppressWarnings("rawtypes") 223 private static List<EncryptedKeyVersion> 224 parseJSONEncKeyVersion(String keyName, List valueList) { 225 List<EncryptedKeyVersion> ekvs = new LinkedList<EncryptedKeyVersion>(); 226 if (!valueList.isEmpty()) { 227 for (Object values : valueList) { 228 Map valueMap = (Map) values; 229 230 String versionName = checkNotNull( 231 (String) valueMap.get(KMSRESTConstants.VERSION_NAME_FIELD), 232 KMSRESTConstants.VERSION_NAME_FIELD); 233 234 byte[] iv = Base64.decodeBase64(checkNotNull( 235 (String) valueMap.get(KMSRESTConstants.IV_FIELD), 236 KMSRESTConstants.IV_FIELD)); 237 238 Map encValueMap = checkNotNull((Map) 239 valueMap.get(KMSRESTConstants.ENCRYPTED_KEY_VERSION_FIELD), 240 KMSRESTConstants.ENCRYPTED_KEY_VERSION_FIELD); 241 242 String encVersionName = checkNotNull((String) 243 encValueMap.get(KMSRESTConstants.VERSION_NAME_FIELD), 244 KMSRESTConstants.VERSION_NAME_FIELD); 245 246 byte[] encKeyMaterial = Base64.decodeBase64(checkNotNull((String) 247 encValueMap.get(KMSRESTConstants.MATERIAL_FIELD), 248 KMSRESTConstants.MATERIAL_FIELD)); 249 250 ekvs.add(new KMSEncryptedKeyVersion(keyName, versionName, iv, 251 encVersionName, encKeyMaterial)); 252 } 253 } 254 return ekvs; 255 } 256 257 private static KeyVersion parseJSONKeyVersion(Map valueMap) { 258 KeyVersion keyVersion = null; 259 if (!valueMap.isEmpty()) { 260 byte[] material = (valueMap.containsKey(KMSRESTConstants.MATERIAL_FIELD)) 261 ? Base64.decodeBase64((String) valueMap.get(KMSRESTConstants.MATERIAL_FIELD)) 262 : null; 263 String versionName = (String)valueMap.get(KMSRESTConstants.VERSION_NAME_FIELD); 264 String keyName = (String)valueMap.get(KMSRESTConstants.NAME_FIELD); 265 keyVersion = new KMSKeyVersion(keyName, versionName, material); 266 } 267 return keyVersion; 268 } 269 270 @SuppressWarnings("unchecked") 271 private static Metadata parseJSONMetadata(Map valueMap) { 272 Metadata metadata = null; 273 if (!valueMap.isEmpty()) { 274 metadata = new KMSMetadata( 275 (String) valueMap.get(KMSRESTConstants.CIPHER_FIELD), 276 (Integer) valueMap.get(KMSRESTConstants.LENGTH_FIELD), 277 (String) valueMap.get(KMSRESTConstants.DESCRIPTION_FIELD), 278 (Map<String, String>) valueMap.get(KMSRESTConstants.ATTRIBUTES_FIELD), 279 new Date((Long) valueMap.get(KMSRESTConstants.CREATED_FIELD)), 280 (Integer) valueMap.get(KMSRESTConstants.VERSIONS_FIELD)); 281 } 282 return metadata; 283 } 284 285 private static void writeJson(Map map, OutputStream os) throws IOException { 286 Writer writer = new OutputStreamWriter(os, StandardCharsets.UTF_8); 287 ObjectMapper jsonMapper = new ObjectMapper(); 288 jsonMapper.writerWithDefaultPrettyPrinter().writeValue(writer, map); 289 } 290 291 /** 292 * The factory to create KMSClientProvider, which is used by the 293 * ServiceLoader. 294 */ 295 public static class Factory extends KeyProviderFactory { 296 297 /** 298 * This provider expects URIs in the following form : 299 * kms://<PROTO>@<AUTHORITY>/<PATH> 300 * 301 * where : 302 * - PROTO = http or https 303 * - AUTHORITY = <HOSTS>[:<PORT>] 304 * - HOSTS = <HOSTNAME>[;<HOSTS>] 305 * - HOSTNAME = string 306 * - PORT = integer 307 * 308 * If multiple hosts are provider, the Factory will create a 309 * {@link LoadBalancingKMSClientProvider} that round-robins requests 310 * across the provided list of hosts. 311 */ 312 @Override 313 public KeyProvider createProvider(URI providerUri, Configuration conf) 314 throws IOException { 315 if (SCHEME_NAME.equals(providerUri.getScheme())) { 316 URL origUrl = new URL(extractKMSPath(providerUri).toString()); 317 String authority = origUrl.getAuthority(); 318 // check for ';' which delimits the backup hosts 319 if (Strings.isNullOrEmpty(authority)) { 320 throw new IOException( 321 "No valid authority in kms uri [" + origUrl + "]"); 322 } 323 // Check if port is present in authority 324 // In the current scheme, all hosts have to run on the same port 325 int port = -1; 326 String hostsPart = authority; 327 if (authority.contains(":")) { 328 String[] t = authority.split(":"); 329 try { 330 port = Integer.parseInt(t[1]); 331 } catch (Exception e) { 332 throw new IOException( 333 "Could not parse port in kms uri [" + origUrl + "]"); 334 } 335 hostsPart = t[0]; 336 } 337 return createProvider(providerUri, conf, origUrl, port, hostsPart); 338 } 339 return null; 340 } 341 342 private KeyProvider createProvider(URI providerUri, Configuration conf, 343 URL origUrl, int port, String hostsPart) throws IOException { 344 String[] hosts = hostsPart.split(";"); 345 if (hosts.length == 1) { 346 return new KMSClientProvider(providerUri, conf); 347 } else { 348 KMSClientProvider[] providers = new KMSClientProvider[hosts.length]; 349 for (int i = 0; i < hosts.length; i++) { 350 try { 351 providers[i] = 352 new KMSClientProvider( 353 new URI("kms", origUrl.getProtocol(), hosts[i], port, 354 origUrl.getPath(), null, null), conf); 355 } catch (URISyntaxException e) { 356 throw new IOException("Could not instantiate KMSProvider..", e); 357 } 358 } 359 return new LoadBalancingKMSClientProvider(providers, conf); 360 } 361 } 362 } 363 364 public static <T> T checkNotNull(T o, String name) 365 throws IllegalArgumentException { 366 if (o == null) { 367 throw new IllegalArgumentException("Parameter '" + name + 368 "' cannot be null"); 369 } 370 return o; 371 } 372 373 public static String checkNotEmpty(String s, String name) 374 throws IllegalArgumentException { 375 checkNotNull(s, name); 376 if (s.isEmpty()) { 377 throw new IllegalArgumentException("Parameter '" + name + 378 "' cannot be empty"); 379 } 380 return s; 381 } 382 383 private String kmsUrl; 384 private SSLFactory sslFactory; 385 private ConnectionConfigurator configurator; 386 private DelegationTokenAuthenticatedURL.Token authToken; 387 private final int authRetry; 388 389 @Override 390 public String toString() { 391 final StringBuilder sb = new StringBuilder("KMSClientProvider["); 392 sb.append(kmsUrl).append("]"); 393 return sb.toString(); 394 } 395 396 /** 397 * This small class exists to set the timeout values for a connection 398 */ 399 private static class TimeoutConnConfigurator 400 implements ConnectionConfigurator { 401 private ConnectionConfigurator cc; 402 private int timeout; 403 404 /** 405 * Sets the timeout and wraps another connection configurator 406 * @param timeout - will set both connect and read timeouts - in seconds 407 * @param cc - another configurator to wrap - may be null 408 */ 409 public TimeoutConnConfigurator(int timeout, ConnectionConfigurator cc) { 410 this.timeout = timeout; 411 this.cc = cc; 412 } 413 414 /** 415 * Calls the wrapped configure() method, then sets timeouts 416 * @param conn the {@link HttpURLConnection} instance to configure. 417 * @return the connection 418 * @throws IOException 419 */ 420 @Override 421 public HttpURLConnection configure(HttpURLConnection conn) 422 throws IOException { 423 if (cc != null) { 424 conn = cc.configure(conn); 425 } 426 conn.setConnectTimeout(timeout * 1000); // conversion to milliseconds 427 conn.setReadTimeout(timeout * 1000); 428 return conn; 429 } 430 } 431 432 public KMSClientProvider(URI uri, Configuration conf) throws IOException { 433 super(conf); 434 kmsUrl = createServiceURL(extractKMSPath(uri)); 435 if ("https".equalsIgnoreCase(new URL(kmsUrl).getProtocol())) { 436 sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf); 437 try { 438 sslFactory.init(); 439 } catch (GeneralSecurityException ex) { 440 throw new IOException(ex); 441 } 442 } 443 int timeout = conf.getInt(TIMEOUT_ATTR, DEFAULT_TIMEOUT); 444 authRetry = conf.getInt(AUTH_RETRY, DEFAULT_AUTH_RETRY); 445 configurator = new TimeoutConnConfigurator(timeout, sslFactory); 446 encKeyVersionQueue = 447 new ValueQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>( 448 conf.getInt( 449 CommonConfigurationKeysPublic.KMS_CLIENT_ENC_KEY_CACHE_SIZE, 450 CommonConfigurationKeysPublic. 451 KMS_CLIENT_ENC_KEY_CACHE_SIZE_DEFAULT), 452 conf.getFloat( 453 CommonConfigurationKeysPublic. 454 KMS_CLIENT_ENC_KEY_CACHE_LOW_WATERMARK, 455 CommonConfigurationKeysPublic. 456 KMS_CLIENT_ENC_KEY_CACHE_LOW_WATERMARK_DEFAULT), 457 conf.getInt( 458 CommonConfigurationKeysPublic. 459 KMS_CLIENT_ENC_KEY_CACHE_EXPIRY_MS, 460 CommonConfigurationKeysPublic. 461 KMS_CLIENT_ENC_KEY_CACHE_EXPIRY_DEFAULT), 462 conf.getInt( 463 CommonConfigurationKeysPublic. 464 KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS, 465 CommonConfigurationKeysPublic. 466 KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT), 467 new EncryptedQueueRefiller()); 468 authToken = new DelegationTokenAuthenticatedURL.Token(); 469 } 470 471 private static Path extractKMSPath(URI uri) throws MalformedURLException, IOException { 472 return ProviderUtils.unnestUri(uri); 473 } 474 475 private static String createServiceURL(Path path) throws IOException { 476 String str = new URL(path.toString()).toExternalForm(); 477 if (str.endsWith("/")) { 478 str = str.substring(0, str.length() - 1); 479 } 480 return new URL(str + KMSRESTConstants.SERVICE_VERSION + "/"). 481 toExternalForm(); 482 } 483 484 private URL createURL(String collection, String resource, String subResource, 485 Map<String, ?> parameters) throws IOException { 486 try { 487 StringBuilder sb = new StringBuilder(); 488 sb.append(kmsUrl); 489 if (collection != null) { 490 sb.append(collection); 491 if (resource != null) { 492 sb.append("/").append(URLEncoder.encode(resource, UTF8)); 493 if (subResource != null) { 494 sb.append("/").append(subResource); 495 } 496 } 497 } 498 URIBuilder uriBuilder = new URIBuilder(sb.toString()); 499 if (parameters != null) { 500 for (Map.Entry<String, ?> param : parameters.entrySet()) { 501 Object value = param.getValue(); 502 if (value instanceof String) { 503 uriBuilder.addParameter(param.getKey(), (String) value); 504 } else { 505 for (String s : (String[]) value) { 506 uriBuilder.addParameter(param.getKey(), s); 507 } 508 } 509 } 510 } 511 return uriBuilder.build().toURL(); 512 } catch (URISyntaxException ex) { 513 throw new IOException(ex); 514 } 515 } 516 517 private HttpURLConnection configureConnection(HttpURLConnection conn) 518 throws IOException { 519 if (sslFactory != null) { 520 HttpsURLConnection httpsConn = (HttpsURLConnection) conn; 521 try { 522 httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory()); 523 } catch (GeneralSecurityException ex) { 524 throw new IOException(ex); 525 } 526 httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier()); 527 } 528 return conn; 529 } 530 531 private HttpURLConnection createConnection(final URL url, String method) 532 throws IOException { 533 HttpURLConnection conn; 534 try { 535 final String doAsUser = getDoAsUser(); 536 conn = getActualUgi().doAs(new PrivilegedExceptionAction 537 <HttpURLConnection>() { 538 @Override 539 public HttpURLConnection run() throws Exception { 540 DelegationTokenAuthenticatedURL authUrl = 541 new DelegationTokenAuthenticatedURL(configurator); 542 return authUrl.openConnection(url, authToken, doAsUser); 543 } 544 }); 545 } catch (IOException ex) { 546 if (ex instanceof SocketTimeoutException) { 547 LOG.warn("Failed to connect to {}:{}", url.getHost(), url.getPort()); 548 } 549 throw ex; 550 } catch (UndeclaredThrowableException ex) { 551 throw new IOException(ex.getUndeclaredThrowable()); 552 } catch (Exception ex) { 553 throw new IOException(ex); 554 } 555 conn.setUseCaches(false); 556 conn.setRequestMethod(method); 557 if (method.equals(HTTP_POST) || method.equals(HTTP_PUT)) { 558 conn.setDoOutput(true); 559 } 560 conn = configureConnection(conn); 561 return conn; 562 } 563 564 private <T> T call(HttpURLConnection conn, Map jsonOutput, 565 int expectedResponse, Class<T> klass) throws IOException { 566 return call(conn, jsonOutput, expectedResponse, klass, authRetry); 567 } 568 569 private <T> T call(HttpURLConnection conn, Map jsonOutput, 570 int expectedResponse, Class<T> klass, int authRetryCount) 571 throws IOException { 572 T ret = null; 573 try { 574 if (jsonOutput != null) { 575 writeJson(jsonOutput, conn.getOutputStream()); 576 } 577 } catch (IOException ex) { 578 IOUtils.closeStream(conn.getInputStream()); 579 throw ex; 580 } 581 if ((conn.getResponseCode() == HttpURLConnection.HTTP_FORBIDDEN 582 && (conn.getResponseMessage().equals(ANONYMOUS_REQUESTS_DISALLOWED) || 583 conn.getResponseMessage().contains(INVALID_SIGNATURE))) 584 || conn.getResponseCode() == HttpURLConnection.HTTP_UNAUTHORIZED) { 585 // Ideally, this should happen only when there is an Authentication 586 // failure. Unfortunately, the AuthenticationFilter returns 403 when it 587 // cannot authenticate (Since a 401 requires Server to send 588 // WWW-Authenticate header as well).. 589 KMSClientProvider.this.authToken = 590 new DelegationTokenAuthenticatedURL.Token(); 591 if (authRetryCount > 0) { 592 String contentType = conn.getRequestProperty(CONTENT_TYPE); 593 String requestMethod = conn.getRequestMethod(); 594 URL url = conn.getURL(); 595 conn = createConnection(url, requestMethod); 596 conn.setRequestProperty(CONTENT_TYPE, contentType); 597 return call(conn, jsonOutput, expectedResponse, klass, 598 authRetryCount - 1); 599 } 600 } 601 try { 602 AuthenticatedURL.extractToken(conn, authToken); 603 } catch (AuthenticationException e) { 604 // Ignore the AuthExceptions.. since we are just using the method to 605 // extract and set the authToken.. (Workaround till we actually fix 606 // AuthenticatedURL properly to set authToken post initialization) 607 } 608 HttpExceptionUtils.validateResponse(conn, expectedResponse); 609 if (conn.getContentType() != null 610 && conn.getContentType().trim().toLowerCase() 611 .startsWith(APPLICATION_JSON_MIME) 612 && klass != null) { 613 ObjectMapper mapper = new ObjectMapper(); 614 InputStream is = null; 615 try { 616 is = conn.getInputStream(); 617 ret = mapper.readValue(is, klass); 618 } finally { 619 IOUtils.closeStream(is); 620 } 621 } 622 return ret; 623 } 624 625 public static class KMSKeyVersion extends KeyVersion { 626 public KMSKeyVersion(String keyName, String versionName, byte[] material) { 627 super(keyName, versionName, material); 628 } 629 } 630 631 @Override 632 public KeyVersion getKeyVersion(String versionName) throws IOException { 633 checkNotEmpty(versionName, "versionName"); 634 URL url = createURL(KMSRESTConstants.KEY_VERSION_RESOURCE, 635 versionName, null, null); 636 HttpURLConnection conn = createConnection(url, HTTP_GET); 637 Map response = call(conn, null, HttpURLConnection.HTTP_OK, Map.class); 638 return parseJSONKeyVersion(response); 639 } 640 641 @Override 642 public KeyVersion getCurrentKey(String name) throws IOException { 643 checkNotEmpty(name, "name"); 644 URL url = createURL(KMSRESTConstants.KEY_RESOURCE, name, 645 KMSRESTConstants.CURRENT_VERSION_SUB_RESOURCE, null); 646 HttpURLConnection conn = createConnection(url, HTTP_GET); 647 Map response = call(conn, null, HttpURLConnection.HTTP_OK, Map.class); 648 return parseJSONKeyVersion(response); 649 } 650 651 @Override 652 @SuppressWarnings("unchecked") 653 public List<String> getKeys() throws IOException { 654 URL url = createURL(KMSRESTConstants.KEYS_NAMES_RESOURCE, null, null, 655 null); 656 HttpURLConnection conn = createConnection(url, HTTP_GET); 657 List response = call(conn, null, HttpURLConnection.HTTP_OK, List.class); 658 return (List<String>) response; 659 } 660 661 public static class KMSMetadata extends Metadata { 662 public KMSMetadata(String cipher, int bitLength, String description, 663 Map<String, String> attributes, Date created, int versions) { 664 super(cipher, bitLength, description, attributes, created, versions); 665 } 666 } 667 668 // breaking keyNames into sets to keep resulting URL undler 2000 chars 669 private List<String[]> createKeySets(String[] keyNames) { 670 List<String[]> list = new ArrayList<String[]>(); 671 List<String> batch = new ArrayList<String>(); 672 int batchLen = 0; 673 for (String name : keyNames) { 674 int additionalLen = KMSRESTConstants.KEY.length() + 1 + name.length(); 675 batchLen += additionalLen; 676 // topping at 1500 to account for initial URL and encoded names 677 if (batchLen > 1500) { 678 list.add(batch.toArray(new String[batch.size()])); 679 batch = new ArrayList<String>(); 680 batchLen = additionalLen; 681 } 682 batch.add(name); 683 } 684 if (!batch.isEmpty()) { 685 list.add(batch.toArray(new String[batch.size()])); 686 } 687 return list; 688 } 689 690 @Override 691 @SuppressWarnings("unchecked") 692 public Metadata[] getKeysMetadata(String ... keyNames) throws IOException { 693 List<Metadata> keysMetadata = new ArrayList<Metadata>(); 694 List<String[]> keySets = createKeySets(keyNames); 695 for (String[] keySet : keySets) { 696 if (keyNames.length > 0) { 697 Map<String, Object> queryStr = new HashMap<String, Object>(); 698 queryStr.put(KMSRESTConstants.KEY, keySet); 699 URL url = createURL(KMSRESTConstants.KEYS_METADATA_RESOURCE, null, 700 null, queryStr); 701 HttpURLConnection conn = createConnection(url, HTTP_GET); 702 List<Map> list = call(conn, null, HttpURLConnection.HTTP_OK, List.class); 703 for (Map map : list) { 704 keysMetadata.add(parseJSONMetadata(map)); 705 } 706 } 707 } 708 return keysMetadata.toArray(new Metadata[keysMetadata.size()]); 709 } 710 711 private KeyVersion createKeyInternal(String name, byte[] material, 712 Options options) 713 throws NoSuchAlgorithmException, IOException { 714 checkNotEmpty(name, "name"); 715 checkNotNull(options, "options"); 716 Map<String, Object> jsonKey = new HashMap<String, Object>(); 717 jsonKey.put(KMSRESTConstants.NAME_FIELD, name); 718 jsonKey.put(KMSRESTConstants.CIPHER_FIELD, options.getCipher()); 719 jsonKey.put(KMSRESTConstants.LENGTH_FIELD, options.getBitLength()); 720 if (material != null) { 721 jsonKey.put(KMSRESTConstants.MATERIAL_FIELD, 722 Base64.encodeBase64String(material)); 723 } 724 if (options.getDescription() != null) { 725 jsonKey.put(KMSRESTConstants.DESCRIPTION_FIELD, 726 options.getDescription()); 727 } 728 if (options.getAttributes() != null && !options.getAttributes().isEmpty()) { 729 jsonKey.put(KMSRESTConstants.ATTRIBUTES_FIELD, options.getAttributes()); 730 } 731 URL url = createURL(KMSRESTConstants.KEYS_RESOURCE, null, null, null); 732 HttpURLConnection conn = createConnection(url, HTTP_POST); 733 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON_MIME); 734 Map response = call(conn, jsonKey, HttpURLConnection.HTTP_CREATED, 735 Map.class); 736 return parseJSONKeyVersion(response); 737 } 738 739 @Override 740 public KeyVersion createKey(String name, Options options) 741 throws NoSuchAlgorithmException, IOException { 742 return createKeyInternal(name, null, options); 743 } 744 745 @Override 746 public KeyVersion createKey(String name, byte[] material, Options options) 747 throws IOException { 748 checkNotNull(material, "material"); 749 try { 750 return createKeyInternal(name, material, options); 751 } catch (NoSuchAlgorithmException ex) { 752 throw new RuntimeException("It should not happen", ex); 753 } 754 } 755 756 private KeyVersion rollNewVersionInternal(String name, byte[] material) 757 throws NoSuchAlgorithmException, IOException { 758 checkNotEmpty(name, "name"); 759 Map<String, String> jsonMaterial = new HashMap<String, String>(); 760 if (material != null) { 761 jsonMaterial.put(KMSRESTConstants.MATERIAL_FIELD, 762 Base64.encodeBase64String(material)); 763 } 764 URL url = createURL(KMSRESTConstants.KEY_RESOURCE, name, null, null); 765 HttpURLConnection conn = createConnection(url, HTTP_POST); 766 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON_MIME); 767 Map response = call(conn, jsonMaterial, 768 HttpURLConnection.HTTP_OK, Map.class); 769 KeyVersion keyVersion = parseJSONKeyVersion(response); 770 encKeyVersionQueue.drain(name); 771 return keyVersion; 772 } 773 774 775 @Override 776 public KeyVersion rollNewVersion(String name) 777 throws NoSuchAlgorithmException, IOException { 778 return rollNewVersionInternal(name, null); 779 } 780 781 @Override 782 public KeyVersion rollNewVersion(String name, byte[] material) 783 throws IOException { 784 checkNotNull(material, "material"); 785 try { 786 return rollNewVersionInternal(name, material); 787 } catch (NoSuchAlgorithmException ex) { 788 throw new RuntimeException("It should not happen", ex); 789 } 790 } 791 792 @Override 793 public EncryptedKeyVersion generateEncryptedKey( 794 String encryptionKeyName) throws IOException, GeneralSecurityException { 795 try { 796 return encKeyVersionQueue.getNext(encryptionKeyName); 797 } catch (ExecutionException e) { 798 if (e.getCause() instanceof SocketTimeoutException) { 799 throw (SocketTimeoutException)e.getCause(); 800 } 801 throw new IOException(e); 802 } 803 } 804 805 @SuppressWarnings("rawtypes") 806 @Override 807 public KeyVersion decryptEncryptedKey( 808 EncryptedKeyVersion encryptedKeyVersion) throws IOException, 809 GeneralSecurityException { 810 checkNotNull(encryptedKeyVersion.getEncryptionKeyVersionName(), 811 "versionName"); 812 checkNotNull(encryptedKeyVersion.getEncryptedKeyIv(), "iv"); 813 Preconditions.checkArgument( 814 encryptedKeyVersion.getEncryptedKeyVersion().getVersionName() 815 .equals(KeyProviderCryptoExtension.EEK), 816 "encryptedKey version name must be '%s', is '%s'", 817 KeyProviderCryptoExtension.EEK, 818 encryptedKeyVersion.getEncryptedKeyVersion().getVersionName() 819 ); 820 checkNotNull(encryptedKeyVersion.getEncryptedKeyVersion(), "encryptedKey"); 821 Map<String, String> params = new HashMap<String, String>(); 822 params.put(KMSRESTConstants.EEK_OP, KMSRESTConstants.EEK_DECRYPT); 823 Map<String, Object> jsonPayload = new HashMap<String, Object>(); 824 jsonPayload.put(KMSRESTConstants.NAME_FIELD, 825 encryptedKeyVersion.getEncryptionKeyName()); 826 jsonPayload.put(KMSRESTConstants.IV_FIELD, Base64.encodeBase64String( 827 encryptedKeyVersion.getEncryptedKeyIv())); 828 jsonPayload.put(KMSRESTConstants.MATERIAL_FIELD, Base64.encodeBase64String( 829 encryptedKeyVersion.getEncryptedKeyVersion().getMaterial())); 830 URL url = createURL(KMSRESTConstants.KEY_VERSION_RESOURCE, 831 encryptedKeyVersion.getEncryptionKeyVersionName(), 832 KMSRESTConstants.EEK_SUB_RESOURCE, params); 833 HttpURLConnection conn = createConnection(url, HTTP_POST); 834 conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON_MIME); 835 Map response = 836 call(conn, jsonPayload, HttpURLConnection.HTTP_OK, Map.class); 837 return parseJSONKeyVersion(response); 838 } 839 840 @Override 841 public List<KeyVersion> getKeyVersions(String name) throws IOException { 842 checkNotEmpty(name, "name"); 843 URL url = createURL(KMSRESTConstants.KEY_RESOURCE, name, 844 KMSRESTConstants.VERSIONS_SUB_RESOURCE, null); 845 HttpURLConnection conn = createConnection(url, HTTP_GET); 846 List response = call(conn, null, HttpURLConnection.HTTP_OK, List.class); 847 List<KeyVersion> versions = null; 848 if (!response.isEmpty()) { 849 versions = new ArrayList<KeyVersion>(); 850 for (Object obj : response) { 851 versions.add(parseJSONKeyVersion((Map) obj)); 852 } 853 } 854 return versions; 855 } 856 857 @Override 858 public Metadata getMetadata(String name) throws IOException { 859 checkNotEmpty(name, "name"); 860 URL url = createURL(KMSRESTConstants.KEY_RESOURCE, name, 861 KMSRESTConstants.METADATA_SUB_RESOURCE, null); 862 HttpURLConnection conn = createConnection(url, HTTP_GET); 863 Map response = call(conn, null, HttpURLConnection.HTTP_OK, Map.class); 864 return parseJSONMetadata(response); 865 } 866 867 @Override 868 public void deleteKey(String name) throws IOException { 869 checkNotEmpty(name, "name"); 870 URL url = createURL(KMSRESTConstants.KEY_RESOURCE, name, null, null); 871 HttpURLConnection conn = createConnection(url, HTTP_DELETE); 872 call(conn, null, HttpURLConnection.HTTP_OK, null); 873 } 874 875 @Override 876 public void flush() throws IOException { 877 // NOP 878 // the client does not keep any local state, thus flushing is not required 879 // because of the client. 880 // the server should not keep in memory state on behalf of clients either. 881 } 882 883 @Override 884 public void warmUpEncryptedKeys(String... keyNames) 885 throws IOException { 886 try { 887 encKeyVersionQueue.initializeQueuesForKeys(keyNames); 888 } catch (ExecutionException e) { 889 throw new IOException(e); 890 } 891 } 892 893 @Override 894 public void drain(String keyName) { 895 encKeyVersionQueue.drain(keyName); 896 } 897 898 @VisibleForTesting 899 public int getEncKeyQueueSize(String keyName) { 900 return encKeyVersionQueue.getSize(keyName); 901 } 902 903 @Override 904 public long renewDelegationToken(final Token<?> dToken) throws IOException { 905 try { 906 final String doAsUser = getDoAsUser(); 907 final DelegationTokenAuthenticatedURL.Token token = 908 generateDelegationToken(dToken); 909 final URL url = createURL(null, null, null, null); 910 LOG.debug("Renewing delegation token {} with url:{}, as:{}", 911 token, url, doAsUser); 912 final DelegationTokenAuthenticatedURL authUrl = 913 new DelegationTokenAuthenticatedURL(configurator); 914 return getActualUgi().doAs( 915 new PrivilegedExceptionAction<Long>() { 916 @Override 917 public Long run() throws Exception { 918 return authUrl.renewDelegationToken(url, token, doAsUser); 919 } 920 } 921 ); 922 } catch (Exception ex) { 923 if (ex instanceof IOException) { 924 throw (IOException) ex; 925 } else { 926 throw new IOException(ex); 927 } 928 } 929 } 930 931 @Override 932 public Void cancelDelegationToken(final Token<?> dToken) throws IOException { 933 try { 934 final String doAsUser = getDoAsUser(); 935 final DelegationTokenAuthenticatedURL.Token token = 936 generateDelegationToken(dToken); 937 return getActualUgi().doAs( 938 new PrivilegedExceptionAction<Void>() { 939 @Override 940 public Void run() throws Exception { 941 final URL url = createURL(null, null, null, null); 942 LOG.debug("Cancelling delegation token {} with url:{}, as:{}", 943 dToken, url, doAsUser); 944 final DelegationTokenAuthenticatedURL authUrl = 945 new DelegationTokenAuthenticatedURL(configurator); 946 authUrl.cancelDelegationToken(url, token, doAsUser); 947 return null; 948 } 949 } 950 ); 951 } catch (Exception ex) { 952 if (ex instanceof IOException) { 953 throw (IOException) ex; 954 } else { 955 throw new IOException(ex); 956 } 957 } 958 } 959 960 /** 961 * Get the doAs user name. 962 * 963 * 'actualUGI' is the UGI of the user creating the client 964 * It is possible that the creator of the KMSClientProvier 965 * calls this method on behalf of a proxyUser (the doAsUser). 966 * In which case this call has to be made as the proxy user. 967 * 968 * @return the doAs user name. 969 * @throws IOException 970 */ 971 private String getDoAsUser() throws IOException { 972 UserGroupInformation currentUgi = UserGroupInformation.getCurrentUser(); 973 return (currentUgi.getAuthenticationMethod() == 974 UserGroupInformation.AuthenticationMethod.PROXY) 975 ? currentUgi.getShortUserName() : null; 976 } 977 978 /** 979 * Generate a DelegationTokenAuthenticatedURL.Token from the given generic 980 * typed delegation token. 981 * 982 * @param dToken The delegation token. 983 * @return The DelegationTokenAuthenticatedURL.Token, with its delegation 984 * token set to the delegation token passed in. 985 */ 986 private DelegationTokenAuthenticatedURL.Token generateDelegationToken( 987 final Token<?> dToken) { 988 DelegationTokenAuthenticatedURL.Token token = 989 new DelegationTokenAuthenticatedURL.Token(); 990 Token<AbstractDelegationTokenIdentifier> dt = 991 new Token<>(dToken.getIdentifier(), dToken.getPassword(), 992 dToken.getKind(), dToken.getService()); 993 token.setDelegationToken(dt); 994 return token; 995 } 996 997 @Override 998 public Token<?>[] addDelegationTokens(final String renewer, 999 Credentials credentials) throws IOException { 1000 Token<?>[] tokens = null; 1001 Text dtService = getDelegationTokenService(); 1002 Token<?> token = credentials.getToken(dtService); 1003 if (token == null) { 1004 final URL url = createURL(null, null, null, null); 1005 final DelegationTokenAuthenticatedURL authUrl = 1006 new DelegationTokenAuthenticatedURL(configurator); 1007 try { 1008 final String doAsUser = getDoAsUser(); 1009 token = getActualUgi().doAs(new PrivilegedExceptionAction<Token<?>>() { 1010 @Override 1011 public Token<?> run() throws Exception { 1012 // Not using the cached token here.. Creating a new token here 1013 // everytime. 1014 return authUrl.getDelegationToken(url, 1015 new DelegationTokenAuthenticatedURL.Token(), renewer, doAsUser); 1016 } 1017 }); 1018 if (token != null) { 1019 credentials.addToken(token.getService(), token); 1020 tokens = new Token<?>[] { token }; 1021 } else { 1022 throw new IOException("Got NULL as delegation token"); 1023 } 1024 } catch (InterruptedException e) { 1025 Thread.currentThread().interrupt(); 1026 } catch (Exception e) { 1027 throw new IOException(e); 1028 } 1029 } 1030 return tokens; 1031 } 1032 1033 private Text getDelegationTokenService() throws IOException { 1034 URL url = new URL(kmsUrl); 1035 InetSocketAddress addr = new InetSocketAddress(url.getHost(), 1036 url.getPort()); 1037 Text dtService = SecurityUtil.buildTokenService(addr); 1038 return dtService; 1039 } 1040 1041 private boolean currentUgiContainsKmsDt() throws IOException { 1042 // Add existing credentials from current UGI, since provider is cached. 1043 Credentials creds = UserGroupInformation.getCurrentUser(). 1044 getCredentials(); 1045 if (!creds.getAllTokens().isEmpty()) { 1046 org.apache.hadoop.security.token.Token<? extends TokenIdentifier> 1047 dToken = creds.getToken(getDelegationTokenService()); 1048 if (dToken != null) { 1049 return true; 1050 } 1051 } 1052 return false; 1053 } 1054 1055 private UserGroupInformation getActualUgi() throws IOException { 1056 final UserGroupInformation currentUgi = UserGroupInformation 1057 .getCurrentUser(); 1058 if (LOG.isDebugEnabled()) { 1059 UserGroupInformation.logAllUserInfo(currentUgi); 1060 } 1061 // Use current user by default 1062 UserGroupInformation actualUgi = currentUgi; 1063 if (currentUgi.getRealUser() != null) { 1064 // Use real user for proxy user 1065 actualUgi = currentUgi.getRealUser(); 1066 } else if (!currentUgiContainsKmsDt() && 1067 !currentUgi.hasKerberosCredentials()) { 1068 // Use login user for user that does not have either 1069 // Kerberos credential or KMS delegation token for KMS operations 1070 actualUgi = currentUgi.getLoginUser(); 1071 } 1072 return actualUgi; 1073 } 1074 1075 /** 1076 * Shutdown valueQueue executor threads 1077 */ 1078 @Override 1079 public void close() throws IOException { 1080 try { 1081 encKeyVersionQueue.shutdown(); 1082 } catch (Exception e) { 1083 throw new IOException(e); 1084 } finally { 1085 if (sslFactory != null) { 1086 sslFactory.destroy(); 1087 sslFactory = null; 1088 } 1089 } 1090 } 1091 1092 @VisibleForTesting 1093 String getKMSUrl() { 1094 return kmsUrl; 1095 } 1096}