/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.output;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.*;

/**
 * An {@link OutputFormat} that writes plain text files.
 *
 * <p>Each record becomes one line: the key, a configurable separator, the
 * value, and a trailing newline. Keys and values that are {@code null} or
 * {@link NullWritable} are omitted.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
  /**
   * Configuration key holding the string written between a key and its
   * value. The identifier is misspelled, but it is public and therefore
   * kept unchanged so that existing callers continue to compile.
   */
  public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";

  /**
   * A {@link RecordWriter} that emits one UTF-8 encoded line per record to
   * an underlying {@link DataOutputStream}.
   */
  protected static class LineRecordWriter<K, V>
    extends RecordWriter<K, V> {
    private static final String UTF8 = "UTF-8";
    private static final byte[] NEWLINE = encodeUtf8("\n");

    /** Stream that receives the encoded records. */
    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    /**
     * Creates a writer that separates keys from values with the given string.
     *
     * @param out the stream to write records to
     * @param keyValueSeparator text written between a key and its value
     */
    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
      this.out = out;
      this.keyValueSeparator = encodeUtf8(keyValueSeparator);
    }

    /**
     * Creates a writer that separates keys from values with a tab.
     *
     * @param out the stream to write records to
     */
    public LineRecordWriter(DataOutputStream out) {
      this(out, "\t");
    }

    /**
     * Encodes a string as UTF-8 bytes.
     *
     * @param s the text to encode
     * @return the UTF-8 bytes of {@code s}
     * @throws IllegalArgumentException if the UTF-8 charset is unavailable;
     *         the original exception is attached as the cause
     */
    private static byte[] encodeUtf8(String s) {
      try {
        return s.getBytes(UTF8);
      } catch (UnsupportedEncodingException uee) {
        // Keep the cause so the underlying failure is not lost.
        throw new IllegalArgumentException("can't find " + UTF8 + " encoding",
                                           uee);
      }
    }

    /**
     * Write the object to the byte stream, handling Text as a special
     * case.
     *
     * @param o the object to print
     * @throws IOException if the write throws, we pass it on
     */
    private void writeObject(Object o) throws IOException {
      if (o instanceof Text) {
        // Text exposes its backing bytes; only the first getLength() bytes
        // are written.
        Text to = (Text) o;
        out.write(to.getBytes(), 0, to.getLength());
      } else {
        out.write(o.toString().getBytes(UTF8));
      }
    }

    /**
     * Writes one record as a single line.
     *
     * <p>A key or value that is {@code null} or a {@link NullWritable} is
     * treated as absent. If both are absent nothing is written at all. The
     * separator is written only when both are present. A newline always
     * follows whatever was written.
     *
     * @param key the record key, possibly absent
     * @param value the record value, possibly absent
     * @throws IOException if writing to the underlying stream fails
     */
    @Override
    public synchronized void write(K key, V value)
      throws IOException {

      boolean nullKey = key == null || key instanceof NullWritable;
      boolean nullValue = value == null || value instanceof NullWritable;
      if (nullKey && nullValue) {
        return;
      }
      if (!nullKey) {
        writeObject(key);
      }
      if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);
      }
      if (!nullValue) {
        writeObject(value);
      }
      out.write(NEWLINE);
    }

    /**
     * Closes the underlying stream.
     *
     * @param context the task attempt context (not used here)
     * @throws IOException if closing the stream fails
     */
    @Override
    public synchronized
    void close(TaskAttemptContext context) throws IOException {
      out.close();
    }
  }

  /**
   * Creates the {@link RecordWriter} for a task attempt.
   *
   * <p>The key/value separator is read from the {@link #SEPERATOR}
   * configuration key and defaults to a tab. When output compression is
   * enabled for the job, the configured codec (defaulting to
   * {@link GzipCodec}) is instantiated, its default extension is used for
   * the work file name, and the file stream is wrapped in the codec's
   * output stream.
   *
   * @param job the task attempt whose output is being written
   * @return a writer that emits one text line per record
   * @throws IOException if the output file or codec stream cannot be created
   * @throws InterruptedException declared for compatibility with the
   *         overridden method
   */
  @Override
  public RecordWriter<K, V>
         getRecordWriter(TaskAttemptContext job
                         ) throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();
    boolean isCompressed = getCompressOutput(job);
    String keyValueSeparator = conf.get(SEPERATOR, "\t");
    CompressionCodec codec = null;
    String extension = "";
    if (isCompressed) {
      Class<? extends CompressionCodec> codecClass =
        getOutputCompressorClass(job, GzipCodec.class);
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
      extension = codec.getDefaultExtension();
    }
    Path file = getDefaultWorkFile(job, extension);
    FileSystem fs = file.getFileSystem(conf);
    FSDataOutputStream fileOut = fs.create(file, false);

    // From here on the file stream is open. If wrapping it fails, it must
    // be closed before the exception propagates, otherwise it leaks.
    boolean handedOff = false;
    try {
      DataOutputStream recordStream = fileOut;
      if (isCompressed) {
        recordStream = new DataOutputStream(codec.createOutputStream(fileOut));
      }
      RecordWriter<K, V> writer =
        new LineRecordWriter<K, V>(recordStream, keyValueSeparator);
      handedOff = true;
      return writer;
    } finally {
      if (!handedOff) {
        try {
          fileOut.close();
        } catch (IOException ignored) {
          // Best-effort cleanup; the failure that got us here is the one
          // the caller needs to see.
        }
      }
    }
  }
}