001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.registry.client.binding; 020 021import com.google.common.annotations.VisibleForTesting; 022import com.google.common.base.Preconditions; 023import org.apache.commons.lang.StringUtils; 024import org.apache.hadoop.classification.InterfaceAudience; 025import org.apache.hadoop.classification.InterfaceStability; 026import org.apache.hadoop.fs.PathNotFoundException; 027import org.apache.hadoop.security.UserGroupInformation; 028import org.apache.hadoop.registry.client.api.RegistryConstants; 029import org.apache.hadoop.registry.client.api.RegistryOperations; 030import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException; 031import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; 032import org.apache.hadoop.registry.client.exceptions.NoRecordException; 033import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants; 034import org.apache.hadoop.registry.client.types.RegistryPathStatus; 035import org.apache.hadoop.registry.client.types.ServiceRecord; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import static org.apache.hadoop.registry.client.binding.RegistryPathUtils.*; 040 041import java.io.EOFException; 042import java.io.IOException; 043import java.util.Collection; 044import java.util.HashMap; 045import java.util.List; 046import java.util.Locale; 047import java.util.Map; 048 049/** 050 * Utility methods for working with a registry. 051 */ 052@InterfaceAudience.Public 053@InterfaceStability.Evolving 054public class RegistryUtils { 055 private static final Logger LOG = 056 LoggerFactory.getLogger(RegistryUtils.class); 057 058 /** 059 * Buld the user path -switches to the system path if the user is "". 060 * It also cross-converts the username to ascii via punycode 061 * @param username username or "" 062 * @return the path to the user 063 */ 064 public static String homePathForUser(String username) { 065 Preconditions.checkArgument(username != null, "null user"); 066 067 // catch recursion 068 if (username.startsWith(RegistryConstants.PATH_USERS)) { 069 return username; 070 } 071 if (username.isEmpty()) { 072 return RegistryConstants.PATH_SYSTEM_SERVICES; 073 } 074 075 // convert username to registry name 076 String convertedName = convertUsername(username); 077 078 return RegistryPathUtils.join(RegistryConstants.PATH_USERS, 079 encodeForRegistry(convertedName)); 080 } 081 082 /** 083 * Convert the username to that which can be used for registry 084 * entries. Lower cases it, 085 * Strip the kerberos realm off a username if needed, and any "/" hostname 086 * entries 087 * @param username user 088 * @return the converted username 089 */ 090 public static String convertUsername(String username) { 091 String converted = 092 org.apache.hadoop.util.StringUtils.toLowerCase(username); 093 int atSymbol = converted.indexOf('@'); 094 if (atSymbol > 0) { 095 converted = converted.substring(0, atSymbol); 096 } 097 int slashSymbol = converted.indexOf('/'); 098 if (slashSymbol > 0) { 099 converted = converted.substring(0, slashSymbol); 100 } 101 return converted; 102 } 103 104 /** 105 * Create a service classpath 106 * @param user username or "" 107 * @param serviceClass service name 108 * @return a full path 109 */ 110 public static String serviceclassPath(String user, 111 String serviceClass) { 112 String services = join(homePathForUser(user), 113 RegistryConstants.PATH_USER_SERVICES); 114 return join(services, 115 serviceClass); 116 } 117 118 /** 119 * Create a path to a service under a user and service class 120 * @param user username or "" 121 * @param serviceClass service name 122 * @param serviceName service name unique for that user and service class 123 * @return a full path 124 */ 125 public static String servicePath(String user, 126 String serviceClass, 127 String serviceName) { 128 129 return join( 130 serviceclassPath(user, serviceClass), 131 serviceName); 132 } 133 134 /** 135 * Create a path for listing components under a service 136 * @param user username or "" 137 * @param serviceClass service name 138 * @param serviceName service name unique for that user and service class 139 * @return a full path 140 */ 141 public static String componentListPath(String user, 142 String serviceClass, String serviceName) { 143 144 return join(servicePath(user, serviceClass, serviceName), 145 RegistryConstants.SUBPATH_COMPONENTS); 146 } 147 148 /** 149 * Create the path to a service record for a component 150 * @param user username or "" 151 * @param serviceClass service name 152 * @param serviceName service name unique for that user and service class 153 * @param componentName unique name/ID of the component 154 * @return a full path 155 */ 156 public static String componentPath(String user, 157 String serviceClass, String serviceName, String componentName) { 158 159 return join( 160 componentListPath(user, serviceClass, serviceName), 161 componentName); 162 } 163 164 /** 165 * List service records directly under a path 166 * @param registryOperations registry operations instance 167 * @param path path to list 168 * @return a mapping of the service records that were resolved, indexed 169 * by their full path 170 * @throws IOException 171 */ 172 public static Map<String, ServiceRecord> listServiceRecords( 173 RegistryOperations registryOperations, 174 String path) throws IOException { 175 Map<String, RegistryPathStatus> children = 176 statChildren(registryOperations, path); 177 return extractServiceRecords(registryOperations, 178 path, 179 children.values()); 180 } 181 182 /** 183 * List children of a directory and retrieve their 184 * {@link RegistryPathStatus} values. 185 * <p> 186 * This is not an atomic operation; A child may be deleted 187 * during the iteration through the child entries. If this happens, 188 * the <code>PathNotFoundException</code> is caught and that child 189 * entry ommitted. 190 * 191 * @param path path 192 * @return a possibly empty map of child entries listed by 193 * their short name. 194 * @throws PathNotFoundException path is not in the registry. 195 * @throws InvalidPathnameException the path is invalid. 196 * @throws IOException Any other IO Exception 197 */ 198 public static Map<String, RegistryPathStatus> statChildren( 199 RegistryOperations registryOperations, 200 String path) 201 throws PathNotFoundException, 202 InvalidPathnameException, 203 IOException { 204 List<String> childNames = registryOperations.list(path); 205 Map<String, RegistryPathStatus> results = 206 new HashMap<String, RegistryPathStatus>(); 207 for (String childName : childNames) { 208 String child = join(path, childName); 209 try { 210 RegistryPathStatus stat = registryOperations.stat(child); 211 results.put(childName, stat); 212 } catch (PathNotFoundException pnfe) { 213 if (LOG.isDebugEnabled()) { 214 LOG.debug("stat failed on {}: moved? {}", child, pnfe, pnfe); 215 } 216 // and continue 217 } 218 } 219 return results; 220 } 221 222 /** 223 * Get the home path of the current user. 224 * <p> 225 * In an insecure cluster, the environment variable 226 * <code>HADOOP_USER_NAME</code> is queried <i>first</i>. 227 * <p> 228 * This means that in a YARN container where the creator set this 229 * environment variable to propagate their identity, the defined 230 * user name is used in preference to the actual user. 231 * <p> 232 * In a secure cluster, the kerberos identity of the current user is used. 233 * @return a path for the current user's home dir. 234 * @throws RuntimeException if the current user identity cannot be determined 235 * from the OS/kerberos. 236 */ 237 public static String homePathForCurrentUser() { 238 String shortUserName = currentUsernameUnencoded(); 239 return homePathForUser(shortUserName); 240 } 241 242 /** 243 * Get the current username, before any encoding has been applied. 244 * @return the current user from the kerberos identity, falling back 245 * to the user and/or env variables. 246 */ 247 private static String currentUsernameUnencoded() { 248 String env_hadoop_username = System.getenv( 249 RegistryInternalConstants.HADOOP_USER_NAME); 250 return getCurrentUsernameUnencoded(env_hadoop_username); 251 } 252 253 /** 254 * Get the current username, using the value of the parameter 255 * <code>env_hadoop_username</code> if it is set on an insecure cluster. 256 * This ensures that the username propagates correctly across processes 257 * started by YARN. 258 * <p> 259 * This method is primarly made visible for testing. 260 * @param env_hadoop_username the environment variable 261 * @return the selected username 262 * @throws RuntimeException if there is a problem getting the short user 263 * name of the current user. 264 */ 265 @VisibleForTesting 266 public static String getCurrentUsernameUnencoded(String env_hadoop_username) { 267 String shortUserName = null; 268 if (!UserGroupInformation.isSecurityEnabled()) { 269 shortUserName = env_hadoop_username; 270 } 271 if (StringUtils.isEmpty(shortUserName)) { 272 try { 273 shortUserName = UserGroupInformation.getCurrentUser().getShortUserName(); 274 } catch (IOException e) { 275 throw new RuntimeException(e); 276 } 277 } 278 return shortUserName; 279 } 280 281 /** 282 * Get the current user path formatted for the registry 283 * <p> 284 * In an insecure cluster, the environment variable 285 * <code>HADOOP_USER_NAME </code> is queried <i>first</i>. 286 * <p> 287 * This means that in a YARN container where the creator set this 288 * environment variable to propagate their identity, the defined 289 * user name is used in preference to the actual user. 290 * <p> 291 * In a secure cluster, the kerberos identity of the current user is used. 292 * @return the encoded shortname of the current user 293 * @throws RuntimeException if the current user identity cannot be determined 294 * from the OS/kerberos. 295 * 296 */ 297 public static String currentUser() { 298 String shortUserName = currentUsernameUnencoded(); 299 return encodeForRegistry(shortUserName); 300 } 301 302 /** 303 * Extract all service records under a list of stat operations...this 304 * skips entries that are too short or simply not matching 305 * @param operations operation support for fetches 306 * @param parentpath path of the parent of all the entries 307 * @param stats Collection of stat results 308 * @return a possibly empty map of fullpath:record. 309 * @throws IOException for any IO Operation that wasn't ignored. 310 */ 311 public static Map<String, ServiceRecord> extractServiceRecords( 312 RegistryOperations operations, 313 String parentpath, 314 Collection<RegistryPathStatus> stats) throws IOException { 315 Map<String, ServiceRecord> results = new HashMap<String, ServiceRecord>(stats.size()); 316 for (RegistryPathStatus stat : stats) { 317 if (stat.size > ServiceRecord.RECORD_TYPE.length()) { 318 // maybe has data 319 String path = join(parentpath, stat.path); 320 try { 321 ServiceRecord serviceRecord = operations.resolve(path); 322 results.put(path, serviceRecord); 323 } catch (EOFException ignored) { 324 if (LOG.isDebugEnabled()) { 325 LOG.debug("data too short for {}", path); 326 } 327 } catch (InvalidRecordException record) { 328 if (LOG.isDebugEnabled()) { 329 LOG.debug("Invalid record at {}", path); 330 } 331 } catch (NoRecordException record) { 332 if (LOG.isDebugEnabled()) { 333 LOG.debug("No record at {}", path); 334 } 335 } 336 } 337 } 338 return results; 339 } 340 341 /** 342 * Extract all service records under a list of stat operations...this 343 * non-atomic action skips entries that are too short or simply not matching. 344 * <p> 345 * @param operations operation support for fetches 346 * @param parentpath path of the parent of all the entries 347 * @return a possibly empty map of fullpath:record. 348 * @throws IOException for any IO Operation that wasn't ignored. 349 */ 350 public static Map<String, ServiceRecord> extractServiceRecords( 351 RegistryOperations operations, 352 String parentpath, 353 Map<String , RegistryPathStatus> stats) throws IOException { 354 return extractServiceRecords(operations, parentpath, stats.values()); 355 } 356 357 358 /** 359 * Extract all service records under a list of stat operations...this 360 * non-atomic action skips entries that are too short or simply not matching. 361 * <p> 362 * @param operations operation support for fetches 363 * @param parentpath path of the parent of all the entries 364 * @return a possibly empty map of fullpath:record. 365 * @throws IOException for any IO Operation that wasn't ignored. 366 */ 367 public static Map<String, ServiceRecord> extractServiceRecords( 368 RegistryOperations operations, 369 String parentpath) throws IOException { 370 return 371 extractServiceRecords(operations, 372 parentpath, 373 statChildren(operations, parentpath).values()); 374 } 375 376 377 378 /** 379 * Static instance of service record marshalling 380 */ 381 public static class ServiceRecordMarshal extends JsonSerDeser<ServiceRecord> { 382 public ServiceRecordMarshal() { 383 super(ServiceRecord.class); 384 } 385 } 386}