001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.net;
019
020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY;
021
022import java.io.BufferedReader;
023import java.io.FileInputStream;
024import java.io.InputStreamReader;
025import java.nio.charset.StandardCharsets;
026import java.util.ArrayList;
027import java.util.HashMap;
028import java.util.List;
029import java.util.Map;
030
031import org.apache.commons.lang.StringUtils;
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.apache.hadoop.classification.InterfaceAudience;
035import org.apache.hadoop.classification.InterfaceStability;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.conf.Configured;
038
039/**
040 * <p>
041 * Simple {@link DNSToSwitchMapping} implementation that reads a 2 column text
042 * file. The columns are separated by whitespace. The first column is a DNS or
043 * IP address and the second column specifies the rack where the address maps.
044 * </p>
045 * <p>
046 * This class uses the configuration parameter {@code
047 * net.topology.table.file.name} to locate the mapping file.
048 * </p>
049 * <p>
050 * Calls to {@link #resolve(List)} will look up the address as defined in the
051 * mapping file. If no entry corresponding to the address is found, the value
052 * {@code /default-rack} is returned.
053 * </p>
054 */
055@InterfaceAudience.Public
056@InterfaceStability.Evolving
057public class TableMapping extends CachedDNSToSwitchMapping {
058
059  private static final Log LOG = LogFactory.getLog(TableMapping.class);
060  
061  public TableMapping() {
062    super(new RawTableMapping());
063  }
064  
065  private RawTableMapping getRawMapping() {
066    return (RawTableMapping) rawMapping;
067  }
068
069  @Override
070  public Configuration getConf() {
071    return getRawMapping().getConf();
072  }
073
074  @Override
075  public void setConf(Configuration conf) {
076    super.setConf(conf);
077    getRawMapping().setConf(conf);
078  }
079  
080  @Override
081  public void reloadCachedMappings() {
082    super.reloadCachedMappings();
083    getRawMapping().reloadCachedMappings();
084  }
085  
086  private static final class RawTableMapping extends Configured
087      implements DNSToSwitchMapping {
088    
089    private Map<String, String> map;
090  
091    private Map<String, String> load() {
092      Map<String, String> loadMap = new HashMap<String, String>();
093  
094      String filename = getConf().get(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, null);
095      if (StringUtils.isBlank(filename)) {
096        LOG.warn(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY + " not configured. ");
097        return null;
098      }
099  
100
101      try (BufferedReader reader =
102               new BufferedReader(new InputStreamReader(
103                   new FileInputStream(filename), StandardCharsets.UTF_8))) {
104        String line = reader.readLine();
105        while (line != null) {
106          line = line.trim();
107          if (line.length() != 0 && line.charAt(0) != '#') {
108            String[] columns = line.split("\\s+");
109            if (columns.length == 2) {
110              loadMap.put(columns[0], columns[1]);
111            } else {
112              LOG.warn("Line does not have two columns. Ignoring. " + line);
113            }
114          }
115          line = reader.readLine();
116        }
117      } catch (Exception e) {
118        LOG.warn(filename + " cannot be read.", e);
119        return null;
120      }
121      return loadMap;
122    }
123  
124    @Override
125    public synchronized List<String> resolve(List<String> names) {
126      if (map == null) {
127        map = load();
128        if (map == null) {
129          LOG.warn("Failed to read topology table. " +
130            NetworkTopology.DEFAULT_RACK + " will be used for all nodes.");
131          map = new HashMap<String, String>();
132        }
133      }
134      List<String> results = new ArrayList<String>(names.size());
135      for (String name : names) {
136        String result = map.get(name);
137        if (result != null) {
138          results.add(result);
139        } else {
140          results.add(NetworkTopology.DEFAULT_RACK);
141        }
142      }
143      return results;
144    }
145
146    @Override
147    public void reloadCachedMappings() {
148      Map<String, String> newMap = load();
149      if (newMap == null) {
150        LOG.error("Failed to reload the topology table.  The cached " +
151            "mappings will not be cleared.");
152      } else {
153        synchronized(this) {
154          map = newMap;
155        }
156      }
157    }
158
159    @Override
160    public void reloadCachedMappings(List<String> names) {
161      // TableMapping has to reload all mappings at once, so no chance to 
162      // reload mappings on specific nodes
163      reloadCachedMappings();
164    }
165  }
166}