/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.output;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
/**
 * An {@link OutputFormat} that writes plain text files. Each record is
 * emitted as a single line: the key and value are written in their string
 * (or raw {@link Text}) form, separated by a configurable separator string
 * (a tab by default) and terminated by a newline. Null or
 * {@link NullWritable} keys and values are omitted, and the output can be
 * compressed with any configured {@link CompressionCodec}.
 *
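 * <p>A minimal driver sketch (illustrative only; the output path and
 * separator below are placeholders, not defaults):
 * <pre>{@code
 * Configuration conf = new Configuration();
 * conf.set(TextOutputFormat.SEPARATOR, ",");   // default separator is "\t"
 * Job job = Job.getInstance(conf, "text-output-example");
 * job.setOutputFormatClass(TextOutputFormat.class);
 * FileOutputFormat.setOutputPath(job, new Path("/tmp/text-output-example"));
 * // Optional: gzip-compress the generated part files.
 * FileOutputFormat.setCompressOutput(job, true);
 * FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
 * }</pre>
 */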
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
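  /**
   * Configuration key for the string used to separate the key from the value
   * on each output line; the default separator is a tab character.
   */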
  public static String SEPARATOR = "mapreduce.output.textoutputformat.separator";
  /**
   * @deprecated Use {@link #SEPARATOR} instead; retained for compatibility
   * with the original misspelled name.
   */
  @Deprecated
  public static String SEPERATOR = SEPARATOR;
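
  /**
   * A {@link RecordWriter} that writes each key/value pair as a single
   * UTF-8 encoded, newline-terminated line on the supplied
   * {@link DataOutputStream}.
   */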
  protected static class LineRecordWriter<K, V>
    extends RecordWriter<K, V> {
    private static final byte[] NEWLINE =
      "\n".getBytes(StandardCharsets.UTF_8);

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
      this.out = out;
      this.keyValueSeparator =
        keyValueSeparator.getBytes(StandardCharsets.UTF_8);
    }

    public LineRecordWriter(DataOutputStream out) {
      this(out, "\t");
    }

    /**
     * Write the object to the byte stream, handling {@link Text} as a
     * special case so its backing bytes are written directly without an
     * extra string conversion.
     * @param o the object to write
     * @throws IOException if writing to the stream fails
     */
    private void writeObject(Object o) throws IOException {
      if (o instanceof Text) {
        Text to = (Text) o;
        out.write(to.getBytes(), 0, to.getLength());
      } else {
        out.write(o.toString().getBytes(StandardCharsets.UTF_8));
      }
    }

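    /**
     * Write a key/value pair as one line. A key or value that is null or a
     * {@link NullWritable} is skipped; if both are skipped nothing is
     * written, and the separator is only emitted when both are present.
     * @param key the key to write, possibly null
     * @param value the value to write, possibly null
     * @throws IOException if writing to the stream fails
     */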
    public synchronized void write(K key, V value) throws IOException {
      boolean nullKey = key == null || key instanceof NullWritable;
      boolean nullValue = value == null || value instanceof NullWritable;
      if (nullKey && nullValue) {
        return;
      }
      if (!nullKey) {
        writeObject(key);
      }
      if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);
      }
      if (!nullValue) {
        writeObject(value);
      }
      out.write(NEWLINE);
    }

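    /** Close the underlying output stream. */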
    public synchronized void close(TaskAttemptContext context)
        throws IOException {
      out.close();
    }
  }

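  /**
   * Create a {@link LineRecordWriter} for this task attempt, honoring the
   * configured separator and, when enabled, the configured output
   * compression codec.
   */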
  @Override
  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
      throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();
    boolean isCompressed = getCompressOutput(job);
    String keyValueSeparator = conf.get(SEPARATOR, "\t");
    CompressionCodec codec = null;
    String extension = "";
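    // When compression is requested, resolve the configured codec class
    // (gzip by default) and use its default file-name extension.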
    if (isCompressed) {
      Class<? extends CompressionCodec> codecClass =
        getOutputCompressorClass(job, GzipCodec.class);
      codec = ReflectionUtils.newInstance(codecClass, conf);
      extension = codec.getDefaultExtension();
    }
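    // Write to the task's attempt-scoped work file; the output committer
    // promotes it to the final output directory when the task commits.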
    Path file = getDefaultWorkFile(job, extension);
    FileSystem fs = file.getFileSystem(conf);
    FSDataOutputStream fileOut = fs.create(file, false);
    if (!isCompressed) {
      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
    } else {
      return new LineRecordWriter<K, V>(
          new DataOutputStream(codec.createOutputStream(fileOut)),
          keyValueSeparator);
    }
  }
}