001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.security;
020    
021    import java.io.IOException;
022    import java.util.HashSet;
023    import java.util.Set;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceStability;
029    import org.apache.hadoop.conf.Configuration;
030    import org.apache.hadoop.fs.FileSystem;
031    import org.apache.hadoop.fs.Path;
032    import org.apache.hadoop.io.Text;
033    import org.apache.hadoop.mapred.JobConf;
034    import org.apache.hadoop.mapred.Master;
035    import org.apache.hadoop.mapreduce.MRJobConfig;
036    import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
037    import org.apache.hadoop.security.Credentials;
038    import org.apache.hadoop.security.UserGroupInformation;
039    import org.apache.hadoop.security.token.Token;
040    import org.apache.hadoop.security.token.TokenIdentifier;
041    
042    
043    /**
044     * This class provides user facing APIs for transferring secrets from
045     * the job client to the tasks.
046     * The secrets can be stored just before submission of jobs and read during
047     * the task execution.  
048     */
049    @InterfaceAudience.Public
050    @InterfaceStability.Evolving
051    public class TokenCache {
052      
053      private static final Log LOG = LogFactory.getLog(TokenCache.class);
054    
055      
056      /**
057       * auxiliary method to get user's secret keys..
058       * @param alias
059       * @return secret key from the storage
060       */
061      public static byte[] getSecretKey(Credentials credentials, Text alias) {
062        if(credentials == null)
063          return null;
064        return credentials.getSecretKey(alias);
065      }
066      
067      /**
068       * Convenience method to obtain delegation tokens from namenodes 
069       * corresponding to the paths passed.
070       * @param credentials
071       * @param ps array of paths
072       * @param conf configuration
073       * @throws IOException
074       */
075      public static void obtainTokensForNamenodes(Credentials credentials,
076          Path[] ps, Configuration conf) throws IOException {
077        if (!UserGroupInformation.isSecurityEnabled()) {
078          return;
079        }
080        obtainTokensForNamenodesInternal(credentials, ps, conf);
081      }
082    
083      /**
084       * Remove jobtoken referrals which don't make sense in the context
085       * of the task execution.
086       *
087       * @param conf
088       */
089      public static void cleanUpTokenReferral(Configuration conf) {
090        conf.unset(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
091      }
092    
093      static void obtainTokensForNamenodesInternal(Credentials credentials,
094          Path[] ps, Configuration conf) throws IOException {
095        Set<FileSystem> fsSet = new HashSet<FileSystem>();
096        for(Path p: ps) {
097          fsSet.add(p.getFileSystem(conf));
098        }
099        for (FileSystem fs : fsSet) {
100          obtainTokensForNamenodesInternal(fs, credentials, conf);
101        }
102      }
103    
104      /**
105       * get delegation token for a specific FS
106       * @param fs
107       * @param credentials
108       * @param p
109       * @param conf
110       * @throws IOException
111       */
112      static void obtainTokensForNamenodesInternal(FileSystem fs, 
113          Credentials credentials, Configuration conf) throws IOException {
114        String delegTokenRenewer = Master.getMasterPrincipal(conf);
115        if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
116          throw new IOException(
117              "Can't get Master Kerberos principal for use as renewer");
118        }
119        mergeBinaryTokens(credentials, conf);
120    
121        final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer,
122                                                         credentials);
123        if (tokens != null) {
124          for (Token<?> token : tokens) {
125            LOG.info("Got dt for " + fs.getUri() + "; "+token);
126          }
127        }
128      }
129    
130      private static void mergeBinaryTokens(Credentials creds, Configuration conf) {
131        String binaryTokenFilename =
132            conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
133        if (binaryTokenFilename != null) {
134          Credentials binary;
135          try {
136            binary = Credentials.readTokenStorageFile(
137                FileSystem.getLocal(conf).makeQualified(
138                    new Path(binaryTokenFilename)),
139                conf);
140          } catch (IOException e) {
141            throw new RuntimeException(e);
142          }
143          // supplement existing tokens with the tokens in the binary file
144          creds.mergeAll(binary);
145        }
146      }
147      
148      /**
149       * file name used on HDFS for generated job token
150       */
151      @InterfaceAudience.Private
152      public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
153    
154      /**
155       * conf setting for job tokens cache file name
156       */
157      @InterfaceAudience.Private
158      public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
159      private static final Text JOB_TOKEN = new Text("JobToken");
160      private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
161      
162      /**
163       * load job token from a file
164       * @deprecated Use {@link Credentials#readTokenStorageFile} instead,
165       * this method is included for compatibility against Hadoop-1.
166       * @param conf
167       * @throws IOException
168       */
169      @InterfaceAudience.Private
170      @Deprecated
171      public static Credentials loadTokens(String jobTokenFile, JobConf conf)
172      throws IOException {
173        Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
174    
175        Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf);
176    
177        if(LOG.isDebugEnabled()) {
178          LOG.debug("Task: Loaded jobTokenFile from: "+
179              localJobTokenFile.toUri().getPath() 
180              +"; num of sec keys  = " + ts.numberOfSecretKeys() +
181              " Number of tokens " +  ts.numberOfTokens());
182        }
183        return ts;
184      }
185      
186      /**
187       * load job token from a file
188       * @deprecated Use {@link Credentials#readTokenStorageFile} instead,
189       * this method is included for compatibility against Hadoop-1.
190       * @param conf
191       * @throws IOException
192       */
193      @InterfaceAudience.Private
194      @Deprecated
195      public static Credentials loadTokens(String jobTokenFile, Configuration conf)
196          throws IOException {
197        return loadTokens(jobTokenFile, new JobConf(conf));
198      }
199      
200      /**
201       * store job token
202       * @param t
203       */
204      @InterfaceAudience.Private
205      public static void setJobToken(Token<? extends TokenIdentifier> t, 
206          Credentials credentials) {
207        credentials.addToken(JOB_TOKEN, t);
208      }
209      /**
210       * 
211       * @return job token
212       */
213      @SuppressWarnings("unchecked")
214      @InterfaceAudience.Private
215      public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
216        return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
217      }
218    
219      @InterfaceAudience.Private
220      public static void setShuffleSecretKey(byte[] key, Credentials credentials) {
221        credentials.addSecretKey(SHUFFLE_TOKEN, key);
222      }
223    
224      @InterfaceAudience.Private
225      public static byte[] getShuffleSecretKey(Credentials credentials) {
226        return getSecretKey(credentials, SHUFFLE_TOKEN);
227      }
228    
229      /**
230       * @deprecated Use {@link Credentials#getToken(org.apache.hadoop.io.Text)}
231       * instead, this method is included for compatibility against Hadoop-1
232       * @param namenode
233       * @return delegation token
234       */
235      @InterfaceAudience.Private
236      @Deprecated
237      public static
238          Token<?> getDelegationToken(
239              Credentials credentials, String namenode) {
240        return (Token<?>) credentials.getToken(new Text(
241          namenode));
242      }
243    }