001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.security; 020 021import java.io.IOException; 022import java.util.HashSet; 023import java.util.Set; 024 025import org.apache.commons.logging.Log; 026import org.apache.commons.logging.LogFactory; 027import org.apache.hadoop.classification.InterfaceAudience; 028import org.apache.hadoop.classification.InterfaceStability; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; 033import org.apache.hadoop.io.Text; 034import org.apache.hadoop.mapred.JobConf; 035import org.apache.hadoop.mapred.Master; 036import org.apache.hadoop.mapreduce.MRJobConfig; 037import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; 038import org.apache.hadoop.security.Credentials; 039import org.apache.hadoop.security.UserGroupInformation; 040import org.apache.hadoop.security.token.Token; 041import org.apache.hadoop.security.token.TokenIdentifier; 042 043 044/** 045 * This class provides user facing APIs for transferring secrets from 046 * the job client to the tasks. 047 * The secrets can be stored just before submission of jobs and read during 048 * the task execution. 049 */ 050@InterfaceAudience.Public 051@InterfaceStability.Evolving 052public class TokenCache { 053 054 private static final Log LOG = LogFactory.getLog(TokenCache.class); 055 056 057 /** 058 * auxiliary method to get user's secret keys.. 059 * @param alias 060 * @return secret key from the storage 061 */ 062 public static byte[] getSecretKey(Credentials credentials, Text alias) { 063 if(credentials == null) 064 return null; 065 return credentials.getSecretKey(alias); 066 } 067 068 /** 069 * Convenience method to obtain delegation tokens from namenodes 070 * corresponding to the paths passed. 071 * @param credentials 072 * @param ps array of paths 073 * @param conf configuration 074 * @throws IOException 075 */ 076 public static void obtainTokensForNamenodes(Credentials credentials, 077 Path[] ps, Configuration conf) throws IOException { 078 if (!UserGroupInformation.isSecurityEnabled()) { 079 return; 080 } 081 obtainTokensForNamenodesInternal(credentials, ps, conf); 082 } 083 084 /** 085 * Remove jobtoken referrals which don't make sense in the context 086 * of the task execution. 087 * 088 * @param conf 089 */ 090 public static void cleanUpTokenReferral(Configuration conf) { 091 conf.unset(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY); 092 } 093 094 static void obtainTokensForNamenodesInternal(Credentials credentials, 095 Path[] ps, Configuration conf) throws IOException { 096 Set<FileSystem> fsSet = new HashSet<FileSystem>(); 097 for(Path p: ps) { 098 fsSet.add(p.getFileSystem(conf)); 099 } 100 for (FileSystem fs : fsSet) { 101 obtainTokensForNamenodesInternal(fs, credentials, conf); 102 } 103 } 104 105 /** 106 * get delegation token for a specific FS 107 * @param fs 108 * @param credentials 109 * @param p 110 * @param conf 111 * @throws IOException 112 */ 113 static void obtainTokensForNamenodesInternal(FileSystem fs, 114 Credentials credentials, Configuration conf) throws IOException { 115 String delegTokenRenewer = Master.getMasterPrincipal(conf); 116 if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { 117 throw new IOException( 118 "Can't get Master Kerberos principal for use as renewer"); 119 } 120 mergeBinaryTokens(credentials, conf); 121 122 final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer, 123 credentials); 124 if (tokens != null) { 125 for (Token<?> token : tokens) { 126 LOG.info("Got dt for " + fs.getUri() + "; "+token); 127 } 128 } 129 } 130 131 private static void mergeBinaryTokens(Credentials creds, Configuration conf) { 132 String binaryTokenFilename = 133 conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY); 134 if (binaryTokenFilename != null) { 135 Credentials binary; 136 try { 137 binary = Credentials.readTokenStorageFile( 138 new Path("file:///" + binaryTokenFilename), conf); 139 } catch (IOException e) { 140 throw new RuntimeException(e); 141 } 142 // supplement existing tokens with the tokens in the binary file 143 creds.mergeAll(binary); 144 } 145 } 146 147 /** 148 * file name used on HDFS for generated job token 149 */ 150 @InterfaceAudience.Private 151 public static final String JOB_TOKEN_HDFS_FILE = "jobToken"; 152 153 /** 154 * conf setting for job tokens cache file name 155 */ 156 @InterfaceAudience.Private 157 public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile"; 158 private static final Text JOB_TOKEN = new Text("JobToken"); 159 private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken"); 160 161 /** 162 * load job token from a file 163 * @param conf 164 * @throws IOException 165 */ 166 @InterfaceAudience.Private 167 public static Credentials loadTokens(String jobTokenFile, JobConf conf) 168 throws IOException { 169 Path localJobTokenFile = new Path ("file:///" + jobTokenFile); 170 171 Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf); 172 173 if(LOG.isDebugEnabled()) { 174 LOG.debug("Task: Loaded jobTokenFile from: "+ 175 localJobTokenFile.toUri().getPath() 176 +"; num of sec keys = " + ts.numberOfSecretKeys() + 177 " Number of tokens " + ts.numberOfTokens()); 178 } 179 return ts; 180 } 181 /** 182 * store job token 183 * @param t 184 */ 185 @InterfaceAudience.Private 186 public static void setJobToken(Token<? extends TokenIdentifier> t, 187 Credentials credentials) { 188 credentials.addToken(JOB_TOKEN, t); 189 } 190 /** 191 * 192 * @return job token 193 */ 194 @SuppressWarnings("unchecked") 195 @InterfaceAudience.Private 196 public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) { 197 return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN); 198 } 199 200 @InterfaceAudience.Private 201 public static void setShuffleSecretKey(byte[] key, Credentials credentials) { 202 credentials.addSecretKey(SHUFFLE_TOKEN, key); 203 } 204 205 @InterfaceAudience.Private 206 public static byte[] getShuffleSecretKey(Credentials credentials) { 207 return getSecretKey(credentials, SHUFFLE_TOKEN); 208 } 209}