001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.partition;
020
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.KeyFieldHelper.KeyDescription;
035
036 /**
037 * Defines a way to partition keys based on certain key fields (also see
038 * {@link KeyFieldBasedComparator}.
039 * The key specification supported is of the form -k pos1[,pos2], where,
040 * pos is of the form f[.c][opts], where f is the number
041 * of the key field to use, and c is the number of the first character from
042 * the beginning of the field. Fields and character posns are numbered
043 * starting with 1; a character position of zero in pos2 indicates the
044 * field's last character. If '.c' is omitted from pos1, it defaults to 1
045 * (the beginning of the field); if omitted from pos2, it defaults to 0
046 * (the end of the field).
047 *
048 */
049 @InterfaceAudience.Public
050 @InterfaceStability.Stable
051 public class KeyFieldBasedPartitioner<K2, V2> extends Partitioner<K2, V2>
052 implements Configurable {
053
054 private static final Log LOG = LogFactory.getLog(
055 KeyFieldBasedPartitioner.class.getName());
056 public static String PARTITIONER_OPTIONS =
057 "mapreduce.partition.keypartitioner.options";
058 private int numOfPartitionFields;
059
060 private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
061
062 private Configuration conf;
063
064 public void setConf(Configuration conf) {
065 this.conf = conf;
066 keyFieldHelper = new KeyFieldHelper();
067 String keyFieldSeparator =
068 conf.get(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, "\t");
069 keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
070 if (conf.get("num.key.fields.for.partition") != null) {
071 LOG.warn("Using deprecated num.key.fields.for.partition. " +
072 "Use mapreduce.partition.keypartitioner.options instead");
073 this.numOfPartitionFields = conf.getInt("num.key.fields.for.partition",0);
074 keyFieldHelper.setKeyFieldSpec(1,numOfPartitionFields);
075 } else {
076 String option = conf.get(PARTITIONER_OPTIONS);
077 keyFieldHelper.parseOption(option);
078 }
079 }
080
081 public Configuration getConf() {
082 return conf;
083 }
084
085 public int getPartition(K2 key, V2 value, int numReduceTasks) {
086 byte[] keyBytes;
087
088 List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
089 if (allKeySpecs.size() == 0) {
090 return getPartition(key.toString().hashCode(), numReduceTasks);
091 }
092
093 try {
094 keyBytes = key.toString().getBytes("UTF-8");
095 } catch (UnsupportedEncodingException e) {
096 throw new RuntimeException("The current system does not " +
097 "support UTF-8 encoding!", e);
098 }
099 // return 0 if the key is empty
100 if (keyBytes.length == 0) {
101 return 0;
102 }
103
104 int []lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0,
105 keyBytes.length);
106 int currentHash = 0;
107 for (KeyDescription keySpec : allKeySpecs) {
108 int startChar = keyFieldHelper.getStartOffset(keyBytes, 0,
109 keyBytes.length, lengthIndicesFirst, keySpec);
110 // no key found! continue
111 if (startChar < 0) {
112 continue;
113 }
114 int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length,
115 lengthIndicesFirst, keySpec);
116 currentHash = hashCode(keyBytes, startChar, endChar,
117 currentHash);
118 }
119 return getPartition(currentHash, numReduceTasks);
120 }
121
122 protected int hashCode(byte[] b, int start, int end, int currentHash) {
123 for (int i = start; i <= end; i++) {
124 currentHash = 31*currentHash + b[i];
125 }
126 return currentHash;
127 }
128
129 protected int getPartition(int hash, int numReduceTasks) {
130 return (hash & Integer.MAX_VALUE) % numReduceTasks;
131 }
132
133 /**
134 * Set the {@link KeyFieldBasedPartitioner} options used for
135 * {@link Partitioner}
136 *
137 * @param keySpec the key specification of the form -k pos1[,pos2], where,
138 * pos is of the form f[.c][opts], where f is the number
139 * of the key field to use, and c is the number of the first character from
140 * the beginning of the field. Fields and character posns are numbered
141 * starting with 1; a character position of zero in pos2 indicates the
142 * field's last character. If '.c' is omitted from pos1, it defaults to 1
143 * (the beginning of the field); if omitted from pos2, it defaults to 0
144 * (the end of the field).
145 */
146 public void setKeyFieldPartitionerOptions(Job job, String keySpec) {
147 job.getConfiguration().set(PARTITIONER_OPTIONS, keySpec);
148 }
149
150 /**
151 * Get the {@link KeyFieldBasedPartitioner} options
152 */
153 public String getKeyFieldPartitionerOption(JobContext job) {
154 return job.getConfiguration().get(PARTITIONER_OPTIONS);
155 }
156
157
158 }