001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.partition;
020    
021    import java.io.UnsupportedEncodingException;
022    import java.util.List;
023    
024    import org.apache.commons.logging.Log;
025    import org.apache.commons.logging.LogFactory;
026    import org.apache.hadoop.classification.InterfaceAudience;
027    import org.apache.hadoop.classification.InterfaceStability;
028    import org.apache.hadoop.conf.Configurable;
029    import org.apache.hadoop.conf.Configuration;
030    import org.apache.hadoop.mapreduce.Job;
031    import org.apache.hadoop.mapreduce.JobContext;
032    import org.apache.hadoop.mapreduce.MRJobConfig;
033    import org.apache.hadoop.mapreduce.Partitioner;
034    import org.apache.hadoop.mapreduce.lib.partition.KeyFieldHelper.KeyDescription;
035    
036     /**   
  *  Defines a way to partition keys based on certain key fields (also see
  *  {@link KeyFieldBasedComparator}).
039      *  The key specification supported is of the form -k pos1[,pos2], where,
040      *  pos is of the form f[.c][opts], where f is the number
041      *  of the key field to use, and c is the number of the first character from
042      *  the beginning of the field. Fields and character posns are numbered 
043      *  starting with 1; a character position of zero in pos2 indicates the
044      *  field's last character. If '.c' is omitted from pos1, it defaults to 1
045      *  (the beginning of the field); if omitted from pos2, it defaults to 0 
046      *  (the end of the field).
047      * 
048      */
049    @InterfaceAudience.Public
050    @InterfaceStability.Stable
051    public class KeyFieldBasedPartitioner<K2, V2> extends Partitioner<K2, V2> 
052        implements Configurable {
053    
054      private static final Log LOG = LogFactory.getLog(
055                                       KeyFieldBasedPartitioner.class.getName());
056      public static String PARTITIONER_OPTIONS = 
057        "mapreduce.partition.keypartitioner.options";
058      private int numOfPartitionFields;
059      
060      private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
061      
062      private Configuration conf;
063    
064      public void setConf(Configuration conf) {
065        this.conf = conf;
066        keyFieldHelper = new KeyFieldHelper();
067        String keyFieldSeparator = 
068          conf.get(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, "\t");
069        keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
070        if (conf.get("num.key.fields.for.partition") != null) {
071          LOG.warn("Using deprecated num.key.fields.for.partition. " +
072                    "Use mapreduce.partition.keypartitioner.options instead");
073          this.numOfPartitionFields = conf.getInt("num.key.fields.for.partition",0);
074          keyFieldHelper.setKeyFieldSpec(1,numOfPartitionFields);
075        } else {
076          String option = conf.get(PARTITIONER_OPTIONS);
077          keyFieldHelper.parseOption(option);
078        }
079      }
080    
081      public Configuration getConf() {
082        return conf;
083      }
084      
085      public int getPartition(K2 key, V2 value, int numReduceTasks) {
086        byte[] keyBytes;
087    
088        List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
089        if (allKeySpecs.size() == 0) {
090          return getPartition(key.toString().hashCode(), numReduceTasks);
091        }
092    
093        try {
094          keyBytes = key.toString().getBytes("UTF-8");
095        } catch (UnsupportedEncodingException e) {
096          throw new RuntimeException("The current system does not " +
097              "support UTF-8 encoding!", e);
098        }
099        // return 0 if the key is empty
100        if (keyBytes.length == 0) {
101          return 0;
102        }
103        
104        int []lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0, 
105            keyBytes.length);
106        int currentHash = 0;
107        for (KeyDescription keySpec : allKeySpecs) {
108          int startChar = keyFieldHelper.getStartOffset(keyBytes, 0, 
109            keyBytes.length, lengthIndicesFirst, keySpec);
110           // no key found! continue
111          if (startChar < 0) {
112            continue;
113          }
114          int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length, 
115              lengthIndicesFirst, keySpec);
116          currentHash = hashCode(keyBytes, startChar, endChar, 
117              currentHash);
118        }
119        return getPartition(currentHash, numReduceTasks);
120      }
121      
122      protected int hashCode(byte[] b, int start, int end, int currentHash) {
123        for (int i = start; i <= end; i++) {
124          currentHash = 31*currentHash + b[i];
125        }
126        return currentHash;
127      }
128    
129      protected int getPartition(int hash, int numReduceTasks) {
130        return (hash & Integer.MAX_VALUE) % numReduceTasks;
131      }
132      
133      /**
134       * Set the {@link KeyFieldBasedPartitioner} options used for 
135       * {@link Partitioner}
136       * 
137       * @param keySpec the key specification of the form -k pos1[,pos2], where,
138       *  pos is of the form f[.c][opts], where f is the number
139       *  of the key field to use, and c is the number of the first character from
140       *  the beginning of the field. Fields and character posns are numbered 
141       *  starting with 1; a character position of zero in pos2 indicates the
142       *  field's last character. If '.c' is omitted from pos1, it defaults to 1
143       *  (the beginning of the field); if omitted from pos2, it defaults to 0 
144       *  (the end of the field).
145       */
146      public void setKeyFieldPartitionerOptions(Job job, String keySpec) {
147        job.getConfiguration().set(PARTITIONER_OPTIONS, keySpec);
148      }
149      
150      /**
151       * Get the {@link KeyFieldBasedPartitioner} options
152       */
153      public String getKeyFieldPartitionerOption(JobContext job) {
154        return job.getConfiguration().get(PARTITIONER_OPTIONS);
155      }
156    
157    
158    }