001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapred; 020 021 import java.io.DataOutputStream; 022 import java.io.IOException; 023 import java.io.UnsupportedEncodingException; 024 025 import org.apache.hadoop.classification.InterfaceAudience; 026 import org.apache.hadoop.classification.InterfaceStability; 027 import org.apache.hadoop.fs.FileSystem; 028 import org.apache.hadoop.fs.Path; 029 import org.apache.hadoop.fs.FSDataOutputStream; 030 031 import org.apache.hadoop.io.NullWritable; 032 import org.apache.hadoop.io.Text; 033 import org.apache.hadoop.io.compress.CompressionCodec; 034 import org.apache.hadoop.io.compress.GzipCodec; 035 import org.apache.hadoop.util.*; 036 037 /** 038 * An {@link OutputFormat} that writes plain text files. 039 */ 040 @InterfaceAudience.Public 041 @InterfaceStability.Stable 042 public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> { 043 044 protected static class LineRecordWriter<K, V> 045 implements RecordWriter<K, V> { 046 private static final String utf8 = "UTF-8"; 047 private static final byte[] newline; 048 static { 049 try { 050 newline = "\n".getBytes(utf8); 051 } catch (UnsupportedEncodingException uee) { 052 throw new IllegalArgumentException("can't find " + utf8 + " encoding"); 053 } 054 } 055 056 protected DataOutputStream out; 057 private final byte[] keyValueSeparator; 058 059 public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { 060 this.out = out; 061 try { 062 this.keyValueSeparator = keyValueSeparator.getBytes(utf8); 063 } catch (UnsupportedEncodingException uee) { 064 throw new IllegalArgumentException("can't find " + utf8 + " encoding"); 065 } 066 } 067 068 public LineRecordWriter(DataOutputStream out) { 069 this(out, "\t"); 070 } 071 072 /** 073 * Write the object to the byte stream, handling Text as a special 074 * case. 075 * @param o the object to print 076 * @throws IOException if the write throws, we pass it on 077 */ 078 private void writeObject(Object o) throws IOException { 079 if (o instanceof Text) { 080 Text to = (Text) o; 081 out.write(to.getBytes(), 0, to.getLength()); 082 } else { 083 out.write(o.toString().getBytes(utf8)); 084 } 085 } 086 087 public synchronized void write(K key, V value) 088 throws IOException { 089 090 boolean nullKey = key == null || key instanceof NullWritable; 091 boolean nullValue = value == null || value instanceof NullWritable; 092 if (nullKey && nullValue) { 093 return; 094 } 095 if (!nullKey) { 096 writeObject(key); 097 } 098 if (!(nullKey || nullValue)) { 099 out.write(keyValueSeparator); 100 } 101 if (!nullValue) { 102 writeObject(value); 103 } 104 out.write(newline); 105 } 106 107 public synchronized void close(Reporter reporter) throws IOException { 108 out.close(); 109 } 110 } 111 112 public RecordWriter<K, V> getRecordWriter(FileSystem ignored, 113 JobConf job, 114 String name, 115 Progressable progress) 116 throws IOException { 117 boolean isCompressed = getCompressOutput(job); 118 String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", 119 "\t"); 120 if (!isCompressed) { 121 Path file = FileOutputFormat.getTaskOutputPath(job, name); 122 FileSystem fs = file.getFileSystem(job); 123 FSDataOutputStream fileOut = fs.create(file, progress); 124 return new LineRecordWriter<K, V>(fileOut, keyValueSeparator); 125 } else { 126 Class<? extends CompressionCodec> codecClass = 127 getOutputCompressorClass(job, GzipCodec.class); 128 // create the named codec 129 CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job); 130 // build the filename including the extension 131 Path file = 132 FileOutputFormat.getTaskOutputPath(job, 133 name + codec.getDefaultExtension()); 134 FileSystem fs = file.getFileSystem(job); 135 FSDataOutputStream fileOut = fs.create(file, progress); 136 return new LineRecordWriter<K, V>(new DataOutputStream 137 (codec.createOutputStream(fileOut)), 138 keyValueSeparator); 139 } 140 } 141 } 142