001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.partition;
020
021import java.io.UnsupportedEncodingException;
022import java.util.List;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.apache.hadoop.classification.InterfaceAudience;
027import org.apache.hadoop.classification.InterfaceStability;
028import org.apache.hadoop.conf.Configurable;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.mapreduce.Job;
031import org.apache.hadoop.mapreduce.JobContext;
032import org.apache.hadoop.mapreduce.MRJobConfig;
033import org.apache.hadoop.mapreduce.Partitioner;
034import org.apache.hadoop.mapreduce.lib.partition.KeyFieldHelper.KeyDescription;
035
036 /**   
037  *  Defines a way to partition keys based on certain key fields (also see
038  *  {@link KeyFieldBasedComparator}.
039  *  The key specification supported is of the form -k pos1[,pos2], where,
040  *  pos is of the form f[.c][opts], where f is the number
041  *  of the key field to use, and c is the number of the first character from
042  *  the beginning of the field. Fields and character posns are numbered 
043  *  starting with 1; a character position of zero in pos2 indicates the
044  *  field's last character. If '.c' is omitted from pos1, it defaults to 1
045  *  (the beginning of the field); if omitted from pos2, it defaults to 0 
046  *  (the end of the field).
047  * 
048  */
049@InterfaceAudience.Public
050@InterfaceStability.Stable
051public class KeyFieldBasedPartitioner<K2, V2> extends Partitioner<K2, V2> 
052    implements Configurable {
053
054  private static final Log LOG = LogFactory.getLog(
055                                   KeyFieldBasedPartitioner.class.getName());
056  public static String PARTITIONER_OPTIONS = 
057    "mapreduce.partition.keypartitioner.options";
058  private int numOfPartitionFields;
059  
060  private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
061  
062  private Configuration conf;
063
064  public void setConf(Configuration conf) {
065    this.conf = conf;
066    keyFieldHelper = new KeyFieldHelper();
067    String keyFieldSeparator = 
068      conf.get(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, "\t");
069    keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
070    if (conf.get("num.key.fields.for.partition") != null) {
071      LOG.warn("Using deprecated num.key.fields.for.partition. " +
072                "Use mapreduce.partition.keypartitioner.options instead");
073      this.numOfPartitionFields = conf.getInt("num.key.fields.for.partition",0);
074      keyFieldHelper.setKeyFieldSpec(1,numOfPartitionFields);
075    } else {
076      String option = conf.get(PARTITIONER_OPTIONS);
077      keyFieldHelper.parseOption(option);
078    }
079  }
080
081  public Configuration getConf() {
082    return conf;
083  }
084  
085  public int getPartition(K2 key, V2 value, int numReduceTasks) {
086    byte[] keyBytes;
087
088    List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
089    if (allKeySpecs.size() == 0) {
090      return getPartition(key.toString().hashCode(), numReduceTasks);
091    }
092
093    try {
094      keyBytes = key.toString().getBytes("UTF-8");
095    } catch (UnsupportedEncodingException e) {
096      throw new RuntimeException("The current system does not " +
097          "support UTF-8 encoding!", e);
098    }
099    // return 0 if the key is empty
100    if (keyBytes.length == 0) {
101      return 0;
102    }
103    
104    int []lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0, 
105        keyBytes.length);
106    int currentHash = 0;
107    for (KeyDescription keySpec : allKeySpecs) {
108      int startChar = keyFieldHelper.getStartOffset(keyBytes, 0, 
109        keyBytes.length, lengthIndicesFirst, keySpec);
110       // no key found! continue
111      if (startChar < 0) {
112        continue;
113      }
114      int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length, 
115          lengthIndicesFirst, keySpec);
116      currentHash = hashCode(keyBytes, startChar, endChar, 
117          currentHash);
118    }
119    return getPartition(currentHash, numReduceTasks);
120  }
121  
122  protected int hashCode(byte[] b, int start, int end, int currentHash) {
123    for (int i = start; i <= end; i++) {
124      currentHash = 31*currentHash + b[i];
125    }
126    return currentHash;
127  }
128
129  protected int getPartition(int hash, int numReduceTasks) {
130    return (hash & Integer.MAX_VALUE) % numReduceTasks;
131  }
132  
133  /**
134   * Set the {@link KeyFieldBasedPartitioner} options used for 
135   * {@link Partitioner}
136   * 
137   * @param keySpec the key specification of the form -k pos1[,pos2], where,
138   *  pos is of the form f[.c][opts], where f is the number
139   *  of the key field to use, and c is the number of the first character from
140   *  the beginning of the field. Fields and character posns are numbered 
141   *  starting with 1; a character position of zero in pos2 indicates the
142   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
143   *  (the beginning of the field); if omitted from pos2, it defaults to 0 
144   *  (the end of the field).
145   */
146  public void setKeyFieldPartitionerOptions(Job job, String keySpec) {
147    job.getConfiguration().set(PARTITIONER_OPTIONS, keySpec);
148  }
149  
150  /**
151   * Get the {@link KeyFieldBasedPartitioner} options
152   */
153  public String getKeyFieldPartitionerOption(JobContext job) {
154    return job.getConfiguration().get(PARTITIONER_OPTIONS);
155  }
156
157
158}