001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.partition;
020
021 import org.apache.hadoop.classification.InterfaceAudience;
022 import org.apache.hadoop.classification.InterfaceStability;
023 import org.apache.hadoop.conf.Configurable;
024 import org.apache.hadoop.conf.Configuration;
025 import org.apache.hadoop.io.BinaryComparable;
026 import org.apache.hadoop.io.WritableComparator;
027 import org.apache.hadoop.mapreduce.Partitioner;
028
029 /**
030 * <p>Partition {@link BinaryComparable} keys using a configurable part of
031 * the bytes array returned by {@link BinaryComparable#getBytes()}.</p>
032 *
033 * <p>The subarray to be used for the partitioning can be defined by means
034 * of the following properties:
035 * <ul>
036 * <li>
037 * <i>mapreduce.partition.binarypartitioner.left.offset</i>:
038 * left offset in array (0 by default)
039 * </li>
040 * <li>
041 * <i>mapreduce.partition.binarypartitioner.right.offset</i>:
042 * right offset in array (-1 by default)
043 * </li>
044 * </ul>
045 * Like in Python, both negative and positive offsets are allowed, but
046 * the meaning is slightly different. In case of an array of length 5,
047 * for instance, the possible offsets are:
048 * <pre><code>
049 * +---+---+---+---+---+
050 * | B | B | B | B | B |
051 * +---+---+---+---+---+
052 * 0 1 2 3 4
053 * -5 -4 -3 -2 -1
054 * </code></pre>
055 * The first row of numbers gives the position of the offsets 0...5 in
056 * the array; the second row gives the corresponding negative offsets.
057 * Contrary to Python, the specified subarray has byte <code>i</code>
058 * and <code>j</code> as first and last element, repectively, when
059 * <code>i</code> and <code>j</code> are the left and right offset.
060 *
061 * <p>For Hadoop programs written in Java, it is advisable to use one of
062 * the following static convenience methods for setting the offsets:
063 * <ul>
064 * <li>{@link #setOffsets}</li>
065 * <li>{@link #setLeftOffset}</li>
066 * <li>{@link #setRightOffset}</li>
067 * </ul></p>
068 */
069 @InterfaceAudience.Public
070 @InterfaceStability.Evolving
071 public class BinaryPartitioner<V> extends Partitioner<BinaryComparable, V>
072 implements Configurable {
073
074 public static final String LEFT_OFFSET_PROPERTY_NAME =
075 "mapreduce.partition.binarypartitioner.left.offset";
076 public static final String RIGHT_OFFSET_PROPERTY_NAME =
077 "mapreduce.partition.binarypartitioner.right.offset";
078
079 /**
080 * Set the subarray to be used for partitioning to
081 * <code>bytes[left:(right+1)]</code> in Python syntax.
082 *
083 * @param conf configuration object
084 * @param left left Python-style offset
085 * @param right right Python-style offset
086 */
087 public static void setOffsets(Configuration conf, int left, int right) {
088 conf.setInt(LEFT_OFFSET_PROPERTY_NAME, left);
089 conf.setInt(RIGHT_OFFSET_PROPERTY_NAME, right);
090 }
091
092 /**
093 * Set the subarray to be used for partitioning to
094 * <code>bytes[offset:]</code> in Python syntax.
095 *
096 * @param conf configuration object
097 * @param offset left Python-style offset
098 */
099 public static void setLeftOffset(Configuration conf, int offset) {
100 conf.setInt(LEFT_OFFSET_PROPERTY_NAME, offset);
101 }
102
103 /**
104 * Set the subarray to be used for partitioning to
105 * <code>bytes[:(offset+1)]</code> in Python syntax.
106 *
107 * @param conf configuration object
108 * @param offset right Python-style offset
109 */
110 public static void setRightOffset(Configuration conf, int offset) {
111 conf.setInt(RIGHT_OFFSET_PROPERTY_NAME, offset);
112 }
113
114
115 private Configuration conf;
116 private int leftOffset, rightOffset;
117
118 public void setConf(Configuration conf) {
119 this.conf = conf;
120 leftOffset = conf.getInt(LEFT_OFFSET_PROPERTY_NAME, 0);
121 rightOffset = conf.getInt(RIGHT_OFFSET_PROPERTY_NAME, -1);
122 }
123
124 public Configuration getConf() {
125 return conf;
126 }
127
128 /**
129 * Use (the specified slice of the array returned by)
130 * {@link BinaryComparable#getBytes()} to partition.
131 */
132 @Override
133 public int getPartition(BinaryComparable key, V value, int numPartitions) {
134 int length = key.getLength();
135 int leftIndex = (leftOffset + length) % length;
136 int rightIndex = (rightOffset + length) % length;
137 int hash = WritableComparator.hashBytes(key.getBytes(),
138 leftIndex, rightIndex - leftIndex + 1);
139 return (hash & Integer.MAX_VALUE) % numPartitions;
140 }
141
142 }