001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.output;
020
021import java.io.DataOutputStream;
022import java.io.IOException;
023import java.io.UnsupportedEncodingException;
024
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceStability;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.fs.FSDataOutputStream;
031
032import org.apache.hadoop.io.NullWritable;
033import org.apache.hadoop.io.Text;
034import org.apache.hadoop.io.compress.CompressionCodec;
035import org.apache.hadoop.io.compress.GzipCodec;
036import org.apache.hadoop.mapreduce.OutputFormat;
037import org.apache.hadoop.mapreduce.RecordWriter;
038import org.apache.hadoop.mapreduce.TaskAttemptContext;
039import org.apache.hadoop.util.*;
040
041/** An {@link OutputFormat} that writes plain text files. */
042@InterfaceAudience.Public
043@InterfaceStability.Stable
044public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
045  public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
046  protected static class LineRecordWriter<K, V>
047    extends RecordWriter<K, V> {
048    private static final String utf8 = "UTF-8";
049    private static final byte[] newline;
050    static {
051      try {
052        newline = "\n".getBytes(utf8);
053      } catch (UnsupportedEncodingException uee) {
054        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
055      }
056    }
057
058    protected DataOutputStream out;
059    private final byte[] keyValueSeparator;
060
061    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
062      this.out = out;
063      try {
064        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
065      } catch (UnsupportedEncodingException uee) {
066        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
067      }
068    }
069
070    public LineRecordWriter(DataOutputStream out) {
071      this(out, "\t");
072    }
073
074    /**
075     * Write the object to the byte stream, handling Text as a special
076     * case.
077     * @param o the object to print
078     * @throws IOException if the write throws, we pass it on
079     */
080    private void writeObject(Object o) throws IOException {
081      if (o instanceof Text) {
082        Text to = (Text) o;
083        out.write(to.getBytes(), 0, to.getLength());
084      } else {
085        out.write(o.toString().getBytes(utf8));
086      }
087    }
088
089    public synchronized void write(K key, V value)
090      throws IOException {
091
092      boolean nullKey = key == null || key instanceof NullWritable;
093      boolean nullValue = value == null || value instanceof NullWritable;
094      if (nullKey && nullValue) {
095        return;
096      }
097      if (!nullKey) {
098        writeObject(key);
099      }
100      if (!(nullKey || nullValue)) {
101        out.write(keyValueSeparator);
102      }
103      if (!nullValue) {
104        writeObject(value);
105      }
106      out.write(newline);
107    }
108
109    public synchronized 
110    void close(TaskAttemptContext context) throws IOException {
111      out.close();
112    }
113  }
114
115  public RecordWriter<K, V> 
116         getRecordWriter(TaskAttemptContext job
117                         ) throws IOException, InterruptedException {
118    Configuration conf = job.getConfiguration();
119    boolean isCompressed = getCompressOutput(job);
120    String keyValueSeparator= conf.get(SEPERATOR, "\t");
121    CompressionCodec codec = null;
122    String extension = "";
123    if (isCompressed) {
124      Class<? extends CompressionCodec> codecClass = 
125        getOutputCompressorClass(job, GzipCodec.class);
126      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
127      extension = codec.getDefaultExtension();
128    }
129    Path file = getDefaultWorkFile(job, extension);
130    FileSystem fs = file.getFileSystem(conf);
131    if (!isCompressed) {
132      FSDataOutputStream fileOut = fs.create(file, false);
133      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
134    } else {
135      FSDataOutputStream fileOut = fs.create(file, false);
136      return new LineRecordWriter<K, V>(new DataOutputStream
137                                        (codec.createOutputStream(fileOut)),
138                                        keyValueSeparator);
139    }
140  }
141}
142