/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.partition;

import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Partitioner effecting a total order by reading split points from
 * an externally generated source.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
    extends Partitioner<K,V> implements Configurable {

  private Node partitions;
  public static final String DEFAULT_PATH = "_partition.lst";
  public static final String PARTITIONER_PATH =
    "mapreduce.totalorderpartitioner.path";
  public static final String MAX_TRIE_DEPTH =
    "mapreduce.totalorderpartitioner.trie.maxdepth";
  public static final String NATURAL_ORDER =
    "mapreduce.totalorderpartitioner.naturalorder";
  Configuration conf;
  private static final Log LOG = LogFactory.getLog(TotalOrderPartitioner.class);

  public TotalOrderPartitioner() { }
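  // A minimal end-to-end usage sketch. The classes and methods named below
  // are real, but the job name, path, key type, and sampler parameters are
  // illustrative assumptions, not prescribed by this class:
  //
  //   Job job = new Job(conf, "total sort");
  //   job.setPartitionerClass(TotalOrderPartitioner.class);
  //   TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
  //       new Path("/tmp/_partition.lst"));
  //   InputSampler.writePartitionFile(job,
  //       new InputSampler.RandomSampler<Text,Text>(0.1, 10000, 10));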
  /**
   * Read in the partition file and build indexing data structures.
   * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
   * <tt>mapreduce.totalorderpartitioner.naturalorder</tt> is not false,
   * a trie of the first <tt>mapreduce.totalorderpartitioner.trie.maxdepth</tt>
   * (default 200) bytes will be built. Otherwise, keys will be located
   * using a binary search of the partition keyset using the
   * {@link org.apache.hadoop.io.RawComparator} defined for this job. The
   * input file must be sorted with the same comparator and contain
   * {@link Job#getNumReduceTasks()} - 1 keys.
   */
  @SuppressWarnings("unchecked") // keytype from conf not static
  public void setConf(Configuration conf) {
    try {
      this.conf = conf;
      String parts = getPartitionFile(conf);
      final Path partFile = new Path(parts);
      final FileSystem fs = (DEFAULT_PATH.equals(parts))
        ? FileSystem.getLocal(conf)     // assume in DistributedCache
        : partFile.getFileSystem(conf);

      Job job = new Job(conf);
      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
      K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
      if (splitPoints.length != job.getNumReduceTasks() - 1) {
        throw new IOException("Wrong number of partitions in keyset");
      }
      RawComparator<K> comparator =
        (RawComparator<K>) job.getSortComparator();
      for (int i = 0; i < splitPoints.length - 1; ++i) {
        if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
          throw new IOException("Split points are out of order");
        }
      }
      boolean natOrder =
        conf.getBoolean(NATURAL_ORDER, true);
      if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
        partitions = buildTrie((BinaryComparable[])splitPoints, 0,
            splitPoints.length, new byte[0],
            // Now that blocks of identical splitless trie nodes are
            // shared, and we develop a leaf for any trie node with only
            // one split point, the only reason for a depth limit is to
            // prevent stack overflow or bloat in the pathological case
            // where the split points are long and mostly look like bytes
            // iii...iixii...iii. Therefore, we make the default depth
            // limit large but not huge.
            conf.getInt(MAX_TRIE_DEPTH, 200));
      } else {
        partitions = new BinarySearchNode(splitPoints, comparator);
      }
    } catch (IOException e) {
      throw new IllegalArgumentException("Can't read partitions file", e);
    }
  }

  public Configuration getConf() {
    return conf;
  }

  // By construction, we know whether our keytype
  @SuppressWarnings("unchecked") // is memcmp-able and uses the trie
  public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(key);
  }

  /**
   * Set the path to the SequenceFile storing the sorted partition keyset.
   * It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt>
   * keys in the SequenceFile.
   */
  public static void setPartitionFile(Configuration conf, Path p) {
    conf.set(PARTITIONER_PATH, p.toString());
  }

  /**
   * Get the path to the SequenceFile storing the sorted partition keyset.
   * @see #setPartitionFile(Configuration, Path)
   */
  public static String getPartitionFile(Configuration conf) {
    return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
  }
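  // The keyset is an ordinary SequenceFile of R-1 keys (for R reduces) in
  // sort order, with NullWritable values. A hand-rolled writer, sketched
  // under the assumption of a Text key type; fs, partFile, and
  // sortedSplitPoints are illustrative names:
  //
  //   SequenceFile.Writer writer = SequenceFile.createWriter(
  //       fs, conf, partFile, Text.class, NullWritable.class);
  //   for (Text splitPoint : sortedSplitPoints) {  // ascending, R-1 keys
  //     writer.append(splitPoint, NullWritable.get());
  //   }
  //   writer.close();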
  /**
   * Interface to the partitioner to locate a key in the partition keyset.
   */
  interface Node<T> {
    /**
     * Locate partition in keyset K, such that [K[i], K[i+1]) defines a
     * partition, with implicit K0 = -inf, Kn = +inf, and |K| =
     * #partitions - 1.
     */
    int findPartition(T key);
  }

  /**
   * Base class for trie nodes. If the keytype is memcmp-able, this builds
   * tries of the first <tt>mapreduce.totalorderpartitioner.trie.maxdepth</tt>
   * bytes.
   */
  static abstract class TrieNode implements Node<BinaryComparable> {
    private final int level;
    TrieNode(int level) {
      this.level = level;
    }
    int getLevel() {
      return level;
    }
  }

  /**
   * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
   * when the trie is disabled by
   * <tt>mapreduce.totalorderpartitioner.naturalorder</tt>, search the
   * partition keyset with a binary search.
   */
  class BinarySearchNode implements Node<K> {
    private final K[] splitPoints;
    private final RawComparator<K> comparator;
    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
      this.splitPoints = splitPoints;
      this.comparator = comparator;
    }
    public int findPartition(K key) {
      final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
      return (pos < 0) ? -pos : pos;
    }
  }
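  // Worked example of the index arithmetic above, with illustrative values:
  // for splitPoints = {"b", "d"} and key "c", binarySearch returns
  // -(insertionPoint) - 1 = -2, so pos = -1 and -pos = 1: the key lands in
  // partition 1, the range ["b", "d"). An exact hit on "d" returns index 1,
  // so pos = 2: a key equal to a split point goes to the partition above
  // it, matching SinglySplitTrieNode below.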
  /**
   * An inner trie node that contains 256 children based on the next
   * character.
   */
  class InnerTrieNode extends TrieNode {
    private TrieNode[] child = new TrieNode[256];

    InnerTrieNode(int level) {
      super(level);
    }
    public int findPartition(BinaryComparable key) {
      int level = getLevel();
      if (key.getLength() <= level) {
        return child[0].findPartition(key);
      }
      return child[0xFF & key.getBytes()[level]].findPartition(key);
    }
  }

  /**
   * @param level the tree depth at this node
   * @param splitPoints the full split point vector, which holds
   *                    the split point or points this leaf node
   *                    should contain
   * @param lower first INcluded element of splitPoints
   * @param upper first EXcluded element of splitPoints
   * @return a leaf node. They come in three kinds: no split points
   *         [and findPartition returns a canned index], one split
   *         point [and we compare with a single comparand], or more
   *         than one [and we do a binary search]. The last case is
   *         rare.
   */
  private TrieNode LeafTrieNodeFactory
             (int level, BinaryComparable[] splitPoints, int lower, int upper) {
    switch (upper - lower) {
      case 0:
        return new UnsplitTrieNode(level, lower);

      case 1:
        return new SinglySplitTrieNode(level, splitPoints, lower);

      default:
        return new LeafTrieNode(level, splitPoints, lower, upper);
    }
  }

  /**
   * A leaf trie node that scans for the key between lower..upper.
   *
   * We don't generate many of these now, since we usually continue trie-ing
   * when more than one split point remains at this level, and we make
   * different objects for nodes with 0 or 1 split point.
   */
  private class LeafTrieNode extends TrieNode {
    final int lower;
    final int upper;
    final BinaryComparable[] splitPoints;
    LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
      super(level);
      this.lower = lower;
      this.upper = upper;
      this.splitPoints = splitPoints;
    }
    public int findPartition(BinaryComparable key) {
      final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
      return (pos < 0) ? -pos : pos;
    }
  }

  private class UnsplitTrieNode extends TrieNode {
    final int result;

    UnsplitTrieNode(int level, int value) {
      super(level);
      this.result = value;
    }

    public int findPartition(BinaryComparable key) {
      return result;
    }
  }

  private class SinglySplitTrieNode extends TrieNode {
    final int lower;
    final BinaryComparable mySplitPoint;

    SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) {
      super(level);
      this.lower = lower;
      this.mySplitPoint = splitPoints[lower];
    }

    public int findPartition(BinaryComparable key) {
      // A key equal to the split point belongs to the upper partition.
      return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1);
    }
  }

  /**
   * Read the cut points from the given SequenceFile.
   * @param fs the file system
   * @param p the path to read
   * @param keyClass the map output key class
   * @param conf the job config
   * @throws IOException
   */
  // matching key types enforced by passing in
  @SuppressWarnings("unchecked") // map output key class
  private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
      Configuration conf) throws IOException {
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
    ArrayList<K> parts = new ArrayList<K>();
    K key = ReflectionUtils.newInstance(keyClass, conf);
    NullWritable value = NullWritable.get();
    try {
      while (reader.next(key, value)) {
        parts.add(key);
        // next(...) overwrites its argument, so hand it a fresh key each time
        key = ReflectionUtils.newInstance(keyClass, conf);
      }
      reader.close();
      reader = null;
    } finally {
      IOUtils.cleanup(LOG, reader);
    }
    return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
  }
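  // Tuning sketch (the values are illustrative, not recommendations): the
  // trie can be capped, or disabled entirely in favor of binary search,
  // via the keys declared at the top of this class:
  //
  //   conf.setInt(TotalOrderPartitioner.MAX_TRIE_DEPTH, 64);
  //   conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);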
  /**
   * This object carries a trie node if there is one that can be reused.
   * Two adjacent trie node slots that contain no split points can be
   * filled with the same trie node, even if they are not on the same
   * level. See buildTrieRec, below.
   */
  private class CarriedTrieNodeRef {
    TrieNode content;

    CarriedTrieNodeRef() {
      content = null;
    }
  }

  /**
   * Given a sorted set of cut points, build a trie that will find the correct
   * partition quickly.
   * @param splits the list of cut points
   * @param lower the first index (inclusive) of splits to consider
   * @param upper the first index (exclusive) of splits to consider
   * @param prefix the prefix that we have already checked against
   * @param maxDepth the maximum depth we will build a trie for
   * @return the trie node that will divide the splits correctly
   */
  private TrieNode buildTrie(BinaryComparable[] splits, int lower,
      int upper, byte[] prefix, int maxDepth) {
    return buildTrieRec
             (splits, lower, upper, prefix, maxDepth, new CarriedTrieNodeRef());
  }

  /**
   * This is the core of buildTrie. The interface, and stub, above, just adds
   * an empty CarriedTrieNodeRef.
   *
   * We build trie nodes in depth first order, which is also in key space
   * order. Every leaf node is referenced as a slot in a parent internal
   * node. If two adjacent slots [in the DFO] hold leaf nodes that have
   * no split point, then they are not separated by a split point either,
   * because there's no place in key space for that split point to exist.
   *
   * When that happens, the leaf nodes would be semantically identical, and
   * we reuse the object. A single CarriedTrieNodeRef "ref" lives for the
   * duration of the tree-walk. ref carries a potentially reusable, unsplit
   * leaf node for such reuse until a leaf node with a split arises, which
   * breaks the chain until we need to make a new unsplit leaf node.
   *
   * Note that this use of CarriedTrieNodeRef means that, even if this code
   * is modified in any way, we still need to make or fill in the subnodes
   * of an internal node in key space order.
   */
  private TrieNode buildTrieRec(BinaryComparable[] splits, int lower,
      int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) {
    final int depth = prefix.length;
    // We generate leaves for a single split point as well as for
    // no split points.
    if (depth >= maxDepth || lower >= upper - 1) {
      // If we have two consecutive requests for an unsplit trie node, we
      // can deliver the same one the second time.
      if (lower == upper && ref.content != null) {
        return ref.content;
      }
      TrieNode result = LeafTrieNodeFactory(depth, splits, lower, upper);
      ref.content = lower == upper ? result : null;
      return result;
    }
    InnerTrieNode result = new InnerTrieNode(depth);
    // Append an extra byte onto the prefix.
    byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
    int currentBound = lower;
    for (int ch = 0; ch < 0xFF; ++ch) {
      // Advance currentBound past every split point strictly below
      // prefix + (ch + 1); [lower, currentBound) is then exactly the set
      // of split points that begin with prefix + ch.
      trial[depth] = (byte) (ch + 1);
      lower = currentBound;
      while (currentBound < upper) {
        if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
          break;
        }
        currentBound += 1;
      }
      trial[depth] = (byte) ch;
      result.child[0xFF & ch]
        = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
    }
    // Pick up the rest: everything from currentBound to upper begins with
    // prefix + 0xFF.
    trial[depth] = (byte) 0xFF;
    result.child[0xFF]
      = buildTrieRec(splits, currentBound, upper, trial, maxDepth, ref);

    return result;
  }
}