001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred.lib;
020
021 import java.io.IOException;
022
023 import org.apache.hadoop.classification.InterfaceAudience;
024 import org.apache.hadoop.classification.InterfaceStability;
025 import org.apache.hadoop.fs.FileSystem;
026 import org.apache.hadoop.mapred.JobConf;
027 import org.apache.hadoop.mapred.OutputFormat;
028 import org.apache.hadoop.mapred.RecordWriter;
029 import org.apache.hadoop.mapred.Reporter;
030 import org.apache.hadoop.util.Progressable;
031 import org.apache.hadoop.util.ReflectionUtils;
032
033 /**
034 * A Convenience class that creates output lazily.
035 */
036 @InterfaceAudience.Public
037 @InterfaceStability.Stable
038 public class LazyOutputFormat<K, V> extends FilterOutputFormat<K, V> {
039 /**
040 * Set the underlying output format for LazyOutputFormat.
041 * @param job the {@link JobConf} to modify
042 * @param theClass the underlying class
043 */
044 @SuppressWarnings("unchecked")
045 public static void setOutputFormatClass(JobConf job,
046 Class<? extends OutputFormat> theClass) {
047 job.setOutputFormat(LazyOutputFormat.class);
048 job.setClass("mapreduce.output.lazyoutputformat.outputformat", theClass, OutputFormat.class);
049 }
050
051 @Override
052 public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
053 String name, Progressable progress) throws IOException {
054 if (baseOut == null) {
055 getBaseOutputFormat(job);
056 }
057 return new LazyRecordWriter<K, V>(job, baseOut, name, progress);
058 }
059
060 @Override
061 public void checkOutputSpecs(FileSystem ignored, JobConf job)
062 throws IOException {
063 if (baseOut == null) {
064 getBaseOutputFormat(job);
065 }
066 super.checkOutputSpecs(ignored, job);
067 }
068
069 @SuppressWarnings("unchecked")
070 private void getBaseOutputFormat(JobConf job) throws IOException {
071 baseOut = ReflectionUtils.newInstance(
072 job.getClass("mapreduce.output.lazyoutputformat.outputformat", null, OutputFormat.class),
073 job);
074 if (baseOut == null) {
075 throw new IOException("Ouput format not set for LazyOutputFormat");
076 }
077 }
078
079 /**
080 * <code>LazyRecordWriter</code> is a convenience
081 * class that works with LazyOutputFormat.
082 */
083
084 private static class LazyRecordWriter<K,V> extends FilterRecordWriter<K,V> {
085
086 final OutputFormat of;
087 final String name;
088 final Progressable progress;
089 final JobConf job;
090
091 public LazyRecordWriter(JobConf job, OutputFormat of, String name,
092 Progressable progress) throws IOException {
093 this.of = of;
094 this.job = job;
095 this.name = name;
096 this.progress = progress;
097 }
098
099 @Override
100 public void close(Reporter reporter) throws IOException {
101 if (rawWriter != null) {
102 rawWriter.close(reporter);
103 }
104 }
105
106 @Override
107 public void write(K key, V value) throws IOException {
108 if (rawWriter == null) {
109 createRecordWriter();
110 }
111 super.write(key, value);
112 }
113
114 @SuppressWarnings("unchecked")
115 private void createRecordWriter() throws IOException {
116 FileSystem fs = FileSystem.get(job);
117 rawWriter = of.getRecordWriter(fs, job, name, progress);
118 }
119 }
120 }