001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapreduce.lib.partition; 020 021 import org.apache.hadoop.classification.InterfaceAudience; 022 import org.apache.hadoop.classification.InterfaceStability; 023 import org.apache.hadoop.conf.Configurable; 024 import org.apache.hadoop.conf.Configuration; 025 import org.apache.hadoop.io.BinaryComparable; 026 import org.apache.hadoop.io.WritableComparator; 027 import org.apache.hadoop.mapreduce.Partitioner; 028 029 /** 030 * <p>Partition {@link BinaryComparable} keys using a configurable part of 031 * the bytes array returned by {@link BinaryComparable#getBytes()}.</p> 032 * 033 * <p>The subarray to be used for the partitioning can be defined by means 034 * of the following properties: 035 * <ul> 036 * <li> 037 * <i>mapreduce.partition.binarypartitioner.left.offset</i>: 038 * left offset in array (0 by default) 039 * </li> 040 * <li> 041 * <i>mapreduce.partition.binarypartitioner.right.offset</i>: 042 * right offset in array (-1 by default) 043 * </li> 044 * </ul> 045 * Like in Python, both negative and positive offsets are allowed, but 046 * the meaning is slightly different. In case of an array of length 5, 047 * for instance, the possible offsets are: 048 * <pre><code> 049 * +---+---+---+---+---+ 050 * | B | B | B | B | B | 051 * +---+---+---+---+---+ 052 * 0 1 2 3 4 053 * -5 -4 -3 -2 -1 054 * </code></pre> 055 * The first row of numbers gives the position of the offsets 0...5 in 056 * the array; the second row gives the corresponding negative offsets. 057 * Contrary to Python, the specified subarray has byte <code>i</code> 058 * and <code>j</code> as first and last element, repectively, when 059 * <code>i</code> and <code>j</code> are the left and right offset. 060 * 061 * <p>For Hadoop programs written in Java, it is advisable to use one of 062 * the following static convenience methods for setting the offsets: 063 * <ul> 064 * <li>{@link #setOffsets}</li> 065 * <li>{@link #setLeftOffset}</li> 066 * <li>{@link #setRightOffset}</li> 067 * </ul></p> 068 */ 069 @InterfaceAudience.Public 070 @InterfaceStability.Evolving 071 public class BinaryPartitioner<V> extends Partitioner<BinaryComparable, V> 072 implements Configurable { 073 074 public static final String LEFT_OFFSET_PROPERTY_NAME = 075 "mapreduce.partition.binarypartitioner.left.offset"; 076 public static final String RIGHT_OFFSET_PROPERTY_NAME = 077 "mapreduce.partition.binarypartitioner.right.offset"; 078 079 /** 080 * Set the subarray to be used for partitioning to 081 * <code>bytes[left:(right+1)]</code> in Python syntax. 082 * 083 * @param conf configuration object 084 * @param left left Python-style offset 085 * @param right right Python-style offset 086 */ 087 public static void setOffsets(Configuration conf, int left, int right) { 088 conf.setInt(LEFT_OFFSET_PROPERTY_NAME, left); 089 conf.setInt(RIGHT_OFFSET_PROPERTY_NAME, right); 090 } 091 092 /** 093 * Set the subarray to be used for partitioning to 094 * <code>bytes[offset:]</code> in Python syntax. 095 * 096 * @param conf configuration object 097 * @param offset left Python-style offset 098 */ 099 public static void setLeftOffset(Configuration conf, int offset) { 100 conf.setInt(LEFT_OFFSET_PROPERTY_NAME, offset); 101 } 102 103 /** 104 * Set the subarray to be used for partitioning to 105 * <code>bytes[:(offset+1)]</code> in Python syntax. 106 * 107 * @param conf configuration object 108 * @param offset right Python-style offset 109 */ 110 public static void setRightOffset(Configuration conf, int offset) { 111 conf.setInt(RIGHT_OFFSET_PROPERTY_NAME, offset); 112 } 113 114 115 private Configuration conf; 116 private int leftOffset, rightOffset; 117 118 public void setConf(Configuration conf) { 119 this.conf = conf; 120 leftOffset = conf.getInt(LEFT_OFFSET_PROPERTY_NAME, 0); 121 rightOffset = conf.getInt(RIGHT_OFFSET_PROPERTY_NAME, -1); 122 } 123 124 public Configuration getConf() { 125 return conf; 126 } 127 128 /** 129 * Use (the specified slice of the array returned by) 130 * {@link BinaryComparable#getBytes()} to partition. 131 */ 132 @Override 133 public int getPartition(BinaryComparable key, V value, int numPartitions) { 134 int length = key.getLength(); 135 int leftIndex = (leftOffset + length) % length; 136 int rightIndex = (rightOffset + length) % length; 137 int hash = WritableComparator.hashBytes(key.getBytes(), 138 leftIndex, rightIndex - leftIndex + 1); 139 return (hash & Integer.MAX_VALUE) % numPartitions; 140 } 141 142 }