001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred.lib;
020
021import java.io.IOException;
022import java.util.Iterator;
023import java.util.TreeMap;
024
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceStability;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.mapred.JobConf;
030import org.apache.hadoop.mapred.FileOutputFormat;
031import org.apache.hadoop.mapred.RecordWriter;
032import org.apache.hadoop.mapred.Reporter;
033import org.apache.hadoop.mapreduce.JobContext;
034import org.apache.hadoop.mapreduce.MRJobConfig;
035import org.apache.hadoop.util.Progressable;
036
037/**
038 * This abstract class extends the FileOutputFormat, allowing to write the
039 * output data to different output files. There are three basic use cases for
040 * this class.
041 * 
042 * Case one: This class is used for a map reduce job with at least one reducer.
043 * The reducer wants to write data to different files depending on the actual
044 * keys. It is assumed that a key (or value) encodes the actual key (value)
045 * and the desired location for the actual key (value).
046 * 
047 * Case two: This class is used for a map only job. The job wants to use an
048 * output file name that is either a part of the input file name of the input
049 * data, or some derivation of it.
050 * 
051 * Case three: This class is used for a map only job. The job wants to use an
052 * output file name that depends on both the keys and the input file name,
053 */
054@InterfaceAudience.Public
055@InterfaceStability.Stable
056public abstract class MultipleOutputFormat<K, V>
057extends FileOutputFormat<K, V> {
058
059  /**
060   * Create a composite record writer that can write key/value data to different
061   * output files
062   * 
063   * @param fs
064   *          the file system to use
065   * @param job
066   *          the job conf for the job
067   * @param name
068   *          the leaf file name for the output file (such as part-00000")
069   * @param arg3
070   *          a progressable for reporting progress.
071   * @return a composite record writer
072   * @throws IOException
073   */
074  public RecordWriter<K, V> getRecordWriter(FileSystem fs, JobConf job,
075      String name, Progressable arg3) throws IOException {
076
077    final FileSystem myFS = fs;
078    final String myName = generateLeafFileName(name);
079    final JobConf myJob = job;
080    final Progressable myProgressable = arg3;
081
082    return new RecordWriter<K, V>() {
083
084      // a cache storing the record writers for different output files.
085      TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap<String, RecordWriter<K, V>>();
086
087      public void write(K key, V value) throws IOException {
088
089        // get the file name based on the key
090        String keyBasedPath = generateFileNameForKeyValue(key, value, myName);
091
092        // get the file name based on the input file name
093        String finalPath = getInputFileBasedOutputFileName(myJob, keyBasedPath);
094
095        // get the actual key
096        K actualKey = generateActualKey(key, value);
097        V actualValue = generateActualValue(key, value);
098
099        RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
100        if (rw == null) {
101          // if we don't have the record writer yet for the final path, create
102          // one
103          // and add it to the cache
104          rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);
105          this.recordWriters.put(finalPath, rw);
106        }
107        rw.write(actualKey, actualValue);
108      };
109
110      public void close(Reporter reporter) throws IOException {
111        Iterator<String> keys = this.recordWriters.keySet().iterator();
112        while (keys.hasNext()) {
113          RecordWriter<K, V> rw = this.recordWriters.get(keys.next());
114          rw.close(reporter);
115        }
116        this.recordWriters.clear();
117      };
118    };
119  }
120
121  /**
122   * Generate the leaf name for the output file name. The default behavior does
123   * not change the leaf file name (such as part-00000)
124   * 
125   * @param name
126   *          the leaf file name for the output file
127   * @return the given leaf file name
128   */
129  protected String generateLeafFileName(String name) {
130    return name;
131  }
132
133  /**
134   * Generate the file output file name based on the given key and the leaf file
135   * name. The default behavior is that the file name does not depend on the
136   * key.
137   * 
138   * @param key
139   *          the key of the output data
140   * @param name
141   *          the leaf file name
142   * @return generated file name
143   */
144  protected String generateFileNameForKeyValue(K key, V value, String name) {
145    return name;
146  }
147
148  /**
149   * Generate the actual key from the given key/value. The default behavior is that
150   * the actual key is equal to the given key
151   * 
152   * @param key
153   *          the key of the output data
154   * @param value
155   *          the value of the output data
156   * @return the actual key derived from the given key/value
157   */
158  protected K generateActualKey(K key, V value) {
159    return key;
160  }
161  
162  /**
163   * Generate the actual value from the given key and value. The default behavior is that
164   * the actual value is equal to the given value
165   * 
166   * @param key
167   *          the key of the output data
168   * @param value
169   *          the value of the output data
170   * @return the actual value derived from the given key/value
171   */
172  protected V generateActualValue(K key, V value) {
173    return value;
174  }
175  
176
177  /**
178   * Generate the outfile name based on a given anme and the input file name. If
179   * the {@link JobContext#MAP_INPUT_FILE} does not exists (i.e. this is not for a map only job),
180   * the given name is returned unchanged. If the config value for
181   * "num.of.trailing.legs.to.use" is not set, or set 0 or negative, the given
182   * name is returned unchanged. Otherwise, return a file name consisting of the
183   * N trailing legs of the input file name where N is the config value for
184   * "num.of.trailing.legs.to.use".
185   * 
186   * @param job
187   *          the job config
188   * @param name
189   *          the output file name
190   * @return the outfile name based on a given anme and the input file name.
191   */
192  protected String getInputFileBasedOutputFileName(JobConf job, String name) {
193    String infilepath = job.get(MRJobConfig.MAP_INPUT_FILE);
194    if (infilepath == null) {
195      // if the {@link JobContext#MAP_INPUT_FILE} does not exists,
196      // then return the given name
197      return name;
198    }
199    int numOfTrailingLegsToUse = job.getInt("mapred.outputformat.numOfTrailingLegs", 0);
200    if (numOfTrailingLegsToUse <= 0) {
201      return name;
202    }
203    Path infile = new Path(infilepath);
204    Path parent = infile.getParent();
205    String midName = infile.getName();
206    Path outPath = new Path(midName);
207    for (int i = 1; i < numOfTrailingLegsToUse; i++) {
208      if (parent == null) break;
209      midName = parent.getName();
210      if (midName.length() == 0) break;
211      parent = parent.getParent();
212      outPath = new Path(midName, outPath);
213    }
214    return outPath.toString();
215  }
216
217  /**
218   * 
219   * @param fs
220   *          the file system to use
221   * @param job
222   *          a job conf object
223   * @param name
224   *          the name of the file over which a record writer object will be
225   *          constructed
226   * @param arg3
227   *          a progressable object
228   * @return A RecordWriter object over the given file
229   * @throws IOException
230   */
231  abstract protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs,
232      JobConf job, String name, Progressable arg3) throws IOException;
233}