/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.partition;

import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Partitioner effecting a total order by reading split points from
 * an externally generated source.
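 *
 * <p>
 * A minimal usage sketch (assuming the partition file has already been
 * written, e.g. via {@code InputSampler.writePartitionFile}; the path and
 * the reduce count here are illustrative):
 * <pre>{@code
 * Job job = Job.getInstance(conf, "sorted output");
 * job.setNumReduceTasks(4);
 * job.setPartitionerClass(TotalOrderPartitioner.class);
 * // the file must hold numReduceTasks - 1 = 3 sorted keys
 * TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
 *     new Path("/tmp/partitions.lst"));
 * }</pre>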
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
    extends Partitioner<K,V> implements Configurable {

  private Node partitions;
  public static final String DEFAULT_PATH = "_partition.lst";
  public static final String PARTITIONER_PATH =
    "mapreduce.totalorderpartitioner.path";
  public static final String MAX_TRIE_DEPTH =
    "mapreduce.totalorderpartitioner.trie.maxdepth";
  public static final String NATURAL_ORDER =
    "mapreduce.totalorderpartitioner.naturalorder";
  Configuration conf;
  private static final Log LOG = LogFactory.getLog(TotalOrderPartitioner.class);

  public TotalOrderPartitioner() { }

  /**
   * Read in the partition file and build indexing data structures.
   * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
   * <tt>mapreduce.totalorderpartitioner.naturalorder</tt> is not false, a
   * trie of the first <tt>mapreduce.totalorderpartitioner.trie.maxdepth</tt>
   * (default: 200) bytes will be built. Otherwise, keys will be located
   * using a binary search of the partition keyset using the
   * {@link org.apache.hadoop.io.RawComparator} defined for this job. The
   * input file must be sorted with the same comparator and contain
   * {@link Job#getNumReduceTasks()} - 1 keys.
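   * <p>
   * For example (a sketch; these values are illustrative, not defaults):
   * <pre>{@code
   * // fall back to comparator-based binary search even for binary keys
   * conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
   * // or keep the trie but cap it at the first 4 bytes of each key
   * conf.setInt(TotalOrderPartitioner.MAX_TRIE_DEPTH, 4);
   * }</pre>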
   */
  @SuppressWarnings("unchecked") // keytype from conf not static
  public void setConf(Configuration conf) {
    try {
      this.conf = conf;
      String parts = getPartitionFile(conf);
      final Path partFile = new Path(parts);
      final FileSystem fs = (DEFAULT_PATH.equals(parts))
        ? FileSystem.getLocal(conf)     // assume in DistributedCache
        : partFile.getFileSystem(conf);

      Job job = Job.getInstance(conf);
      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
      K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
      if (splitPoints.length != job.getNumReduceTasks() - 1) {
        throw new IOException("Wrong number of partitions in keyset");
      }
      RawComparator<K> comparator =
        (RawComparator<K>) job.getSortComparator();
      for (int i = 0; i < splitPoints.length - 1; ++i) {
        if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
          throw new IOException("Split points are out of order");
        }
      }
      boolean natOrder =
        conf.getBoolean(NATURAL_ORDER, true);
      if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
        partitions = buildTrie((BinaryComparable[])splitPoints, 0,
            splitPoints.length, new byte[0],
            // Now that blocks of identical splitless trie nodes are
            // represented reentrantly, and we develop a leaf for any trie
            // node with only one split point, the only reason for a depth
            // limit is to guard against stack overflow or bloat in the
            // pathological case where the split points are long and mostly
            // look like bytes iii...iixii...iii. Therefore, we make the
            // default depth limit large but not huge.
            conf.getInt(MAX_TRIE_DEPTH, 200));
      } else {
        partitions = new BinarySearchNode(splitPoints, comparator);
      }
    } catch (IOException e) {
      throw new IllegalArgumentException("Can't read partitions file", e);
    }
  }

  public Configuration getConf() {
    return conf;
  }

  // by construction, we know if our keytype
  @SuppressWarnings("unchecked") // is memcmp-able and uses the trie
  public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(key);
  }

  /**
   * Set the path to the SequenceFile storing the sorted partition keyset.
   * It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt>
   * keys in the SequenceFile.
   */
  public static void setPartitionFile(Configuration conf, Path p) {
    conf.set(PARTITIONER_PATH, p.toString());
  }

  /**
   * Get the path to the SequenceFile storing the sorted partition keyset.
   * @see #setPartitionFile(Configuration, Path)
   */
  public static String getPartitionFile(Configuration conf) {
    return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
  }

  /**
   * Interface to the partitioner to locate a key in the partition keyset.
   */
  interface Node<T> {
    /**
     * Locate partition in keyset K, such that [Ki..Ki+1) defines a partition,
     * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
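     *
     * For example, with four partitions the keyset holds three split points
     * {K1, K2, K3}; a key k with K1 <= k < K2 lands in partition 1.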
     */
    int findPartition(T key);
  }

  /**
   * Base class for trie nodes. If the keytype is memcmp-able, this builds
   * tries of the first <tt>mapreduce.totalorderpartitioner.trie.maxdepth</tt>
   * bytes.
   */
  static abstract class TrieNode implements Node<BinaryComparable> {
    private final int level;
    TrieNode(int level) {
      this.level = level;
    }
    int getLevel() {
      return level;
    }
  }

  /**
   * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
   * when the natural order is disabled by
   * <tt>mapreduce.totalorderpartitioner.naturalorder</tt>, search the
   * partition keyset with a binary search.
   */
  class BinarySearchNode implements Node<K> {
    private final K[] splitPoints;
    private final RawComparator<K> comparator;
    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
      this.splitPoints = splitPoints;
      this.comparator = comparator;
    }
    public int findPartition(K key) {
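      // Arrays.binarySearch returns the match index, or (-insertionPoint - 1)
      // when the key is absent; adding 1 and negating a negative result map
      // both cases to the index of the first split point greater than the
      // key, which is exactly the partition number.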
      final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
      return (pos < 0) ? -pos : pos;
    }
  }

  /**
   * An inner trie node that contains 256 children based on the next
   * byte in the key.
   */
  class InnerTrieNode extends TrieNode {
    private TrieNode[] child = new TrieNode[256];

    InnerTrieNode(int level) {
      super(level);
    }
    public int findPartition(BinaryComparable key) {
      int level = getLevel();
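      // A key shorter than this depth is a strict prefix of every key below
      // this node and sorts before all of them, so follow the 0x00 branch.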
      if (key.getLength() <= level) {
        return child[0].findPartition(key);
      }
      return child[0xFF & key.getBytes()[level]].findPartition(key);
    }
  }

  /**
   * @param level       the tree depth at this node
   * @param splitPoints the full split point vector, which holds
   *                    the split point or points this leaf node
   *                    should contain
   * @param lower       first INcluded element of splitPoints
   * @param upper       first EXcluded element of splitPoints
   * @return a leaf node. They come in three kinds: no split points
   *         [and findPartition returns a canned index], one split
   *         point [and we compare with a single comparand], or more
   *         than one [and we do a binary search]. The last case is
   *         rare.
   */
  private TrieNode LeafTrieNodeFactory
             (int level, BinaryComparable[] splitPoints, int lower, int upper) {
    switch (upper - lower) {
      case 0:
        return new UnsplitTrieNode(level, lower);

      case 1:
        return new SinglySplitTrieNode(level, splitPoints, lower);

      default:
        return new LeafTrieNode(level, splitPoints, lower, upper);
    }
  }

  /**
   * A leaf trie node that scans for the key between lower..upper.
   *
   * We don't generate many of these now, since we usually continue trie-ing
   * when more than one split point remains at this level, and we make
   * different objects for nodes with 0 or 1 split point.
   */
  private class LeafTrieNode extends TrieNode {
    final int lower;
    final int upper;
    final BinaryComparable[] splitPoints;
    LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
      super(level);
      this.lower = lower;
      this.upper = upper;
      this.splitPoints = splitPoints;
    }
    public int findPartition(BinaryComparable key) {
      final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
      return (pos < 0) ? -pos : pos;
    }
  }

  private class UnsplitTrieNode extends TrieNode {
    final int result;

    UnsplitTrieNode(int level, int value) {
      super(level);
      this.result = value;
    }

    public int findPartition(BinaryComparable key) {
      return result;
    }
  }

  private class SinglySplitTrieNode extends TrieNode {
    final int lower;
    final BinaryComparable mySplitPoint;

    SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) {
      super(level);
      this.lower = lower;
      this.mySplitPoint = splitPoints[lower];
    }

    public int findPartition(BinaryComparable key) {
      return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1);
    }
  }

  /**
   * Read the cut points from the given sequence file.
   * @param fs The file system
   * @param p The path to read
   * @param keyClass The map output key class
   * @param conf The job config
   * @return the split points read from the file
   * @throws IOException
   */
  // matching key types enforced by passing in
  @SuppressWarnings("unchecked") // map output key class
  private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
      Configuration conf) throws IOException {
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
    ArrayList<K> parts = new ArrayList<K>();
    K key = ReflectionUtils.newInstance(keyClass, conf);
    NullWritable value = NullWritable.get();
    try {
      while (reader.next(key, value)) {
        parts.add(key);
        key = ReflectionUtils.newInstance(keyClass, conf);
      }
      reader.close();
      reader = null;
    } finally {
      IOUtils.cleanup(LOG, reader);
    }
    return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
  }

  /**
   * This object carries a trie node that can be reused. Two adjacent
   * trie node slots that contain no split points can be filled with the
   * same trie node, even if they are not on the same level. See
   * buildTrieRec, below.
   */
  private class CarriedTrieNodeRef {
    TrieNode content;

    CarriedTrieNodeRef() {
      content = null;
    }
  }

  /**
   * Given a sorted set of cut points, build a trie that will find the correct
   * partition quickly.
   * @param splits the list of cut points
   * @param lower the first (inclusive) index into splits for this subtree
   * @param upper the first (exclusive) index into splits for this subtree
   * @param prefix the prefix that we have already checked against
   * @param maxDepth the maximum depth we will build a trie for
   * @return the trie node that will divide the splits correctly
   */
  private TrieNode buildTrie(BinaryComparable[] splits, int lower,
      int upper, byte[] prefix, int maxDepth) {
    return buildTrieRec
             (splits, lower, upper, prefix, maxDepth, new CarriedTrieNodeRef());
  }

  /**
   * This is the core of buildTrie. The interface, and stub, above, just adds
   * an empty CarriedTrieNodeRef.
   *
   * We build trie nodes in depth first order, which is also in key space
   * order. Every leaf node is referenced as a slot in a parent internal
   * node. If two adjacent slots [in the DFO] hold leaf nodes that have
   * no split point, then they are not separated by a split point either,
   * because there's no place in key space for that split point to exist.
   *
   * When that happens, the leaf nodes would be semantically identical, and
   * we reuse the object. A single CarriedTrieNodeRef "ref" lives for the
   * duration of the tree-walk. ref carries a potentially reusable, unsplit
   * leaf node for such reuse until a leaf node with a split arises, which
   * breaks the chain until we need to make a new unsplit leaf node.
   *
   * Note that this use of CarriedTrieNodeRef means that if this code is
   * modified in any way, we still need to make or fill in the subnodes of
   * internal nodes in key space order.
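   *
   * For example (an illustrative sketch, not taken from real data): with
   * split points "b" and "m" under an empty prefix, the child slots for
   * bytes 0x00..'a' hold no split points and share one UnsplitTrieNode
   * returning partition 0; the slot for 'b' holds a split and breaks the
   * chain; slots 'c'..'l' then share a node for partition 1, 'm' breaks
   * the chain again, and slots 'n'..0xFF share a node for partition 2.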
   */
  private TrieNode buildTrieRec(BinaryComparable[] splits, int lower,
      int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) {
    final int depth = prefix.length;
    // We generate leaves for a single split point as well as for
    // no split points.
    if (depth >= maxDepth || lower >= upper - 1) {
      // If we have two consecutive requests for an unsplit trie node, we
      // can deliver the same one the second time.
      if (lower == upper && ref.content != null) {
        return ref.content;
      }
      TrieNode result = LeafTrieNodeFactory(depth, splits, lower, upper);
      ref.content = lower == upper ? result : null;
      return result;
    }
    InnerTrieNode result = new InnerTrieNode(depth);
    byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
    // append an extra byte on to the prefix
    int currentBound = lower;
    for (int ch = 0; ch < 0xFF; ++ch) {
      trial[depth] = (byte) (ch + 1);
      lower = currentBound;
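      // Advance currentBound to the first split point >= prefix + (ch + 1);
      // the split points in [lower, currentBound) start with prefix + ch
      // and belong to child[ch].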
      while (currentBound < upper) {
        if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
          break;
        }
        currentBound += 1;
      }
      trial[depth] = (byte) ch;
      result.child[0xFF & ch]
        = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
    }
    // pick up the rest: the split points that start with prefix + 0xFF
    trial[depth] = (byte) 0xFF;
    result.child[0xFF]
      = buildTrieRec(splits, currentBound, upper, trial, maxDepth, ref);

    return result;
  }
}