/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.partition;

import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Partitioner effecting a total order by reading split points from
 * an externally generated source.
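 *
 * <p>
 * A minimal usage sketch (assuming the partition file has already been
 * written, e.g. via {@code InputSampler.writePartitionFile}; the path and
 * the reduce count here are illustrative):
 * <pre>{@code
 * Job job = Job.getInstance(conf, "sorted output");
 * job.setNumReduceTasks(4);
 * job.setPartitionerClass(TotalOrderPartitioner.class);
 * // the file must hold numReduceTasks - 1 = 3 sorted keys
 * TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
 *     new Path("/tmp/partitions.lst"));
 * }</pre>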
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
    extends Partitioner<K,V> implements Configurable {

  private Node partitions;
  public static final String DEFAULT_PATH = "_partition.lst";
  public static final String PARTITIONER_PATH =
    "mapreduce.totalorderpartitioner.path";
  public static final String MAX_TRIE_DEPTH =
    "mapreduce.totalorderpartitioner.trie.maxdepth";
  public static final String NATURAL_ORDER =
    "mapreduce.totalorderpartitioner.naturalorder";
  Configuration conf;
  private static final Log LOG = LogFactory.getLog(TotalOrderPartitioner.class);

  public TotalOrderPartitioner() { }

  /**
   * Read in the partition file and build indexing data structures.
   * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
   * <tt>mapreduce.totalorderpartitioner.naturalorder</tt> is not false, a
   * trie of the first <tt>mapreduce.totalorderpartitioner.trie.maxdepth</tt>
   * (default: 200) bytes will be built. Otherwise, keys will be located
   * using a binary search of the partition keyset using the
   * {@link org.apache.hadoop.io.RawComparator} defined for this job. The
   * input file must be sorted with the same comparator and contain
   * {@link Job#getNumReduceTasks()} - 1 keys.
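   * <p>
   * For example (a sketch; these values are illustrative, not defaults):
   * <pre>{@code
   * // fall back to comparator-based binary search even for binary keys
   * conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
   * // or keep the trie but cap it at the first 4 bytes of each key
   * conf.setInt(TotalOrderPartitioner.MAX_TRIE_DEPTH, 4);
   * }</pre>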
   */
  @SuppressWarnings("unchecked") // keytype from conf not static
  public void setConf(Configuration conf) {
    try {
      this.conf = conf;
      String parts = getPartitionFile(conf);
      final Path partFile = new Path(parts);
      final FileSystem fs = (DEFAULT_PATH.equals(parts))
        ? FileSystem.getLocal(conf)     // assume in DistributedCache
        : partFile.getFileSystem(conf);

      Job job = Job.getInstance(conf);
      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
      K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
      if (splitPoints.length != job.getNumReduceTasks() - 1) {
        throw new IOException("Wrong number of partitions in keyset");
      }
      RawComparator<K> comparator =
        (RawComparator<K>) job.getSortComparator();
      for (int i = 0; i < splitPoints.length - 1; ++i) {
        if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
          throw new IOException("Split points are out of order");
        }
      }
      boolean natOrder =
        conf.getBoolean(NATURAL_ORDER, true);
      if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
        partitions = buildTrie((BinaryComparable[])splitPoints, 0,
            splitPoints.length, new byte[0],
            // Now that blocks of identical splitless trie nodes are
            // represented reentrantly, and we develop a leaf for any trie
            // node with only one split point, the only reason for a depth
            // limit is to guard against stack overflow or bloat in the
            // pathological case where the split points are long and mostly
            // look like bytes iii...iixii...iii. Therefore, we make the
            // default depth limit large but not huge.
            conf.getInt(MAX_TRIE_DEPTH, 200));
      } else {
        partitions = new BinarySearchNode(splitPoints, comparator);
      }
    } catch (IOException e) {
      throw new IllegalArgumentException("Can't read partitions file", e);
    }
  }

  public Configuration getConf() {
    return conf;
  }

  // by construction, we know if our keytype
  @SuppressWarnings("unchecked") // is memcmp-able and uses the trie
  public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(key);
  }

  /**
   * Set the path to the SequenceFile storing the sorted partition keyset.
   * It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt>
   * keys in the SequenceFile.
   */
  public static void setPartitionFile(Configuration conf, Path p) {
    conf.set(PARTITIONER_PATH, p.toString());
  }

  /**
   * Get the path to the SequenceFile storing the sorted partition keyset.
   * @see #setPartitionFile(Configuration, Path)
   */
  public static String getPartitionFile(Configuration conf) {
    return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
  }

  /**
   * Interface to the partitioner to locate a key in the partition keyset.
   */
  interface Node<T> {
    /**
     * Locate partition in keyset K, such that [Ki..Ki+1) defines a partition,
     * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
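     *
     * For example, with four partitions the keyset holds three split points
     * {K1, K2, K3}; a key k with K1 <= k < K2 lands in partition 1.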
     */
    int findPartition(T key);
  }

  /**
   * Base class for trie nodes. If the keytype is memcmp-able, this builds
   * tries of the first <tt>mapreduce.totalorderpartitioner.trie.maxdepth</tt>
   * bytes.
   */
  static abstract class TrieNode implements Node<BinaryComparable> {
    private final int level;
    TrieNode(int level) {
      this.level = level;
    }
    int getLevel() {
      return level;
    }
  }

  /**
   * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
   * when the natural order is disabled by
   * <tt>mapreduce.totalorderpartitioner.naturalorder</tt>, search the
   * partition keyset with a binary search.
   */
  class BinarySearchNode implements Node<K> {
    private final K[] splitPoints;
    private final RawComparator<K> comparator;
    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
      this.splitPoints = splitPoints;
      this.comparator = comparator;
    }
    public int findPartition(K key) {
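      // Arrays.binarySearch returns the match index, or (-insertionPoint - 1)
      // when the key is absent; adding 1 and negating a negative result map
      // both cases to the index of the first split point greater than the
      // key, which is exactly the partition number.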
      final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
      return (pos < 0) ? -pos : pos;
    }
  }

  /**
   * An inner trie node that contains 256 children based on the next
   * byte in the key.
   */
  class InnerTrieNode extends TrieNode {
    private TrieNode[] child = new TrieNode[256];

    InnerTrieNode(int level) {
      super(level);
    }
    public int findPartition(BinaryComparable key) {
      int level = getLevel();
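      // A key shorter than this depth is a strict prefix of every key below
      // this node and sorts before all of them, so follow the 0x00 branch.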
      if (key.getLength() <= level) {
        return child[0].findPartition(key);
      }
      return child[0xFF & key.getBytes()[level]].findPartition(key);
    }
  }

  /**
   * @param level       the tree depth at this node
   * @param splitPoints the full split point vector, which holds
   *                    the split point or points this leaf node
   *                    should contain
   * @param lower       first INcluded element of splitPoints
   * @param upper       first EXcluded element of splitPoints
   * @return a leaf node. They come in three kinds: no split points
   *         [and findPartition returns a canned index], one split
   *         point [and we compare with a single comparand], or more
   *         than one [and we do a binary search]. The last case is
   *         rare.
   */
  private TrieNode LeafTrieNodeFactory
             (int level, BinaryComparable[] splitPoints, int lower, int upper) {
    switch (upper - lower) {
      case 0:
        return new UnsplitTrieNode(level, lower);

      case 1:
        return new SinglySplitTrieNode(level, splitPoints, lower);

      default:
        return new LeafTrieNode(level, splitPoints, lower, upper);
    }
  }

  /**
   * A leaf trie node that scans for the key between lower..upper.
   *
   * We don't generate many of these now, since we usually continue trie-ing
   * when more than one split point remains at this level, and we make
   * different objects for nodes with 0 or 1 split point.
   */
  private class LeafTrieNode extends TrieNode {
    final int lower;
    final int upper;
    final BinaryComparable[] splitPoints;
    LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
      super(level);
      this.lower = lower;
      this.upper = upper;
      this.splitPoints = splitPoints;
    }
    public int findPartition(BinaryComparable key) {
      final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
      return (pos < 0) ? -pos : pos;
    }
  }

  private class UnsplitTrieNode extends TrieNode {
    final int result;

    UnsplitTrieNode(int level, int value) {
      super(level);
      this.result = value;
    }

    public int findPartition(BinaryComparable key) {
      return result;
    }
  }

  private class SinglySplitTrieNode extends TrieNode {
    final int lower;
    final BinaryComparable mySplitPoint;

    SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) {
      super(level);
      this.lower = lower;
      this.mySplitPoint = splitPoints[lower];
    }

    public int findPartition(BinaryComparable key) {
      return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1);
    }
  }

  /**
   * Read the cut points from the given sequence file.
   * @param fs The file system
   * @param p The path to read
   * @param keyClass The map output key class
   * @param conf The job config
   * @return the split points read from the file
   * @throws IOException
   */
  // matching key types enforced by passing in
  @SuppressWarnings("unchecked") // map output key class
  private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
      Configuration conf) throws IOException {
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
    ArrayList<K> parts = new ArrayList<K>();
    K key = ReflectionUtils.newInstance(keyClass, conf);
    NullWritable value = NullWritable.get();
    try {
      while (reader.next(key, value)) {
        parts.add(key);
        key = ReflectionUtils.newInstance(keyClass, conf);
      }
      reader.close();
      reader = null;
    } finally {
      IOUtils.cleanup(LOG, reader);
    }
    return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
  }

  /**
   * This object carries a trie node that can be reused. Two adjacent
   * trie node slots that contain no split points can be filled with the
   * same trie node, even if they are not on the same level. See
   * buildTrieRec, below.
   */
  private class CarriedTrieNodeRef {
    TrieNode content;

    CarriedTrieNodeRef() {
      content = null;
    }
  }

  /**
   * Given a sorted set of cut points, build a trie that will find the correct
   * partition quickly.
   * @param splits the list of cut points
   * @param lower the first (inclusive) index into splits for this subtree
   * @param upper the first (exclusive) index into splits for this subtree
   * @param prefix the prefix that we have already checked against
   * @param maxDepth the maximum depth we will build a trie for
   * @return the trie node that will divide the splits correctly
   */
  private TrieNode buildTrie(BinaryComparable[] splits, int lower,
      int upper, byte[] prefix, int maxDepth) {
    return buildTrieRec
             (splits, lower, upper, prefix, maxDepth, new CarriedTrieNodeRef());
  }

  /**
   * This is the core of buildTrie. The interface, and stub, above, just adds
   * an empty CarriedTrieNodeRef.
   *
   * We build trie nodes in depth first order, which is also in key space
   * order. Every leaf node is referenced as a slot in a parent internal
   * node. If two adjacent slots [in the DFO] hold leaf nodes that have
   * no split point, then they are not separated by a split point either,
   * because there's no place in key space for that split point to exist.
   *
   * When that happens, the leaf nodes would be semantically identical, and
   * we reuse the object. A single CarriedTrieNodeRef "ref" lives for the
   * duration of the tree-walk. ref carries a potentially reusable, unsplit
   * leaf node for such reuse until a leaf node with a split arises, which
   * breaks the chain until we need to make a new unsplit leaf node.
   *
   * Note that this use of CarriedTrieNodeRef means that if this code is
   * modified in any way, we still need to make or fill in the subnodes of
   * internal nodes in key space order.
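   *
   * For example (an illustrative sketch, not taken from real data): with
   * split points "b" and "m" under an empty prefix, the child slots for
   * bytes 0x00..'a' hold no split points and share one UnsplitTrieNode
   * returning partition 0; the slot for 'b' holds a split and breaks the
   * chain; slots 'c'..'l' then share a node for partition 1, 'm' breaks
   * the chain again, and slots 'n'..0xFF share a node for partition 2.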
   */
  private TrieNode buildTrieRec(BinaryComparable[] splits, int lower,
      int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) {
    final int depth = prefix.length;
    // We generate leaves for a single split point as well as for
    // no split points.
    if (depth >= maxDepth || lower >= upper - 1) {
      // If we have two consecutive requests for an unsplit trie node, we
      // can deliver the same one the second time.
      if (lower == upper && ref.content != null) {
        return ref.content;
      }
      TrieNode result = LeafTrieNodeFactory(depth, splits, lower, upper);
      ref.content = lower == upper ? result : null;
      return result;
    }
    InnerTrieNode result = new InnerTrieNode(depth);
    byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
    // append an extra byte on to the prefix
    int currentBound = lower;
    for (int ch = 0; ch < 0xFF; ++ch) {
      trial[depth] = (byte) (ch + 1);
      lower = currentBound;
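      // Advance currentBound to the first split point >= prefix + (ch + 1);
      // the split points in [lower, currentBound) start with prefix + ch
      // and belong to child[ch].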
      while (currentBound < upper) {
        if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
          break;
        }
        currentBound += 1;
      }
      trial[depth] = (byte) ch;
      result.child[0xFF & ch]
        = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
    }
    // pick up the rest: the split points that start with prefix + 0xFF
    trial[depth] = (byte) 0xFF;
    result.child[0xFF]
      = buildTrieRec(splits, currentBound, upper, trial, maxDepth, ref);

    return result;
  }
}