001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapred; 020 021import java.io.DataOutputStream; 022import java.io.IOException; 023import java.nio.charset.StandardCharsets; 024 025import org.apache.hadoop.classification.InterfaceAudience; 026import org.apache.hadoop.classification.InterfaceStability; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.fs.FSDataOutputStream; 030 031import org.apache.hadoop.io.NullWritable; 032import org.apache.hadoop.io.Text; 033import org.apache.hadoop.io.compress.CompressionCodec; 034import org.apache.hadoop.io.compress.GzipCodec; 035import org.apache.hadoop.util.*; 036 037/** 038 * An {@link OutputFormat} that writes plain text files. 039 */ 040@InterfaceAudience.Public 041@InterfaceStability.Stable 042public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> { 043 044 protected static class LineRecordWriter<K, V> 045 implements RecordWriter<K, V> { 046 private static final byte[] NEWLINE = 047 "\n".getBytes(StandardCharsets.UTF_8); 048 049 protected DataOutputStream out; 050 private final byte[] keyValueSeparator; 051 052 public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { 053 this.out = out; 054 this.keyValueSeparator = 055 keyValueSeparator.getBytes(StandardCharsets.UTF_8); 056 } 057 058 public LineRecordWriter(DataOutputStream out) { 059 this(out, "\t"); 060 } 061 062 /** 063 * Write the object to the byte stream, handling Text as a special 064 * case. 065 * @param o the object to print 066 * @throws IOException if the write throws, we pass it on 067 */ 068 private void writeObject(Object o) throws IOException { 069 if (o instanceof Text) { 070 Text to = (Text) o; 071 out.write(to.getBytes(), 0, to.getLength()); 072 } else { 073 out.write(o.toString().getBytes(StandardCharsets.UTF_8)); 074 } 075 } 076 077 public synchronized void write(K key, V value) 078 throws IOException { 079 080 boolean nullKey = key == null || key instanceof NullWritable; 081 boolean nullValue = value == null || value instanceof NullWritable; 082 if (nullKey && nullValue) { 083 return; 084 } 085 if (!nullKey) { 086 writeObject(key); 087 } 088 if (!(nullKey || nullValue)) { 089 out.write(keyValueSeparator); 090 } 091 if (!nullValue) { 092 writeObject(value); 093 } 094 out.write(NEWLINE); 095 } 096 097 public synchronized void close(Reporter reporter) throws IOException { 098 out.close(); 099 } 100 } 101 102 public RecordWriter<K, V> getRecordWriter(FileSystem ignored, 103 JobConf job, 104 String name, 105 Progressable progress) 106 throws IOException { 107 boolean isCompressed = getCompressOutput(job); 108 String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", 109 "\t"); 110 if (!isCompressed) { 111 Path file = FileOutputFormat.getTaskOutputPath(job, name); 112 FileSystem fs = file.getFileSystem(job); 113 FSDataOutputStream fileOut = fs.create(file, progress); 114 return new LineRecordWriter<K, V>(fileOut, keyValueSeparator); 115 } else { 116 Class<? extends CompressionCodec> codecClass = 117 getOutputCompressorClass(job, GzipCodec.class); 118 // create the named codec 119 CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job); 120 // build the filename including the extension 121 Path file = 122 FileOutputFormat.getTaskOutputPath(job, 123 name + codec.getDefaultExtension()); 124 FileSystem fs = file.getFileSystem(job); 125 FSDataOutputStream fileOut = fs.create(file, progress); 126 return new LineRecordWriter<K, V>(new DataOutputStream 127 (codec.createOutputStream(fileOut)), 128 keyValueSeparator); 129 } 130 } 131} 132