001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.security;
020    
021    import java.io.IOException;
022    import java.util.HashSet;
023    import java.util.Set;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceStability;
029    import org.apache.hadoop.conf.Configuration;
030    import org.apache.hadoop.fs.FileSystem;
031    import org.apache.hadoop.fs.Path;
032    import org.apache.hadoop.io.Text;
033    import org.apache.hadoop.mapred.JobConf;
034    import org.apache.hadoop.mapred.Master;
035    import org.apache.hadoop.mapreduce.MRJobConfig;
036    import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
037    import org.apache.hadoop.security.Credentials;
038    import org.apache.hadoop.security.UserGroupInformation;
039    import org.apache.hadoop.security.token.Token;
040    import org.apache.hadoop.security.token.TokenIdentifier;
041    
042    
043    /**
044     * This class provides user facing APIs for transferring secrets from
045     * the job client to the tasks.
046     * The secrets can be stored just before submission of jobs and read during
047     * the task execution.  
048     */
049    @InterfaceAudience.Public
050    @InterfaceStability.Evolving
051    public class TokenCache {
052      
053      private static final Log LOG = LogFactory.getLog(TokenCache.class);
054    
055      
056      /**
057       * auxiliary method to get user's secret keys..
058       * @param alias
059       * @return secret key from the storage
060       */
061      public static byte[] getSecretKey(Credentials credentials, Text alias) {
062        if(credentials == null)
063          return null;
064        return credentials.getSecretKey(alias);
065      }
066      
067      /**
068       * Convenience method to obtain delegation tokens from namenodes 
069       * corresponding to the paths passed.
070       * @param credentials
071       * @param ps array of paths
072       * @param conf configuration
073       * @throws IOException
074       */
075      public static void obtainTokensForNamenodes(Credentials credentials,
076          Path[] ps, Configuration conf) throws IOException {
077        if (!UserGroupInformation.isSecurityEnabled()) {
078          return;
079        }
080        obtainTokensForNamenodesInternal(credentials, ps, conf);
081      }
082    
083      /**
084       * Remove jobtoken referrals which don't make sense in the context
085       * of the task execution.
086       *
087       * @param conf
088       */
089      public static void cleanUpTokenReferral(Configuration conf) {
090        conf.unset(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
091      }
092    
093      static void obtainTokensForNamenodesInternal(Credentials credentials,
094          Path[] ps, Configuration conf) throws IOException {
095        Set<FileSystem> fsSet = new HashSet<FileSystem>();
096        for(Path p: ps) {
097          fsSet.add(p.getFileSystem(conf));
098        }
099        for (FileSystem fs : fsSet) {
100          obtainTokensForNamenodesInternal(fs, credentials, conf);
101        }
102      }
103    
104      /**
105       * get delegation token for a specific FS
106       * @param fs
107       * @param credentials
108       * @param p
109       * @param conf
110       * @throws IOException
111       */
112      static void obtainTokensForNamenodesInternal(FileSystem fs, 
113          Credentials credentials, Configuration conf) throws IOException {
114        String delegTokenRenewer = Master.getMasterPrincipal(conf);
115        if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
116          throw new IOException(
117              "Can't get Master Kerberos principal for use as renewer");
118        }
119        mergeBinaryTokens(credentials, conf);
120    
121        final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer,
122                                                         credentials);
123        if (tokens != null) {
124          for (Token<?> token : tokens) {
125            LOG.info("Got dt for " + fs.getUri() + "; "+token);
126          }
127        }
128      }
129    
130      private static void mergeBinaryTokens(Credentials creds, Configuration conf) {
131        String binaryTokenFilename =
132            conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
133        if (binaryTokenFilename != null) {
134          Credentials binary;
135          try {
136            binary = Credentials.readTokenStorageFile(
137                new Path("file:///" +  binaryTokenFilename), conf);
138          } catch (IOException e) {
139            throw new RuntimeException(e);
140          }
141          // supplement existing tokens with the tokens in the binary file
142          creds.mergeAll(binary);
143        }
144      }
145      
146      /**
147       * file name used on HDFS for generated job token
148       */
149      @InterfaceAudience.Private
150      public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
151    
152      /**
153       * conf setting for job tokens cache file name
154       */
155      @InterfaceAudience.Private
156      public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
157      private static final Text JOB_TOKEN = new Text("JobToken");
158      private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
159      
160      /**
161       * load job token from a file
162       * @deprecated Use {@link Credentials#readTokenStorageFile} instead,
163       * this method is included for compatibility against Hadoop-1.
164       * @param conf
165       * @throws IOException
166       */
167      @InterfaceAudience.Private
168      @Deprecated
169      public static Credentials loadTokens(String jobTokenFile, JobConf conf)
170      throws IOException {
171        Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
172    
173        Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf);
174    
175        if(LOG.isDebugEnabled()) {
176          LOG.debug("Task: Loaded jobTokenFile from: "+
177              localJobTokenFile.toUri().getPath() 
178              +"; num of sec keys  = " + ts.numberOfSecretKeys() +
179              " Number of tokens " +  ts.numberOfTokens());
180        }
181        return ts;
182      }
183      
184      /**
185       * load job token from a file
186       * @deprecated Use {@link Credentials#readTokenStorageFile} instead,
187       * this method is included for compatibility against Hadoop-1.
188       * @param conf
189       * @throws IOException
190       */
191      @InterfaceAudience.Private
192      @Deprecated
193      public static Credentials loadTokens(String jobTokenFile, Configuration conf)
194          throws IOException {
195        return loadTokens(jobTokenFile, new JobConf(conf));
196      }
197      
198      /**
199       * store job token
200       * @param t
201       */
202      @InterfaceAudience.Private
203      public static void setJobToken(Token<? extends TokenIdentifier> t, 
204          Credentials credentials) {
205        credentials.addToken(JOB_TOKEN, t);
206      }
207      /**
208       * 
209       * @return job token
210       */
211      @SuppressWarnings("unchecked")
212      @InterfaceAudience.Private
213      public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
214        return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
215      }
216    
217      @InterfaceAudience.Private
218      public static void setShuffleSecretKey(byte[] key, Credentials credentials) {
219        credentials.addSecretKey(SHUFFLE_TOKEN, key);
220      }
221    
222      @InterfaceAudience.Private
223      public static byte[] getShuffleSecretKey(Credentials credentials) {
224        return getSecretKey(credentials, SHUFFLE_TOKEN);
225      }
226    
227      /**
228       * @deprecated Use {@link Credentials#getToken(org.apache.hadoop.io.Text)}
229       * instead, this method is included for compatibility against Hadoop-1
230       * @param namenode
231       * @return delegation token
232       */
233      @InterfaceAudience.Private
234      @Deprecated
235      public static
236          Token<?> getDelegationToken(
237              Credentials credentials, String namenode) {
238        return (Token<?>) credentials.getToken(new Text(
239          namenode));
240      }
241    }