001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapreduce.lib.partition; 020 021 import java.io.UnsupportedEncodingException; 022 import java.util.List; 023 024 import org.apache.commons.logging.Log; 025 import org.apache.commons.logging.LogFactory; 026 import org.apache.hadoop.classification.InterfaceAudience; 027 import org.apache.hadoop.classification.InterfaceStability; 028 import org.apache.hadoop.conf.Configurable; 029 import org.apache.hadoop.conf.Configuration; 030 import org.apache.hadoop.mapreduce.Job; 031 import org.apache.hadoop.mapreduce.JobContext; 032 import org.apache.hadoop.mapreduce.MRJobConfig; 033 import org.apache.hadoop.mapreduce.Partitioner; 034 import org.apache.hadoop.mapreduce.lib.partition.KeyFieldHelper.KeyDescription; 035 036 /** 037 * Defines a way to partition keys based on certain key fields (also see 038 * {@link KeyFieldBasedComparator}. 039 * The key specification supported is of the form -k pos1[,pos2], where, 040 * pos is of the form f[.c][opts], where f is the number 041 * of the key field to use, and c is the number of the first character from 042 * the beginning of the field. Fields and character posns are numbered 043 * starting with 1; a character position of zero in pos2 indicates the 044 * field's last character. If '.c' is omitted from pos1, it defaults to 1 045 * (the beginning of the field); if omitted from pos2, it defaults to 0 046 * (the end of the field). 047 * 048 */ 049 @InterfaceAudience.Public 050 @InterfaceStability.Stable 051 public class KeyFieldBasedPartitioner<K2, V2> extends Partitioner<K2, V2> 052 implements Configurable { 053 054 private static final Log LOG = LogFactory.getLog( 055 KeyFieldBasedPartitioner.class.getName()); 056 public static String PARTITIONER_OPTIONS = 057 "mapreduce.partition.keypartitioner.options"; 058 private int numOfPartitionFields; 059 060 private KeyFieldHelper keyFieldHelper = new KeyFieldHelper(); 061 062 private Configuration conf; 063 064 public void setConf(Configuration conf) { 065 this.conf = conf; 066 keyFieldHelper = new KeyFieldHelper(); 067 String keyFieldSeparator = 068 conf.get(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, "\t"); 069 keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator); 070 if (conf.get("num.key.fields.for.partition") != null) { 071 LOG.warn("Using deprecated num.key.fields.for.partition. " + 072 "Use mapreduce.partition.keypartitioner.options instead"); 073 this.numOfPartitionFields = conf.getInt("num.key.fields.for.partition",0); 074 keyFieldHelper.setKeyFieldSpec(1,numOfPartitionFields); 075 } else { 076 String option = conf.get(PARTITIONER_OPTIONS); 077 keyFieldHelper.parseOption(option); 078 } 079 } 080 081 public Configuration getConf() { 082 return conf; 083 } 084 085 public int getPartition(K2 key, V2 value, int numReduceTasks) { 086 byte[] keyBytes; 087 088 List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs(); 089 if (allKeySpecs.size() == 0) { 090 return getPartition(key.toString().hashCode(), numReduceTasks); 091 } 092 093 try { 094 keyBytes = key.toString().getBytes("UTF-8"); 095 } catch (UnsupportedEncodingException e) { 096 throw new RuntimeException("The current system does not " + 097 "support UTF-8 encoding!", e); 098 } 099 // return 0 if the key is empty 100 if (keyBytes.length == 0) { 101 return 0; 102 } 103 104 int []lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0, 105 keyBytes.length); 106 int currentHash = 0; 107 for (KeyDescription keySpec : allKeySpecs) { 108 int startChar = keyFieldHelper.getStartOffset(keyBytes, 0, 109 keyBytes.length, lengthIndicesFirst, keySpec); 110 // no key found! continue 111 if (startChar < 0) { 112 continue; 113 } 114 int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length, 115 lengthIndicesFirst, keySpec); 116 currentHash = hashCode(keyBytes, startChar, endChar, 117 currentHash); 118 } 119 return getPartition(currentHash, numReduceTasks); 120 } 121 122 protected int hashCode(byte[] b, int start, int end, int currentHash) { 123 for (int i = start; i <= end; i++) { 124 currentHash = 31*currentHash + b[i]; 125 } 126 return currentHash; 127 } 128 129 protected int getPartition(int hash, int numReduceTasks) { 130 return (hash & Integer.MAX_VALUE) % numReduceTasks; 131 } 132 133 /** 134 * Set the {@link KeyFieldBasedPartitioner} options used for 135 * {@link Partitioner} 136 * 137 * @param keySpec the key specification of the form -k pos1[,pos2], where, 138 * pos is of the form f[.c][opts], where f is the number 139 * of the key field to use, and c is the number of the first character from 140 * the beginning of the field. Fields and character posns are numbered 141 * starting with 1; a character position of zero in pos2 indicates the 142 * field's last character. If '.c' is omitted from pos1, it defaults to 1 143 * (the beginning of the field); if omitted from pos2, it defaults to 0 144 * (the end of the field). 145 */ 146 public void setKeyFieldPartitionerOptions(Job job, String keySpec) { 147 job.getConfiguration().set(PARTITIONER_OPTIONS, keySpec); 148 } 149 150 /** 151 * Get the {@link KeyFieldBasedPartitioner} options 152 */ 153 public String getKeyFieldPartitionerOption(JobContext job) { 154 return job.getConfiguration().get(PARTITIONER_OPTIONS); 155 } 156 157 158 }