001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.security;
020
021import java.io.IOException;
022import java.util.HashSet;
023import java.util.Set;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceStability;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
033import org.apache.hadoop.io.Text;
034import org.apache.hadoop.mapred.JobConf;
035import org.apache.hadoop.mapred.Master;
036import org.apache.hadoop.mapreduce.MRJobConfig;
037import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
038import org.apache.hadoop.security.Credentials;
039import org.apache.hadoop.security.UserGroupInformation;
040import org.apache.hadoop.security.token.Token;
041import org.apache.hadoop.security.token.TokenIdentifier;
042
043
044/**
045 * This class provides user facing APIs for transferring secrets from
046 * the job client to the tasks.
047 * The secrets can be stored just before submission of jobs and read during
048 * the task execution.  
049 */
050@InterfaceAudience.Public
051@InterfaceStability.Evolving
052public class TokenCache {
053  
054  private static final Log LOG = LogFactory.getLog(TokenCache.class);
055
056  
057  /**
058   * auxiliary method to get user's secret keys..
059   * @param alias
060   * @return secret key from the storage
061   */
062  public static byte[] getSecretKey(Credentials credentials, Text alias) {
063    if(credentials == null)
064      return null;
065    return credentials.getSecretKey(alias);
066  }
067  
068  /**
069   * Convenience method to obtain delegation tokens from namenodes 
070   * corresponding to the paths passed.
071   * @param credentials
072   * @param ps array of paths
073   * @param conf configuration
074   * @throws IOException
075   */
076  public static void obtainTokensForNamenodes(Credentials credentials,
077      Path[] ps, Configuration conf) throws IOException {
078    if (!UserGroupInformation.isSecurityEnabled()) {
079      return;
080    }
081    obtainTokensForNamenodesInternal(credentials, ps, conf);
082  }
083
084  /**
085   * Remove jobtoken referrals which don't make sense in the context
086   * of the task execution.
087   *
088   * @param conf
089   */
090  public static void cleanUpTokenReferral(Configuration conf) {
091    conf.unset(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
092  }
093
094  static void obtainTokensForNamenodesInternal(Credentials credentials,
095      Path[] ps, Configuration conf) throws IOException {
096    Set<FileSystem> fsSet = new HashSet<FileSystem>();
097    for(Path p: ps) {
098      fsSet.add(p.getFileSystem(conf));
099    }
100    for (FileSystem fs : fsSet) {
101      obtainTokensForNamenodesInternal(fs, credentials, conf);
102    }
103  }
104
105  /**
106   * get delegation token for a specific FS
107   * @param fs
108   * @param credentials
109   * @param p
110   * @param conf
111   * @throws IOException
112   */
113  static void obtainTokensForNamenodesInternal(FileSystem fs, 
114      Credentials credentials, Configuration conf) throws IOException {
115    String delegTokenRenewer = Master.getMasterPrincipal(conf);
116    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
117      throw new IOException(
118          "Can't get Master Kerberos principal for use as renewer");
119    }
120    mergeBinaryTokens(credentials, conf);
121
122    final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer,
123                                                     credentials);
124    if (tokens != null) {
125      for (Token<?> token : tokens) {
126        LOG.info("Got dt for " + fs.getUri() + "; "+token);
127      }
128    }
129  }
130
131  private static void mergeBinaryTokens(Credentials creds, Configuration conf) {
132    String binaryTokenFilename =
133        conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
134    if (binaryTokenFilename != null) {
135      Credentials binary;
136      try {
137        binary = Credentials.readTokenStorageFile(
138            new Path("file:///" +  binaryTokenFilename), conf);
139      } catch (IOException e) {
140        throw new RuntimeException(e);
141      }
142      // supplement existing tokens with the tokens in the binary file
143      creds.mergeAll(binary);
144    }
145  }
146  
147  /**
148   * file name used on HDFS for generated job token
149   */
150  @InterfaceAudience.Private
151  public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
152
153  /**
154   * conf setting for job tokens cache file name
155   */
156  @InterfaceAudience.Private
157  public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
158  private static final Text JOB_TOKEN = new Text("JobToken");
159  private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
160  
161  /**
162   * load job token from a file
163   * @param conf
164   * @throws IOException
165   */
166  @InterfaceAudience.Private
167  public static Credentials loadTokens(String jobTokenFile, JobConf conf) 
168  throws IOException {
169    Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
170
171    Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf);
172
173    if(LOG.isDebugEnabled()) {
174      LOG.debug("Task: Loaded jobTokenFile from: "+
175          localJobTokenFile.toUri().getPath() 
176          +"; num of sec keys  = " + ts.numberOfSecretKeys() +
177          " Number of tokens " +  ts.numberOfTokens());
178    }
179    return ts;
180  }
181  /**
182   * store job token
183   * @param t
184   */
185  @InterfaceAudience.Private
186  public static void setJobToken(Token<? extends TokenIdentifier> t, 
187      Credentials credentials) {
188    credentials.addToken(JOB_TOKEN, t);
189  }
190  /**
191   * 
192   * @return job token
193   */
194  @SuppressWarnings("unchecked")
195  @InterfaceAudience.Private
196  public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
197    return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
198  }
199
200  @InterfaceAudience.Private
201  public static void setShuffleSecretKey(byte[] key, Credentials credentials) {
202    credentials.addSecretKey(SHUFFLE_TOKEN, key);
203  }
204
205  @InterfaceAudience.Private
206  public static byte[] getShuffleSecretKey(Credentials credentials) {
207    return getSecretKey(credentials, SHUFFLE_TOKEN);
208  }
209}