/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.util.Progressable;

/**
 * This abstract class extends the FileOutputFormat, allowing the output data
 * to be written to different output files. There are three basic use cases
 * for this class.
 *
 * Case one: This class is used for a map reduce job with at least one reducer.
 * The reducer wants to write data to different files depending on the actual
 * keys. It is assumed that a key (or value) encodes both the actual key
 * (value) and the desired location for the actual key (value).
 *
 * Case two: This class is used for a map only job. The job wants to use an
 * output file name that is either a part of the input file name, or some
 * derivation of it.
 *
 * Case three: This class is used for a map only job. The job wants to use an
 * output file name that depends on both the keys and the input file name.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class MultipleOutputFormat<K, V>
    extends FileOutputFormat<K, V> {

  /**
   * Create a composite record writer that can write key/value data to
   * different output files.
   *
   * @param fs
   *          the file system to use
   * @param job
   *          the job conf for the job
   * @param name
   *          the leaf file name for the output file (such as "part-00000")
   * @param arg3
   *          a progressable for reporting progress.
   * @return a composite record writer
   * @throws IOException
   */
  public RecordWriter<K, V> getRecordWriter(FileSystem fs, JobConf job,
      String name, Progressable arg3) throws IOException {

    final FileSystem myFS = fs;
    final String myName = generateLeafFileName(name);
    final JobConf myJob = job;
    final Progressable myProgressable = arg3;

    return new RecordWriter<K, V>() {

      // a cache storing the record writers for different output files.
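      // Writers are created lazily in write(), one per distinct output path,
      // and are all closed together when close() is called.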
      TreeMap<String, RecordWriter<K, V>> recordWriters =
          new TreeMap<String, RecordWriter<K, V>>();

      public void write(K key, V value) throws IOException {

        // get the file name based on the key
        String keyBasedPath = generateFileNameForKeyValue(key, value, myName);

        // get the file name based on the input file name
        String finalPath = getInputFileBasedOutputFileName(myJob, keyBasedPath);

        // get the actual key and value
        K actualKey = generateActualKey(key, value);
        V actualValue = generateActualValue(key, value);

        RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
        if (rw == null) {
          // if we don't have a record writer for the final path yet,
          // create one and add it to the cache
          rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);
          this.recordWriters.put(finalPath, rw);
        }
        rw.write(actualKey, actualValue);
      }

      public void close(Reporter reporter) throws IOException {
        Iterator<String> keys = this.recordWriters.keySet().iterator();
        while (keys.hasNext()) {
          RecordWriter<K, V> rw = this.recordWriters.get(keys.next());
          rw.close(reporter);
        }
        this.recordWriters.clear();
      }
    };
  }

  /**
   * Generate the leaf name for the output file name. The default behavior does
   * not change the leaf file name (such as part-00000).
   *
   * @param name
   *          the leaf file name for the output file
   * @return the given leaf file name
   */
  protected String generateLeafFileName(String name) {
    return name;
  }

  /**
   * Generate the output file name based on the given key/value and the leaf
   * file name. The default behavior is that the file name does not depend on
   * the key.
   *
   * @param key
   *          the key of the output data
   * @param value
   *          the value of the output data
   * @param name
   *          the leaf file name
   * @return generated file name
   */
  protected String generateFileNameForKeyValue(K key, V value, String name) {
    return name;
  }

  /**
   * Generate the actual key from the given key/value. The default behavior is
   * that the actual key is equal to the given key.
   *
   * @param key
   *          the key of the output data
   * @param value
   *          the value of the output data
   * @return the actual key derived from the given key/value
   */
  protected K generateActualKey(K key, V value) {
    return key;
  }

  /**
   * Generate the actual value from the given key and value. The default
   * behavior is that the actual value is equal to the given value.
   *
   * @param key
   *          the key of the output data
   * @param value
   *          the value of the output data
   * @return the actual value derived from the given key/value
   */
  protected V generateActualValue(K key, V value) {
    return value;
  }

  /**
   * Generate the output file name based on the given name and the input file
   * name. If {@link JobContext#MAP_INPUT_FILE} does not exist (i.e. this is
   * not a map only job), the given name is returned unchanged. If the config
   * value for "mapred.outputformat.numOfTrailingLegs" is not set, or is set to
   * 0 or a negative value, the given name is returned unchanged. Otherwise,
   * return a file name consisting of the N trailing legs of the input file
   * name, where N is the config value for
   * "mapred.outputformat.numOfTrailingLegs".
   *
   * @param job
   *          the job config
   * @param name
   *          the output file name
   * @return the output file name based on the given name and the input file
   *         name.
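   *         For example (hypothetical values), if
   *         {@link JobContext#MAP_INPUT_FILE} were /data/2024/01/part-00000
   *         and "mapred.outputformat.numOfTrailingLegs" were set to 2, the
   *         returned name would be "01/part-00000" and the given name would
   *         be ignored.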
   */
  protected String getInputFileBasedOutputFileName(JobConf job, String name) {
    String infilepath = job.get(MRJobConfig.MAP_INPUT_FILE);
    if (infilepath == null) {
      // if the {@link JobContext#MAP_INPUT_FILE} does not exist,
      // then return the given name
      return name;
    }
    int numOfTrailingLegsToUse =
        job.getInt("mapred.outputformat.numOfTrailingLegs", 0);
    if (numOfTrailingLegsToUse <= 0) {
      return name;
    }
    Path infile = new Path(infilepath);
    Path parent = infile.getParent();
    String midName = infile.getName();
    Path outPath = new Path(midName);
    for (int i = 1; i < numOfTrailingLegsToUse; i++) {
      if (parent == null) break;
      midName = parent.getName();
      if (midName.length() == 0) break;
      parent = parent.getParent();
      outPath = new Path(midName, outPath);
    }
    return outPath.toString();
  }

  /**
   * Construct a RecordWriter that writes to the given file.
   *
   * @param fs
   *          the file system to use
   * @param job
   *          a job conf object
   * @param name
   *          the name of the file over which a record writer object will be
   *          constructed
   * @param arg3
   *          a progressable object
   * @return A RecordWriter object over the given file
   * @throws IOException
   */
  abstract protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs,
      JobConf job, String name, Progressable arg3) throws IOException;
}
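
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the original class: a minimal concrete
// subclass for "case one" above, routing each record to a file named after
// its key. The class name KeyPartitionedTextOutputFormat is hypothetical, and
// the choice of TextOutputFormat as the underlying writer is an assumption
// made for this example (similar in spirit to MultipleTextOutputFormat in
// this package).
// ---------------------------------------------------------------------------
class KeyPartitionedTextOutputFormat<K, V> extends MultipleOutputFormat<K, V> {

  // plain TextOutputFormat instance that does the actual writing
  private final org.apache.hadoop.mapred.TextOutputFormat<K, V> textOutput =
      new org.apache.hadoop.mapred.TextOutputFormat<K, V>();

  @Override
  protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
      String name, Progressable progress) throws IOException {
    // delegate the per-file writing to TextOutputFormat
    return textOutput.getRecordWriter(fs, job, name, progress);
  }

  @Override
  protected String generateFileNameForKeyValue(K key, V value, String name) {
    // write each record under a directory named after its key,
    // e.g. "<key>/part-00000"
    return new Path(key.toString(), name).toString();
  }
}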