001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.partition;
020
021import org.apache.hadoop.classification.InterfaceAudience;
022import org.apache.hadoop.classification.InterfaceStability;
023import org.apache.hadoop.conf.Configurable;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.io.BinaryComparable;
026import org.apache.hadoop.io.WritableComparator;
027import org.apache.hadoop.mapreduce.Partitioner;
028
029/**
030 * <p>Partition {@link BinaryComparable} keys using a configurable part of 
031 * the bytes array returned by {@link BinaryComparable#getBytes()}.</p>
032 * 
033 * <p>The subarray to be used for the partitioning can be defined by means
034 * of the following properties:
035 * <ul>
036 *   <li>
037 *     <i>mapreduce.partition.binarypartitioner.left.offset</i>:
038 *     left offset in array (0 by default)
039 *   </li>
040 *   <li>
041 *     <i>mapreduce.partition.binarypartitioner.right.offset</i>: 
042 *     right offset in array (-1 by default)
043 *   </li>
044 * </ul>
045 * Like in Python, both negative and positive offsets are allowed, but
046 * the meaning is slightly different. In case of an array of length 5,
047 * for instance, the possible offsets are:
048 * <pre><code>
049 *  +---+---+---+---+---+
050 *  | B | B | B | B | B |
051 *  +---+---+---+---+---+
052 *    0   1   2   3   4
053 *   -5  -4  -3  -2  -1
054 * </code></pre>
055 * The first row of numbers gives the position of the offsets 0...5 in 
056 * the array; the second row gives the corresponding negative offsets. 
057 * Contrary to Python, the specified subarray has byte <code>i</code> 
058 * and <code>j</code> as first and last element, repectively, when 
059 * <code>i</code> and <code>j</code> are the left and right offset.
060 * 
061 * <p>For Hadoop programs written in Java, it is advisable to use one of 
062 * the following static convenience methods for setting the offsets:
063 * <ul>
064 *   <li>{@link #setOffsets}</li>
065 *   <li>{@link #setLeftOffset}</li>
066 *   <li>{@link #setRightOffset}</li>
067 * </ul>
068 */
069@InterfaceAudience.Public
070@InterfaceStability.Evolving
071public class BinaryPartitioner<V> extends Partitioner<BinaryComparable, V> 
072  implements Configurable {
073
074  public static final String LEFT_OFFSET_PROPERTY_NAME = 
075    "mapreduce.partition.binarypartitioner.left.offset";
076  public static final String RIGHT_OFFSET_PROPERTY_NAME = 
077    "mapreduce.partition.binarypartitioner.right.offset";
078  
079  /**
080   * Set the subarray to be used for partitioning to 
081   * <code>bytes[left:(right+1)]</code> in Python syntax.
082   * 
083   * @param conf configuration object
084   * @param left left Python-style offset
085   * @param right right Python-style offset
086   */
087  public static void setOffsets(Configuration conf, int left, int right) {
088    conf.setInt(LEFT_OFFSET_PROPERTY_NAME, left);
089    conf.setInt(RIGHT_OFFSET_PROPERTY_NAME, right);
090  }
091  
092  /**
093   * Set the subarray to be used for partitioning to 
094   * <code>bytes[offset:]</code> in Python syntax.
095   * 
096   * @param conf configuration object
097   * @param offset left Python-style offset
098   */
099  public static void setLeftOffset(Configuration conf, int offset) {
100    conf.setInt(LEFT_OFFSET_PROPERTY_NAME, offset);
101  }
102  
103  /**
104   * Set the subarray to be used for partitioning to 
105   * <code>bytes[:(offset+1)]</code> in Python syntax.
106   * 
107   * @param conf configuration object
108   * @param offset right Python-style offset
109   */
110  public static void setRightOffset(Configuration conf, int offset) {
111    conf.setInt(RIGHT_OFFSET_PROPERTY_NAME, offset);
112  }
113  
114  
115  private Configuration conf;
116  private int leftOffset, rightOffset;
117  
118  public void setConf(Configuration conf) {
119    this.conf = conf;
120    leftOffset = conf.getInt(LEFT_OFFSET_PROPERTY_NAME, 0);
121    rightOffset = conf.getInt(RIGHT_OFFSET_PROPERTY_NAME, -1);
122  }
123  
124  public Configuration getConf() {
125    return conf;
126  }
127  
128  /** 
129   * Use (the specified slice of the array returned by) 
130   * {@link BinaryComparable#getBytes()} to partition. 
131   */
132  @Override
133  public int getPartition(BinaryComparable key, V value, int numPartitions) {
134    int length = key.getLength();
135    int leftIndex = (leftOffset + length) % length;
136    int rightIndex = (rightOffset + length) % length;
137    int hash = WritableComparator.hashBytes(key.getBytes(), 
138      leftIndex, rightIndex - leftIndex + 1);
139    return (hash & Integer.MAX_VALUE) % numPartitions;
140  }
141  
142}