001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.registry.client.binding;
020    
021    import com.google.common.annotations.VisibleForTesting;
022    import com.google.common.base.Preconditions;
023    import org.apache.commons.lang.StringUtils;
024    import org.apache.hadoop.classification.InterfaceAudience;
025    import org.apache.hadoop.classification.InterfaceStability;
026    import org.apache.hadoop.fs.PathNotFoundException;
027    import org.apache.hadoop.security.UserGroupInformation;
028    import org.apache.hadoop.registry.client.api.RegistryConstants;
029    import org.apache.hadoop.registry.client.api.RegistryOperations;
030    import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException;
031    import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
032    import org.apache.hadoop.registry.client.exceptions.NoRecordException;
033    import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants;
034    import org.apache.hadoop.registry.client.types.RegistryPathStatus;
035    import org.apache.hadoop.registry.client.types.ServiceRecord;
036    import org.slf4j.Logger;
037    import org.slf4j.LoggerFactory;
038    
039    import static org.apache.hadoop.registry.client.binding.RegistryPathUtils.*;
040    
041    import java.io.EOFException;
042    import java.io.IOException;
043    import java.util.Collection;
044    import java.util.HashMap;
045    import java.util.List;
046    import java.util.Locale;
047    import java.util.Map;
048    
049    /**
050     * Utility methods for working with a registry.
051     */
052    @InterfaceAudience.Public
053    @InterfaceStability.Evolving
054    public class RegistryUtils {
055      private static final Logger LOG =
056          LoggerFactory.getLogger(RegistryUtils.class);
057    
058      /**
059       * Buld the user path -switches to the system path if the user is "".
060       * It also cross-converts the username to ascii via punycode
061       * @param username username or ""
062       * @return the path to the user
063       */
064      public static String homePathForUser(String username) {
065        Preconditions.checkArgument(username != null, "null user");
066    
067        // catch recursion
068        if (username.startsWith(RegistryConstants.PATH_USERS)) {
069          return username;
070        }
071        if (username.isEmpty()) {
072          return RegistryConstants.PATH_SYSTEM_SERVICES;
073        }
074    
075        // convert username to registry name
076        String convertedName = convertUsername(username);
077    
078        return RegistryPathUtils.join(RegistryConstants.PATH_USERS,
079            encodeForRegistry(convertedName));
080      }
081    
082      /**
083       * Convert the username to that which can be used for registry
084       * entries. Lower cases it,
085       * Strip the kerberos realm off a username if needed, and any "/" hostname
086       * entries
087       * @param username user
088       * @return the converted username
089       */
090      public static String convertUsername(String username) {
091        String converted= username.toLowerCase(Locale.ENGLISH);
092        int atSymbol = converted.indexOf('@');
093        if (atSymbol > 0) {
094          converted = converted.substring(0, atSymbol);
095        }
096        int slashSymbol = converted.indexOf('/');
097        if (slashSymbol > 0) {
098          converted = converted.substring(0, slashSymbol);
099        }
100        return converted;
101      }
102    
103      /**
104       * Create a service classpath
105       * @param user username or ""
106       * @param serviceClass service name
107       * @return a full path
108       */
109      public static String serviceclassPath(String user,
110          String serviceClass) {
111        String services = join(homePathForUser(user),
112            RegistryConstants.PATH_USER_SERVICES);
113        return join(services,
114            serviceClass);
115      }
116    
117      /**
118       * Create a path to a service under a user & service class
119       * @param user username or ""
120       * @param serviceClass service name
121       * @param serviceName service name unique for that user & service class
122       * @return a full path
123       */
124      public static String servicePath(String user,
125          String serviceClass,
126          String serviceName) {
127    
128        return join(
129            serviceclassPath(user, serviceClass),
130            serviceName);
131      }
132    
133      /**
134       * Create a path for listing components under a service
135       * @param user username or ""
136       * @param serviceClass service name
137       * @param serviceName service name unique for that user & service class
138       * @return a full path
139       */
140      public static String componentListPath(String user,
141          String serviceClass, String serviceName) {
142    
143        return join(servicePath(user, serviceClass, serviceName),
144            RegistryConstants.SUBPATH_COMPONENTS);
145      }
146    
147      /**
148       * Create the path to a service record for a component
149       * @param user username or ""
150       * @param serviceClass service name
151       * @param serviceName service name unique for that user & service class
152       * @param componentName unique name/ID of the component
153       * @return a full path
154       */
155      public static String componentPath(String user,
156          String serviceClass, String serviceName, String componentName) {
157    
158        return join(
159            componentListPath(user, serviceClass, serviceName),
160            componentName);
161      }
162    
163      /**
164       * List service records directly under a path
165       * @param registryOperations registry operations instance
166       * @param path path to list
167       * @return a mapping of the service records that were resolved, indexed
168       * by their full path
169       * @throws IOException
170       */
171      public static Map<String, ServiceRecord> listServiceRecords(
172          RegistryOperations registryOperations,
173          String path) throws IOException {
174        Map<String, RegistryPathStatus> children =
175            statChildren(registryOperations, path);
176        return extractServiceRecords(registryOperations,
177            path,
178            children.values());
179      }
180    
181      /**
182       * List children of a directory and retrieve their
183       * {@link RegistryPathStatus} values.
184       * <p>
185       * This is not an atomic operation; A child may be deleted
186       * during the iteration through the child entries. If this happens,
187       * the <code>PathNotFoundException</code> is caught and that child
188       * entry ommitted.
189       *
190       * @param path path
191       * @return a possibly empty map of child entries listed by
192       * their short name.
193       * @throws PathNotFoundException path is not in the registry.
194       * @throws InvalidPathnameException the path is invalid.
195       * @throws IOException Any other IO Exception
196       */
197      public static Map<String, RegistryPathStatus> statChildren(
198          RegistryOperations registryOperations,
199          String path)
200          throws PathNotFoundException,
201          InvalidPathnameException,
202          IOException {
203        List<String> childNames = registryOperations.list(path);
204        Map<String, RegistryPathStatus> results =
205            new HashMap<String, RegistryPathStatus>();
206        for (String childName : childNames) {
207          String child = join(path, childName);
208          try {
209            RegistryPathStatus stat = registryOperations.stat(child);
210            results.put(childName, stat);
211          } catch (PathNotFoundException pnfe) {
212            if (LOG.isDebugEnabled()) {
213              LOG.debug("stat failed on {}: moved? {}", child, pnfe, pnfe);
214            }
215            // and continue
216          }
217        }
218        return results;
219      }
220    
221      /**
222       * Get the home path of the current user.
223       * <p>
224       *  In an insecure cluster, the environment variable
225       *  <code>HADOOP_USER_NAME</code> is queried <i>first</i>.
226       * <p>
227       * This means that in a YARN container where the creator set this
228       * environment variable to propagate their identity, the defined
229       * user name is used in preference to the actual user.
230       * <p>
231       * In a secure cluster, the kerberos identity of the current user is used.
232       * @return a path for the current user's home dir.
233       * @throws RuntimeException if the current user identity cannot be determined
234       * from the OS/kerberos.
235       */
236      public static String homePathForCurrentUser() {
237        String shortUserName = currentUsernameUnencoded();
238        return homePathForUser(shortUserName);
239      }
240    
241      /**
242       * Get the current username, before any encoding has been applied.
243       * @return the current user from the kerberos identity, falling back
244       * to the user and/or env variables.
245       */
246      private static String currentUsernameUnencoded() {
247        String env_hadoop_username = System.getenv(
248            RegistryInternalConstants.HADOOP_USER_NAME);
249        return getCurrentUsernameUnencoded(env_hadoop_username);
250      }
251    
252      /**
253       * Get the current username, using the value of the parameter
254       * <code>env_hadoop_username</code> if it is set on an insecure cluster.
255       * This ensures that the username propagates correctly across processes
256       * started by YARN.
257       * <p>
258       * This method is primarly made visible for testing.
259       * @param env_hadoop_username the environment variable
260       * @return the selected username
261       * @throws RuntimeException if there is a problem getting the short user
262       * name of the current user.
263       */
264      @VisibleForTesting
265      public static String getCurrentUsernameUnencoded(String env_hadoop_username) {
266        String shortUserName = null;
267        if (!UserGroupInformation.isSecurityEnabled()) {
268          shortUserName = env_hadoop_username;
269        }
270        if (StringUtils.isEmpty(shortUserName)) {
271          try {
272            shortUserName = UserGroupInformation.getCurrentUser().getShortUserName();
273          } catch (IOException e) {
274            throw new RuntimeException(e);
275          }
276        }
277        return shortUserName;
278      }
279    
280      /**
281       * Get the current user path formatted for the registry
282       * <p>
283       *  In an insecure cluster, the environment variable
284       *  <code>HADOOP_USER_NAME </code> is queried <i>first</i>.
285       * <p>
286       * This means that in a YARN container where the creator set this
287       * environment variable to propagate their identity, the defined
288       * user name is used in preference to the actual user.
289       * <p>
290       * In a secure cluster, the kerberos identity of the current user is used.
291       * @return the encoded shortname of the current user
292       * @throws RuntimeException if the current user identity cannot be determined
293       * from the OS/kerberos.
294       *
295       */
296      public static String currentUser() {
297        String shortUserName = currentUsernameUnencoded();
298        return encodeForRegistry(shortUserName);
299      }
300    
301      /**
302       * Extract all service records under a list of stat operations...this
303       * skips entries that are too short or simply not matching
304       * @param operations operation support for fetches
305       * @param parentpath path of the parent of all the entries
306       * @param stats Collection of stat results
307       * @return a possibly empty map of fullpath:record.
308       * @throws IOException for any IO Operation that wasn't ignored.
309       */
310      public static Map<String, ServiceRecord> extractServiceRecords(
311          RegistryOperations operations,
312          String parentpath,
313          Collection<RegistryPathStatus> stats) throws IOException {
314        Map<String, ServiceRecord> results = new HashMap<String, ServiceRecord>(stats.size());
315        for (RegistryPathStatus stat : stats) {
316          if (stat.size > ServiceRecord.RECORD_TYPE.length()) {
317            // maybe has data
318            String path = join(parentpath, stat.path);
319            try {
320              ServiceRecord serviceRecord = operations.resolve(path);
321              results.put(path, serviceRecord);
322            } catch (EOFException ignored) {
323              if (LOG.isDebugEnabled()) {
324                LOG.debug("data too short for {}", path);
325              }
326            } catch (InvalidRecordException record) {
327              if (LOG.isDebugEnabled()) {
328                LOG.debug("Invalid record at {}", path);
329              }
330            } catch (NoRecordException record) {
331              if (LOG.isDebugEnabled()) {
332                LOG.debug("No record at {}", path);
333              }
334            }
335          }
336        }
337        return results;
338      }
339    
340      /**
341       * Extract all service records under a list of stat operations...this
342       * non-atomic action skips entries that are too short or simply not matching.
343       * <p>
344       * @param operations operation support for fetches
345       * @param parentpath path of the parent of all the entries
346       * @return a possibly empty map of fullpath:record.
347       * @throws IOException for any IO Operation that wasn't ignored.
348       */
349      public static Map<String, ServiceRecord> extractServiceRecords(
350          RegistryOperations operations,
351          String parentpath,
352          Map<String , RegistryPathStatus> stats) throws IOException {
353        return extractServiceRecords(operations, parentpath, stats.values());
354      }
355    
356    
357      /**
358       * Extract all service records under a list of stat operations...this
359       * non-atomic action skips entries that are too short or simply not matching.
360       * <p>
361       * @param operations operation support for fetches
362       * @param parentpath path of the parent of all the entries
363       * @return a possibly empty map of fullpath:record.
364       * @throws IOException for any IO Operation that wasn't ignored.
365       */
366      public static Map<String, ServiceRecord> extractServiceRecords(
367          RegistryOperations operations,
368          String parentpath) throws IOException {
369        return
370        extractServiceRecords(operations,
371            parentpath,
372            statChildren(operations, parentpath).values());
373      }
374    
375    
376    
377      /**
378       * Static instance of service record marshalling
379       */
380      public static class ServiceRecordMarshal extends JsonSerDeser<ServiceRecord> {
381        public ServiceRecordMarshal() {
382          super(ServiceRecord.class);
383        }
384      }
385    }