001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred.lib;
020    
021    import java.io.IOException;
022    import java.util.Iterator;
023    import java.util.TreeMap;
024    
025    import org.apache.hadoop.classification.InterfaceAudience;
026    import org.apache.hadoop.classification.InterfaceStability;
027    import org.apache.hadoop.fs.FileSystem;
028    import org.apache.hadoop.fs.Path;
029    import org.apache.hadoop.mapred.JobConf;
030    import org.apache.hadoop.mapred.FileOutputFormat;
031    import org.apache.hadoop.mapred.RecordWriter;
032    import org.apache.hadoop.mapred.Reporter;
033    import org.apache.hadoop.mapreduce.JobContext;
034    import org.apache.hadoop.mapreduce.MRJobConfig;
035    import org.apache.hadoop.util.Progressable;
036    
037    /**
038     * This abstract class extends the FileOutputFormat, allowing to write the
039     * output data to different output files. There are three basic use cases for
040     * this class.
041     * 
042     * Case one: This class is used for a map reduce job with at least one reducer.
043     * The reducer wants to write data to different files depending on the actual
044     * keys. It is assumed that a key (or value) encodes the actual key (value)
045     * and the desired location for the actual key (value).
046     * 
047     * Case two: This class is used for a map only job. The job wants to use an
048     * output file name that is either a part of the input file name of the input
049     * data, or some derivation of it.
050     * 
051     * Case three: This class is used for a map only job. The job wants to use an
052     * output file name that depends on both the keys and the input file name,
053     */
054    @InterfaceAudience.Public
055    @InterfaceStability.Stable
056    public abstract class MultipleOutputFormat<K, V>
057    extends FileOutputFormat<K, V> {
058    
059      /**
060       * Create a composite record writer that can write key/value data to different
061       * output files
062       * 
063       * @param fs
064       *          the file system to use
065       * @param job
066       *          the job conf for the job
067       * @param name
068       *          the leaf file name for the output file (such as part-00000")
069       * @param arg3
070       *          a progressable for reporting progress.
071       * @return a composite record writer
072       * @throws IOException
073       */
074      public RecordWriter<K, V> getRecordWriter(FileSystem fs, JobConf job,
075          String name, Progressable arg3) throws IOException {
076    
077        final FileSystem myFS = fs;
078        final String myName = generateLeafFileName(name);
079        final JobConf myJob = job;
080        final Progressable myProgressable = arg3;
081    
082        return new RecordWriter<K, V>() {
083    
084          // a cache storing the record writers for different output files.
085          TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap<String, RecordWriter<K, V>>();
086    
087          public void write(K key, V value) throws IOException {
088    
089            // get the file name based on the key
090            String keyBasedPath = generateFileNameForKeyValue(key, value, myName);
091    
092            // get the file name based on the input file name
093            String finalPath = getInputFileBasedOutputFileName(myJob, keyBasedPath);
094    
095            // get the actual key
096            K actualKey = generateActualKey(key, value);
097            V actualValue = generateActualValue(key, value);
098    
099            RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
100            if (rw == null) {
101              // if we don't have the record writer yet for the final path, create
102              // one
103              // and add it to the cache
104              rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);
105              this.recordWriters.put(finalPath, rw);
106            }
107            rw.write(actualKey, actualValue);
108          };
109    
110          public void close(Reporter reporter) throws IOException {
111            Iterator<String> keys = this.recordWriters.keySet().iterator();
112            while (keys.hasNext()) {
113              RecordWriter<K, V> rw = this.recordWriters.get(keys.next());
114              rw.close(reporter);
115            }
116            this.recordWriters.clear();
117          };
118        };
119      }
120    
121      /**
122       * Generate the leaf name for the output file name. The default behavior does
123       * not change the leaf file name (such as part-00000)
124       * 
125       * @param name
126       *          the leaf file name for the output file
127       * @return the given leaf file name
128       */
129      protected String generateLeafFileName(String name) {
130        return name;
131      }
132    
133      /**
134       * Generate the file output file name based on the given key and the leaf file
135       * name. The default behavior is that the file name does not depend on the
136       * key.
137       * 
138       * @param key
139       *          the key of the output data
140       * @param name
141       *          the leaf file name
142       * @return generated file name
143       */
144      protected String generateFileNameForKeyValue(K key, V value, String name) {
145        return name;
146      }
147    
148      /**
149       * Generate the actual key from the given key/value. The default behavior is that
150       * the actual key is equal to the given key
151       * 
152       * @param key
153       *          the key of the output data
154       * @param value
155       *          the value of the output data
156       * @return the actual key derived from the given key/value
157       */
158      protected K generateActualKey(K key, V value) {
159        return key;
160      }
161      
162      /**
163       * Generate the actual value from the given key and value. The default behavior is that
164       * the actual value is equal to the given value
165       * 
166       * @param key
167       *          the key of the output data
168       * @param value
169       *          the value of the output data
170       * @return the actual value derived from the given key/value
171       */
172      protected V generateActualValue(K key, V value) {
173        return value;
174      }
175      
176    
177      /**
178       * Generate the outfile name based on a given anme and the input file name. If
179       * the {@link JobContext#MAP_INPUT_FILE} does not exists (i.e. this is not for a map only job),
180       * the given name is returned unchanged. If the config value for
181       * "num.of.trailing.legs.to.use" is not set, or set 0 or negative, the given
182       * name is returned unchanged. Otherwise, return a file name consisting of the
183       * N trailing legs of the input file name where N is the config value for
184       * "num.of.trailing.legs.to.use".
185       * 
186       * @param job
187       *          the job config
188       * @param name
189       *          the output file name
190       * @return the outfile name based on a given anme and the input file name.
191       */
192      protected String getInputFileBasedOutputFileName(JobConf job, String name) {
193        String infilepath = job.get(MRJobConfig.MAP_INPUT_FILE);
194        if (infilepath == null) {
195          // if the {@link JobContext#MAP_INPUT_FILE} does not exists,
196          // then return the given name
197          return name;
198        }
199        int numOfTrailingLegsToUse = job.getInt("mapred.outputformat.numOfTrailingLegs", 0);
200        if (numOfTrailingLegsToUse <= 0) {
201          return name;
202        }
203        Path infile = new Path(infilepath);
204        Path parent = infile.getParent();
205        String midName = infile.getName();
206        Path outPath = new Path(midName);
207        for (int i = 1; i < numOfTrailingLegsToUse; i++) {
208          if (parent == null) break;
209          midName = parent.getName();
210          if (midName.length() == 0) break;
211          parent = parent.getParent();
212          outPath = new Path(midName, outPath);
213        }
214        return outPath.toString();
215      }
216    
217      /**
218       * 
219       * @param fs
220       *          the file system to use
221       * @param job
222       *          a job conf object
223       * @param name
224       *          the name of the file over which a record writer object will be
225       *          constructed
226       * @param arg3
227       *          a progressable object
228       * @return A RecordWriter object over the given file
229       * @throws IOException
230       */
231      abstract protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs,
232          JobConf job, String name, Progressable arg3) throws IOException;
233    }