/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.partition;

import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Partitioner effecting a total order by reading split points from
 * an externally generated source.
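 *
 * <p>A typical total-order sort job pairs this partitioner with a sampled
 * partition file.  A minimal sketch, assuming Text keys and using
 * {@link InputSampler#writePartitionFile}, which writes its samples to the
 * path configured here (the path, frequency, and sample count below are
 * illustrative, not defaults):
 * <pre>{@code
 * Job job = Job.getInstance(conf, "total-order-sort");
 * job.setNumReduceTasks(reduces);
 * job.setMapOutputKeyClass(Text.class);
 * job.setPartitionerClass(TotalOrderPartitioner.class);
 * // An absolute path is read from its own FileSystem; the relative
 * // DEFAULT_PATH is expected to arrive via the DistributedCache.
 * TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
 *     new Path("/tmp/partitions.lst"));
 * InputSampler.writePartitionFile(job,
 *     new InputSampler.RandomSampler<Text, Text>(0.01, 10000));
 * }</pre>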
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
    extends Partitioner<K,V> implements Configurable {

  private Node partitions;
  public static final String DEFAULT_PATH = "_partition.lst";
  public static final String PARTITIONER_PATH =
    "mapreduce.totalorderpartitioner.path";
  public static final String MAX_TRIE_DEPTH =
    "mapreduce.totalorderpartitioner.trie.maxdepth";
  public static final String NATURAL_ORDER =
    "mapreduce.totalorderpartitioner.naturalorder";
  Configuration conf;
  private static final Log LOG = LogFactory.getLog(TotalOrderPartitioner.class);

  public TotalOrderPartitioner() { }

  /**
   * Read in the partition file and build indexing data structures.
   * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
   * <tt>mapreduce.totalorderpartitioner.naturalorder</tt> is not false, a
   * trie of the first <tt>mapreduce.totalorderpartitioner.trie.maxdepth</tt>
   * (default 200) bytes will be built. Otherwise, keys will be located using
   * a binary search of the partition keyset using the
   * {@link org.apache.hadoop.io.RawComparator} defined for this job. The
   * input file must be sorted with the same comparator and contain
   * {@link Job#getNumReduceTasks()} - 1 keys.
   */
  @SuppressWarnings("unchecked") // keytype from conf not static
  public void setConf(Configuration conf) {
    try {
      this.conf = conf;
      String parts = getPartitionFile(conf);
      final Path partFile = new Path(parts);
      final FileSystem fs = (DEFAULT_PATH.equals(parts))
        ? FileSystem.getLocal(conf)     // assume in DistributedCache
        : partFile.getFileSystem(conf);

      Job job = Job.getInstance(conf);
      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
      K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
      if (splitPoints.length != job.getNumReduceTasks() - 1) {
        throw new IOException("Wrong number of partitions in keyset: found "
            + splitPoints.length + ", expected "
            + (job.getNumReduceTasks() - 1));
      }
      RawComparator<K> comparator =
        (RawComparator<K>) job.getSortComparator();
      for (int i = 0; i < splitPoints.length - 1; ++i) {
        if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
          throw new IOException("Split points are out of order");
        }
      }
      boolean natOrder =
        conf.getBoolean(NATURAL_ORDER, true);
      if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
        partitions = buildTrie((BinaryComparable[])splitPoints, 0,
            splitPoints.length, new byte[0],
            // Now that blocks of identical splitless trie nodes are
            // represented reentrantly, and we develop a leaf for any trie
            // node with only one split point, the only reason for a depth
            // limit is to prevent stack overflow or bloat in the pathological
            // case where the split points are long and mostly look like bytes
            // iii...iixii...iii.  Therefore, we make the default depth
            // limit large but not huge.
            conf.getInt(MAX_TRIE_DEPTH, 200));
      } else {
        partitions = new BinarySearchNode(splitPoints, comparator);
      }
    } catch (IOException e) {
      throw new IllegalArgumentException("Can't read partitions file", e);
    }
  }


  public Configuration getConf() {
    return conf;
  }

  // By construction, we know our keytype is memcmp-able and uses the trie.
  @SuppressWarnings("unchecked")
  public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(key);
  }

  /**
   * Set the path to the SequenceFile storing the sorted partition keyset.
   * It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt>
   * keys in the SequenceFile.
   */
  public static void setPartitionFile(Configuration conf, Path p) {
    conf.set(PARTITIONER_PATH, p.toString());
  }

  /**
   * Get the path to the SequenceFile storing the sorted partition keyset.
   * @see #setPartitionFile(Configuration, Path)
   */
  public static String getPartitionFile(Configuration conf) {
    return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
  }

  /**
   * Interface to the partitioner to locate a key in the partition keyset.
   */
  interface Node<T> {
    /**
     * Locate partition in keyset K, such that [Ki..Ki+1) defines a
     * partition, with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
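     * For example, with three partitions and keyset {K1, K2}: a key less
     * than K1 maps to 0, a key in [K1, K2) maps to 1, and a key greater
     * than or equal to K2 maps to 2.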
     */
    int findPartition(T key);
  }

  /**
   * Base class for trie nodes. If the keytype is memcmp-able, this builds
   * tries of the first <tt>mapreduce.totalorderpartitioner.trie.maxdepth</tt>
   * bytes.
   */
  static abstract class TrieNode implements Node<BinaryComparable> {
    private final int level;
    TrieNode(int level) {
      this.level = level;
    }
    int getLevel() {
      return level;
    }
  }

  /**
   * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
   * where disabled by <tt>mapreduce.totalorderpartitioner.naturalorder</tt>,
   * search the partition keyset with a binary search.
   */
  class BinarySearchNode implements Node<K> {
    private final K[] splitPoints;
    private final RawComparator<K> comparator;
    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
      this.splitPoints = splitPoints;
      this.comparator = comparator;
    }
    public int findPartition(K key) {
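      // Arrays.binarySearch returns the index of the key when it is found
      // and -(insertion point) - 1 when it is not.  Adding one sends an
      // exact match on split point i to partition i + 1; a miss becomes
      // minus the insertion point, which the negation below restores.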
      final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
      return (pos < 0) ? -pos : pos;
    }
  }

  /**
   * An inner trie node that contains 256 children based on the next
   * byte of the key.
   */
  class InnerTrieNode extends TrieNode {
    private TrieNode[] child = new TrieNode[256];

    InnerTrieNode(int level) {
      super(level);
    }
    public int findPartition(BinaryComparable key) {
      int level = getLevel();
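      // A key exhausted at this depth sorts before any longer key that
      // shares its prefix, so it belongs wherever child[0] leads.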
      if (key.getLength() <= level) {
        return child[0].findPartition(key);
      }
      return child[0xFF & key.getBytes()[level]].findPartition(key);
    }
  }

  /**
   * @param level        the tree depth at this node
   * @param splitPoints  the full split point vector, which holds
   *                     the split point or points this leaf node
   *                     should contain
   * @param lower        first INcluded element of splitPoints
   * @param upper        first EXcluded element of splitPoints
   * @return  a leaf node.  They come in three kinds: no split points
   *          [and findPartition returns a canned index], one split
   *          point [and we compare with a single comparand], or more
   *          than one [and we do a binary search].  The last case is
   *          rare.
   */
  private TrieNode LeafTrieNodeFactory(int level, BinaryComparable[] splitPoints,
      int lower, int upper) {
    switch (upper - lower) {
    case 0:
      return new UnsplitTrieNode(level, lower);
    case 1:
      return new SinglySplitTrieNode(level, splitPoints, lower);
    default:
      return new LeafTrieNode(level, splitPoints, lower, upper);
    }
  }

  /**
   * A leaf trie node that scans for the key between lower..upper.
   *
   * We don't generate many of these now, since we usually continue trie-ing
   * when more than one split point remains at this level, and we make
   * different objects for nodes with 0 or 1 split point.
   */
  private class LeafTrieNode extends TrieNode {
    final int lower;
    final int upper;
    final BinaryComparable[] splitPoints;
    LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
      super(level);
      this.lower = lower;
      this.upper = upper;
      this.splitPoints = splitPoints;
    }
    public int findPartition(BinaryComparable key) {
      final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
      return (pos < 0) ? -pos : pos;
    }
  }

  private class UnsplitTrieNode extends TrieNode {
    final int result;

    UnsplitTrieNode(int level, int value) {
      super(level);
      this.result = value;
    }

    public int findPartition(BinaryComparable key) {
      return result;
    }
  }

  private class SinglySplitTrieNode extends TrieNode {
    final int lower;
    final BinaryComparable mySplitPoint;

    SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) {
      super(level);
      this.lower = lower;
      this.mySplitPoint = splitPoints[lower];
    }

    public int findPartition(BinaryComparable key) {
      return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1);
    }
  }

  /**
   * Read the cut points from the given SequenceFile.
   * @param fs The file system
   * @param p The path to read
   * @param keyClass The map output key class
   * @param conf The job config
   * @return the split points read from the file
   * @throws IOException
   */
  // Matching key types are enforced by passing in the map output key class.
  @SuppressWarnings("unchecked")
  private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
      Configuration conf) throws IOException {
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
    ArrayList<K> parts = new ArrayList<K>();
    K key = ReflectionUtils.newInstance(keyClass, conf);
    NullWritable value = NullWritable.get();
    try {
      while (reader.next(key, value)) {
        parts.add(key);
        key = ReflectionUtils.newInstance(keyClass, conf);
      }
      reader.close();
      reader = null;
    } finally {
      IOUtils.cleanup(LOG, reader);
    }
    return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
  }
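
  // For reference, a partition file in the format readPartitions expects can
  // be written with a SequenceFile.Writer.  A minimal sketch, assuming Text
  // keys and an illustrative path:
  //
  //   SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
  //       new Path("/tmp/partitions.lst"), Text.class, NullWritable.class);
  //   for (Text split : sortedSplitPoints) {  // R - 1 keys, sorted
  //     writer.append(split, NullWritable.get());
  //   }
  //   writer.close();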

  /**
   * This object contains a TrieNodeRef if there is such a thing that
   * can be repeated.  Two adjacent trie node slots that contain no
   * split points can be filled with the same trie node, even if they
   * are not on the same level.  See buildTrieRec, below.
   */
  private class CarriedTrieNodeRef {
    TrieNode content;

    CarriedTrieNodeRef() {
      content = null;
    }
  }

  /**
   * Given a sorted set of cut points, build a trie that will find the correct
   * partition quickly.
   * @param splits the list of cut points
   * @param lower the first element of splits to consider, inclusive
   * @param upper the last element of splits to consider, exclusive
   * @param prefix the prefix that we have already checked against
   * @param maxDepth the maximum depth we will build a trie for
   * @return the trie node that will divide the splits correctly
   */
  private TrieNode buildTrie(BinaryComparable[] splits, int lower,
      int upper, byte[] prefix, int maxDepth) {
    return buildTrieRec(splits, lower, upper, prefix, maxDepth,
        new CarriedTrieNodeRef());
  }
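
  // Worked illustration: for splits { "b", "d" } and ample depth, the root
  // InnerTrieNode routes bytes below 'b' to one shared UnsplitTrieNode
  // answering 0, byte 'b' to a SinglySplitTrieNode answering 0 or 1 around
  // "b", byte 'c' to an UnsplitTrieNode answering 1, byte 'd' to a
  // SinglySplitTrieNode answering 1 or 2 around "d", and every byte above
  // 'd' to one shared UnsplitTrieNode answering 2.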

  /**
   * This is the core of buildTrie.  The stub above just adds an empty
   * CarriedTrieNodeRef.
   *
   * We build trie nodes in depth first order, which is also in key space
   * order.  Every leaf node is referenced as a slot in a parent internal
   * node.  If two adjacent slots [in the DFO] hold leaf nodes that have
   * no split point, then they are not separated by a split point either,
   * because there's no place in key space for that split point to exist.
   *
   * When that happens, the leaf nodes would be semantically identical, and
   * we reuse the object.  A single CarriedTrieNodeRef "ref" lives for the
   * duration of the tree-walk.  ref carries a potentially reusable, unsplit
   * leaf node for such reuse until a leaf node with a split arises, which
   * breaks the chain until we need to make a new unsplit leaf node.
   *
   * Note that this use of CarriedTrieNodeRef means that if this code is
   * modified in any way, we still need to make or fill in the subnodes of
   * internal nodes in key space order.
   */
  private TrieNode buildTrieRec(BinaryComparable[] splits, int lower,
      int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) {
    final int depth = prefix.length;
    // We generate leaves for a single split point as well as for
    // no split points.
    if (depth >= maxDepth || lower >= upper - 1) {
      // If we have two consecutive requests for an unsplit trie node, we
      // can deliver the same one the second time.
      if (lower == upper && ref.content != null) {
        return ref.content;
      }
      TrieNode result = LeafTrieNodeFactory(depth, splits, lower, upper);
      ref.content = lower == upper ? result : null;
      return result;
    }
    InnerTrieNode result = new InnerTrieNode(depth);
    // Append an extra byte onto the prefix.
    byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
    int currentBound = lower;
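    // Each pass of the loop narrows [lower, currentBound) to the split
    // points whose byte at this depth is ch: trial, temporarily ending in
    // ch + 1, is the smallest key that sorts after all of them.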
    for (int ch = 0; ch < 0xFF; ++ch) {
      trial[depth] = (byte) (ch + 1);
      lower = currentBound;
      while (currentBound < upper) {
        if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
          break;
        }
        currentBound += 1;
      }
      trial[depth] = (byte) ch;
      result.child[0xFF & ch] =
          buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
    }
    // Pick up the rest.
    trial[depth] = (byte) 0xFF;
    result.child[0xFF] =
        buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);

    return result;
  }
}