001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapred.lib; 020 021 import java.io.IOException; 022 023 import org.apache.hadoop.classification.InterfaceAudience; 024 import org.apache.hadoop.classification.InterfaceStability; 025 import org.apache.hadoop.fs.FileSystem; 026 import org.apache.hadoop.mapred.JobConf; 027 import org.apache.hadoop.mapred.OutputFormat; 028 import org.apache.hadoop.mapred.RecordWriter; 029 import org.apache.hadoop.mapred.Reporter; 030 import org.apache.hadoop.util.Progressable; 031 import org.apache.hadoop.util.ReflectionUtils; 032 033 /** 034 * A Convenience class that creates output lazily. 035 */ 036 @InterfaceAudience.Public 037 @InterfaceStability.Stable 038 public class LazyOutputFormat<K, V> extends FilterOutputFormat<K, V> { 039 /** 040 * Set the underlying output format for LazyOutputFormat. 041 * @param job the {@link JobConf} to modify 042 * @param theClass the underlying class 043 */ 044 @SuppressWarnings("unchecked") 045 public static void setOutputFormatClass(JobConf job, 046 Class<? extends OutputFormat> theClass) { 047 job.setOutputFormat(LazyOutputFormat.class); 048 job.setClass("mapreduce.output.lazyoutputformat.outputformat", theClass, OutputFormat.class); 049 } 050 051 @Override 052 public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, 053 String name, Progressable progress) throws IOException { 054 if (baseOut == null) { 055 getBaseOutputFormat(job); 056 } 057 return new LazyRecordWriter<K, V>(job, baseOut, name, progress); 058 } 059 060 @Override 061 public void checkOutputSpecs(FileSystem ignored, JobConf job) 062 throws IOException { 063 if (baseOut == null) { 064 getBaseOutputFormat(job); 065 } 066 super.checkOutputSpecs(ignored, job); 067 } 068 069 @SuppressWarnings("unchecked") 070 private void getBaseOutputFormat(JobConf job) throws IOException { 071 baseOut = ReflectionUtils.newInstance( 072 job.getClass("mapreduce.output.lazyoutputformat.outputformat", null, OutputFormat.class), 073 job); 074 if (baseOut == null) { 075 throw new IOException("Ouput format not set for LazyOutputFormat"); 076 } 077 } 078 079 /** 080 * <code>LazyRecordWriter</code> is a convenience 081 * class that works with LazyOutputFormat. 082 */ 083 084 private static class LazyRecordWriter<K,V> extends FilterRecordWriter<K,V> { 085 086 final OutputFormat of; 087 final String name; 088 final Progressable progress; 089 final JobConf job; 090 091 public LazyRecordWriter(JobConf job, OutputFormat of, String name, 092 Progressable progress) throws IOException { 093 this.of = of; 094 this.job = job; 095 this.name = name; 096 this.progress = progress; 097 } 098 099 @Override 100 public void close(Reporter reporter) throws IOException { 101 if (rawWriter != null) { 102 rawWriter.close(reporter); 103 } 104 } 105 106 @Override 107 public void write(K key, V value) throws IOException { 108 if (rawWriter == null) { 109 createRecordWriter(); 110 } 111 super.write(key, value); 112 } 113 114 @SuppressWarnings("unchecked") 115 private void createRecordWriter() throws IOException { 116 FileSystem fs = FileSystem.get(job); 117 rawWriter = of.getRecordWriter(fs, job, name, progress); 118 } 119 } 120 }