001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.net;
019
020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY;
021
022import java.io.BufferedReader;
023import java.io.FileInputStream;
024import java.io.FileReader;
025import java.io.IOException;
026import java.io.InputStreamReader;
027import java.util.ArrayList;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031
032import org.apache.commons.io.Charsets;
033import org.apache.commons.lang.StringUtils;
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.apache.hadoop.classification.InterfaceAudience;
037import org.apache.hadoop.classification.InterfaceStability;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.conf.Configured;
040
041/**
042 * <p>
043 * Simple {@link DNSToSwitchMapping} implementation that reads a 2 column text
044 * file. The columns are separated by whitespace. The first column is a DNS or
045 * IP address and the second column specifies the rack where the address maps.
046 * </p>
047 * <p>
048 * This class uses the configuration parameter {@code
049 * net.topology.table.file.name} to locate the mapping file.
050 * </p>
051 * <p>
052 * Calls to {@link #resolve(List)} will look up the address as defined in the
053 * mapping file. If no entry corresponding to the address is found, the value
054 * {@code /default-rack} is returned.
055 * </p>
056 */
057@InterfaceAudience.Public
058@InterfaceStability.Evolving
059public class TableMapping extends CachedDNSToSwitchMapping {
060
061  private static final Log LOG = LogFactory.getLog(TableMapping.class);
062  
063  public TableMapping() {
064    super(new RawTableMapping());
065  }
066  
067  private RawTableMapping getRawMapping() {
068    return (RawTableMapping) rawMapping;
069  }
070
071  @Override
072  public Configuration getConf() {
073    return getRawMapping().getConf();
074  }
075
076  @Override
077  public void setConf(Configuration conf) {
078    super.setConf(conf);
079    getRawMapping().setConf(conf);
080  }
081  
082  @Override
083  public void reloadCachedMappings() {
084    super.reloadCachedMappings();
085    getRawMapping().reloadCachedMappings();
086  }
087  
088  private static final class RawTableMapping extends Configured
089      implements DNSToSwitchMapping {
090    
091    private Map<String, String> map;
092  
093    private Map<String, String> load() {
094      Map<String, String> loadMap = new HashMap<String, String>();
095  
096      String filename = getConf().get(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, null);
097      if (StringUtils.isBlank(filename)) {
098        LOG.warn(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY + " not configured. ");
099        return null;
100      }
101  
102
103      try (BufferedReader reader =
104               new BufferedReader(new InputStreamReader(
105                   new FileInputStream(filename), Charsets.UTF_8))) {
106        String line = reader.readLine();
107        while (line != null) {
108          line = line.trim();
109          if (line.length() != 0 && line.charAt(0) != '#') {
110            String[] columns = line.split("\\s+");
111            if (columns.length == 2) {
112              loadMap.put(columns[0], columns[1]);
113            } else {
114              LOG.warn("Line does not have two columns. Ignoring. " + line);
115            }
116          }
117          line = reader.readLine();
118        }
119      } catch (Exception e) {
120        LOG.warn(filename + " cannot be read.", e);
121        return null;
122      }
123      return loadMap;
124    }
125  
126    @Override
127    public synchronized List<String> resolve(List<String> names) {
128      if (map == null) {
129        map = load();
130        if (map == null) {
131          LOG.warn("Failed to read topology table. " +
132            NetworkTopology.DEFAULT_RACK + " will be used for all nodes.");
133          map = new HashMap<String, String>();
134        }
135      }
136      List<String> results = new ArrayList<String>(names.size());
137      for (String name : names) {
138        String result = map.get(name);
139        if (result != null) {
140          results.add(result);
141        } else {
142          results.add(NetworkTopology.DEFAULT_RACK);
143        }
144      }
145      return results;
146    }
147
148    @Override
149    public void reloadCachedMappings() {
150      Map<String, String> newMap = load();
151      if (newMap == null) {
152        LOG.error("Failed to reload the topology table.  The cached " +
153            "mappings will not be cleared.");
154      } else {
155        synchronized(this) {
156          map = newMap;
157        }
158      }
159    }
160
161    @Override
162    public void reloadCachedMappings(List<String> names) {
163      // TableMapping has to reload all mappings at once, so no chance to 
164      // reload mappings on specific nodes
165      reloadCachedMappings();
166    }
167  }
168}