001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.registry.client.binding; 020 021import com.google.common.annotations.VisibleForTesting; 022import com.google.common.base.Preconditions; 023import org.apache.commons.lang.StringUtils; 024import org.apache.hadoop.classification.InterfaceAudience; 025import org.apache.hadoop.classification.InterfaceStability; 026import org.apache.hadoop.fs.PathNotFoundException; 027import org.apache.hadoop.security.UserGroupInformation; 028import org.apache.hadoop.registry.client.api.RegistryConstants; 029import org.apache.hadoop.registry.client.api.RegistryOperations; 030import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException; 031import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; 032import org.apache.hadoop.registry.client.exceptions.NoRecordException; 033import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants; 034import org.apache.hadoop.registry.client.types.RegistryPathStatus; 035import org.apache.hadoop.registry.client.types.ServiceRecord; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import static 
org.apache.hadoop.registry.client.binding.RegistryPathUtils.*;

import java.io.EOFException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
 * Utility methods for working with a registry.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class RegistryUtils {
  private static final Logger LOG =
      LoggerFactory.getLogger(RegistryUtils.class);

  /**
   * Build the user path; switches to the system path if the user is "".
   * It also cross-converts the username to ascii via punycode.
   * <p>
   * If the argument already starts with {@link RegistryConstants#PATH_USERS}
   * it is treated as an existing user path and returned unchanged.
   * @param username username or ""
   * @return the path to the user
   * @throws IllegalArgumentException if the username is null
   */
  public static String homePathForUser(String username) {
    Preconditions.checkArgument(username != null, "null user");

    // catch recursion: the argument is already a user path
    if (username.startsWith(RegistryConstants.PATH_USERS)) {
      return username;
    }
    if (username.isEmpty()) {
      return RegistryConstants.PATH_SYSTEM_SERVICES;
    }

    // convert username to registry name
    String convertedName = convertUsername(username);

    return RegistryPathUtils.join(RegistryConstants.PATH_USERS,
        encodeForRegistry(convertedName));
  }

  /**
   * Convert the username to that which can be used for registry
   * entries. Lower-cases it (English locale), strips the kerberos realm
   * off a username if needed, and strips any "/" hostname entries.
   * <p>
   * For example, {@code "User/host@REALM"} becomes {@code "user"}.
   * @param username user; must not be null
   * @return the converted username
   */
  public static String convertUsername(String username) {
    String converted = username.toLowerCase(Locale.ENGLISH);
    // strip any realm: "user@realm" -> "user"; a leading '@' is left alone
    int atSymbol = converted.indexOf('@');
    if (atSymbol > 0) {
      converted = converted.substring(0, atSymbol);
    }
    // strip any host part: "user/host" -> "user"; a leading '/' is left alone
    int slashSymbol = converted.indexOf('/');
    if (slashSymbol > 0) {
      converted = converted.substring(0, slashSymbol);
    }
    return converted;
  }

  /**
   * Create a service class path.
   * @param user username or ""
   * @param serviceClass service class
   * @return a full path
   */
  public static String serviceclassPath(String user,
      String serviceClass) {
    String services = join(homePathForUser(user),
        RegistryConstants.PATH_USER_SERVICES);
    return join(services,
        serviceClass);
  }

  /**
   * Create a path to a service under a user and service class.
   * @param user username or ""
   * @param serviceClass service class
   * @param serviceName service name unique for that user and service class
   * @return a full path
   */
  public static String servicePath(String user,
      String serviceClass,
      String serviceName) {

    return join(
        serviceclassPath(user, serviceClass),
        serviceName);
  }

  /**
   * Create a path for listing components under a service.
   * @param user username or ""
   * @param serviceClass service class
   * @param serviceName service name unique for that user and service class
   * @return a full path
   */
  public static String componentListPath(String user,
      String serviceClass, String serviceName) {

    return join(servicePath(user, serviceClass, serviceName),
        RegistryConstants.SUBPATH_COMPONENTS);
  }

  /**
   * Create the path to a service record for a component.
   * @param user username or ""
   * @param serviceClass service class
   * @param serviceName service name unique for that user and service class
   * @param componentName unique name/ID of the component
   * @return a full path
   */
  public static String componentPath(String user,
      String serviceClass, String serviceName, String componentName) {

    return join(
        componentListPath(user, serviceClass, serviceName),
        componentName);
  }

  /**
   * List service records directly under a path.
   * @param registryOperations registry operations instance
   * @param path path to list
   * @return a mapping of the service records that were resolved, indexed
   * by their full path
   * @throws IOException for any IO operation that was not ignored
   */
  public static Map<String, ServiceRecord> listServiceRecords(
      RegistryOperations registryOperations,
      String path) throws IOException {
    Map<String, RegistryPathStatus> children =
        statChildren(registryOperations, path);
    return extractServiceRecords(registryOperations,
        path,
        children.values());
  }

  /**
   * List children of a directory and retrieve their
   * {@link RegistryPathStatus} values.
   * <p>
   * This is not an atomic operation; a child may be deleted
   * during the iteration through the child entries. If this happens,
   * the <code>PathNotFoundException</code> is caught and that child
   * entry omitted.
   *
   * @param registryOperations registry operations instance
   * @param path path
   * @return a possibly empty map of child entries listed by
   * their short name.
   * @throws PathNotFoundException path is not in the registry.
   * @throws InvalidPathnameException the path is invalid.
   * @throws IOException Any other IO Exception
   */
  public static Map<String, RegistryPathStatus> statChildren(
      RegistryOperations registryOperations,
      String path)
      throws PathNotFoundException,
      InvalidPathnameException,
      IOException {
    List<String> childNames = registryOperations.list(path);
    Map<String, RegistryPathStatus> results =
        new HashMap<String, RegistryPathStatus>();
    for (String childName : childNames) {
      String child = join(path, childName);
      try {
        RegistryPathStatus stat = registryOperations.stat(child);
        results.put(childName, stat);
      } catch (PathNotFoundException pnfe) {
        // the child vanished between list() and stat(): skip it
        LOG.debug("stat failed on {}: moved? {}", child, pnfe, pnfe);
      }
    }
    return results;
  }

  /**
   * Get the home path of the current user.
   * <p>
   * In an insecure cluster, the environment variable
   * <code>HADOOP_USER_NAME</code> is queried <i>first</i>.
   * <p>
   * This means that in a YARN container where the creator set this
   * environment variable to propagate their identity, the defined
   * user name is used in preference to the actual user.
   * <p>
   * In a secure cluster, the kerberos identity of the current user is used.
   * @return a path for the current user's home dir.
   * @throws RuntimeException if the current user identity cannot be determined
   * from the OS/kerberos.
   */
  public static String homePathForCurrentUser() {
    String shortUserName = currentUsernameUnencoded();
    return homePathForUser(shortUserName);
  }

  /**
   * Get the current username, before any encoding has been applied.
   * @return the current user from the kerberos identity, falling back
   * to the user and/or env variables.
   */
  private static String currentUsernameUnencoded() {
    String env_hadoop_username = System.getenv(
        RegistryInternalConstants.HADOOP_USER_NAME);
    return getCurrentUsernameUnencoded(env_hadoop_username);
  }

  /**
   * Get the current username, using the value of the parameter
   * <code>env_hadoop_username</code> if it is set on an insecure cluster.
   * This ensures that the username propagates correctly across processes
   * started by YARN.
   * <p>
   * This method is primarily made visible for testing.
   * @param env_hadoop_username the environment variable
   * @return the selected username
   * @throws RuntimeException if there is a problem getting the short user
   * name of the current user.
   */
  @VisibleForTesting
  public static String getCurrentUsernameUnencoded(String env_hadoop_username) {
    String shortUserName = null;
    if (!UserGroupInformation.isSecurityEnabled()) {
      shortUserName = env_hadoop_username;
    }
    // secure cluster, or no (or empty) override: use the UGI identity
    if (StringUtils.isEmpty(shortUserName)) {
      try {
        shortUserName = UserGroupInformation.getCurrentUser().getShortUserName();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
    return shortUserName;
  }

  /**
   * Get the current user path formatted for the registry.
   * <p>
   * In an insecure cluster, the environment variable
   * <code>HADOOP_USER_NAME</code> is queried <i>first</i>.
   * <p>
   * This means that in a YARN container where the creator set this
   * environment variable to propagate their identity, the defined
   * user name is used in preference to the actual user.
   * <p>
   * In a secure cluster, the kerberos identity of the current user is used.
   * @return the encoded shortname of the current user
   * @throws RuntimeException if the current user identity cannot be determined
   * from the OS/kerberos.
   */
  public static String currentUser() {
    String shortUserName = currentUsernameUnencoded();
    return encodeForRegistry(shortUserName);
  }

  /**
   * Extract all service records under a list of stat operations; this
   * skips entries that are too short or simply not matching.
   * @param operations operation support for fetches
   * @param parentpath path of the parent of all the entries
   * @param stats Collection of stat results
   * @return a possibly empty map of fullpath:record.
   * @throws IOException for any IO Operation that wasn't ignored.
   */
  public static Map<String, ServiceRecord> extractServiceRecords(
      RegistryOperations operations,
      String parentpath,
      Collection<RegistryPathStatus> stats) throws IOException {
    Map<String, ServiceRecord> results =
        new HashMap<String, ServiceRecord>(stats.size());
    for (RegistryPathStatus stat : stats) {
      // entries no larger than the record type marker cannot hold a record
      if (stat.size > ServiceRecord.RECORD_TYPE.length()) {
        // maybe has data
        String path = join(parentpath, stat.path);
        try {
          ServiceRecord serviceRecord = operations.resolve(path);
          results.put(path, serviceRecord);
        } catch (EOFException e) {
          // skipped, but keep the cause for diagnostics
          LOG.debug("data too short for {}", path, e);
        } catch (InvalidRecordException e) {
          LOG.debug("Invalid record at {}", path, e);
        } catch (NoRecordException e) {
          LOG.debug("No record at {}", path, e);
        }
      }
    }
    return results;
  }

  /**
   * Extract all service records under a map of stat results; this
   * non-atomic action skips entries that are too short or simply not
   * matching.
   * @param operations operation support for fetches
   * @param parentpath path of the parent of all the entries
   * @param stats map of stat results; only the values are used
   * @return a possibly empty map of fullpath:record.
   * @throws IOException for any IO Operation that wasn't ignored.
   */
  public static Map<String, ServiceRecord> extractServiceRecords(
      RegistryOperations operations,
      String parentpath,
      Map<String , RegistryPathStatus> stats) throws IOException {
    return extractServiceRecords(operations, parentpath, stats.values());
  }


  /**
   * List the children of a path and extract all service records found
   * directly under it; this non-atomic action skips entries that are too
   * short or simply not matching.
   * @param operations operation support for fetches
   * @param parentpath path of the parent of all the entries
   * @return a possibly empty map of fullpath:record.
   * @throws PathNotFoundException the parent path is not in the registry.
   * @throws IOException for any IO Operation that wasn't ignored.
   */
  public static Map<String, ServiceRecord> extractServiceRecords(
      RegistryOperations operations,
      String parentpath) throws IOException {
    return
        extractServiceRecords(operations,
            parentpath,
            statChildren(operations, parentpath).values());
  }



  /**
   * Marshaller for {@link ServiceRecord} instances, built on
   * {@link JsonSerDeser}.
   */
  public static class ServiceRecordMarshal extends JsonSerDeser<ServiceRecord> {
    public ServiceRecordMarshal() {
      super(ServiceRecord.class);
    }
  }
}