001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.output;
020
021 import java.io.DataOutputStream;
022 import java.io.IOException;
023 import java.io.UnsupportedEncodingException;
024
025 import org.apache.hadoop.classification.InterfaceAudience;
026 import org.apache.hadoop.classification.InterfaceStability;
027 import org.apache.hadoop.conf.Configuration;
028 import org.apache.hadoop.fs.FileSystem;
029 import org.apache.hadoop.fs.Path;
030 import org.apache.hadoop.fs.FSDataOutputStream;
031
032 import org.apache.hadoop.io.NullWritable;
033 import org.apache.hadoop.io.Text;
034 import org.apache.hadoop.io.compress.CompressionCodec;
035 import org.apache.hadoop.io.compress.GzipCodec;
036 import org.apache.hadoop.mapreduce.OutputFormat;
037 import org.apache.hadoop.mapreduce.RecordWriter;
038 import org.apache.hadoop.mapreduce.TaskAttemptContext;
039 import org.apache.hadoop.util.*;
040
041 /** An {@link OutputFormat} that writes plain text files. */
042 @InterfaceAudience.Public
043 @InterfaceStability.Stable
044 public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
045 public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
046 protected static class LineRecordWriter<K, V>
047 extends RecordWriter<K, V> {
048 private static final String utf8 = "UTF-8";
049 private static final byte[] newline;
050 static {
051 try {
052 newline = "\n".getBytes(utf8);
053 } catch (UnsupportedEncodingException uee) {
054 throw new IllegalArgumentException("can't find " + utf8 + " encoding");
055 }
056 }
057
058 protected DataOutputStream out;
059 private final byte[] keyValueSeparator;
060
061 public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
062 this.out = out;
063 try {
064 this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
065 } catch (UnsupportedEncodingException uee) {
066 throw new IllegalArgumentException("can't find " + utf8 + " encoding");
067 }
068 }
069
070 public LineRecordWriter(DataOutputStream out) {
071 this(out, "\t");
072 }
073
074 /**
075 * Write the object to the byte stream, handling Text as a special
076 * case.
077 * @param o the object to print
078 * @throws IOException if the write throws, we pass it on
079 */
080 private void writeObject(Object o) throws IOException {
081 if (o instanceof Text) {
082 Text to = (Text) o;
083 out.write(to.getBytes(), 0, to.getLength());
084 } else {
085 out.write(o.toString().getBytes(utf8));
086 }
087 }
088
089 public synchronized void write(K key, V value)
090 throws IOException {
091
092 boolean nullKey = key == null || key instanceof NullWritable;
093 boolean nullValue = value == null || value instanceof NullWritable;
094 if (nullKey && nullValue) {
095 return;
096 }
097 if (!nullKey) {
098 writeObject(key);
099 }
100 if (!(nullKey || nullValue)) {
101 out.write(keyValueSeparator);
102 }
103 if (!nullValue) {
104 writeObject(value);
105 }
106 out.write(newline);
107 }
108
109 public synchronized
110 void close(TaskAttemptContext context) throws IOException {
111 out.close();
112 }
113 }
114
115 public RecordWriter<K, V>
116 getRecordWriter(TaskAttemptContext job
117 ) throws IOException, InterruptedException {
118 Configuration conf = job.getConfiguration();
119 boolean isCompressed = getCompressOutput(job);
120 String keyValueSeparator= conf.get(SEPERATOR, "\t");
121 CompressionCodec codec = null;
122 String extension = "";
123 if (isCompressed) {
124 Class<? extends CompressionCodec> codecClass =
125 getOutputCompressorClass(job, GzipCodec.class);
126 codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
127 extension = codec.getDefaultExtension();
128 }
129 Path file = getDefaultWorkFile(job, extension);
130 FileSystem fs = file.getFileSystem(conf);
131 if (!isCompressed) {
132 FSDataOutputStream fileOut = fs.create(file, false);
133 return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
134 } else {
135 FSDataOutputStream fileOut = fs.create(file, false);
136 return new LineRecordWriter<K, V>(new DataOutputStream
137 (codec.createOutputStream(fileOut)),
138 keyValueSeparator);
139 }
140 }
141 }
142