/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * An {@link OutputFormat} that writes keys and values to
 * {@link SequenceFile}s in binary (raw) format.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsBinaryOutputFormat
    extends SequenceFileOutputFormat<BytesWritable, BytesWritable> {

  /**
   * Inner class used to wrap a {@link BytesWritable} value so that its raw
   * bytes can be passed to {@link SequenceFile.Writer#appendRaw}.
   */
  static protected class WritableValueBytes extends org.apache.hadoop.mapreduce
      .lib.output.SequenceFileAsBinaryOutputFormat.WritableValueBytes {
    public WritableValueBytes() {
      super();
    }

    public WritableValueBytes(BytesWritable value) {
      super(value);
    }
  }

  /**
   * Set the key class for the {@link SequenceFile}.
   * <p>This allows the user to specify the key class to be different
   * from the actual class ({@link BytesWritable}) used for writing.</p>
   *
   * @param conf the {@link JobConf} to modify
   * @param theClass the SequenceFile output key class.
   */
  static public void setSequenceFileOutputKeyClass(JobConf conf,
      Class<?> theClass) {
    conf.setClass(org.apache.hadoop.mapreduce.lib.output.
        SequenceFileAsBinaryOutputFormat.KEY_CLASS, theClass, Object.class);
  }

  /**
   * Set the value class for the {@link SequenceFile}.
   * <p>This allows the user to specify the value class to be different
   * from the actual class ({@link BytesWritable}) used for writing.</p>
   *
   * @param conf the {@link JobConf} to modify
   * @param theClass the SequenceFile output value class.
   */
  static public void setSequenceFileOutputValueClass(JobConf conf,
      Class<?> theClass) {
    conf.setClass(org.apache.hadoop.mapreduce.lib.output.
        SequenceFileAsBinaryOutputFormat.VALUE_CLASS, theClass, Object.class);
  }

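  // Hedged usage sketch (not part of this class): a driver might declare
  // SequenceFile key/value classes that differ from the BytesWritable pairs
  // actually written. MyDriver, Text and IntWritable below are illustrative
  // stand-ins (the latter two from org.apache.hadoop.io).
  //
  //   JobConf job = new JobConf(MyDriver.class);
  //   job.setOutputFormat(SequenceFileAsBinaryOutputFormat.class);
  //   SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(
  //       job, Text.class);
  //   SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(
  //       job, IntWritable.class);
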
  /**
   * Get the key class for the {@link SequenceFile}.
   *
   * @param conf the {@link JobConf} to query
   * @return the key class of the {@link SequenceFile}, falling back to the
   *         job's output key class if none was set explicitly
   */
  static public Class<? extends WritableComparable>
      getSequenceFileOutputKeyClass(JobConf conf) {
    return conf.getClass(org.apache.hadoop.mapreduce.lib.output.
        SequenceFileAsBinaryOutputFormat.KEY_CLASS,
        conf.getOutputKeyClass().asSubclass(WritableComparable.class),
        WritableComparable.class);
  }

  /**
   * Get the value class for the {@link SequenceFile}.
   *
   * @param conf the {@link JobConf} to query
   * @return the value class of the {@link SequenceFile}, falling back to the
   *         job's output value class if none was set explicitly
   */
  static public Class<? extends Writable>
      getSequenceFileOutputValueClass(JobConf conf) {
    return conf.getClass(org.apache.hadoop.mapreduce.lib.output.
        SequenceFileAsBinaryOutputFormat.VALUE_CLASS,
        conf.getOutputValueClass().asSubclass(Writable.class), Writable.class);
  }
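  // Hedged note: getRecordWriter() below passes the classes returned by these
  // getters to SequenceFile.createWriter(), so they become the key/value class
  // names recorded in the output file's header. A reader of the finished file
  // could inspect them, e.g. (fs and outputPath are illustrative):
  //
  //   SequenceFile.Reader reader =
  //       new SequenceFile.Reader(fs, outputPath, conf);
  //   String declaredKeyClass = reader.getKeyClassName();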

  @Override
  public RecordWriter<BytesWritable, BytesWritable>
      getRecordWriter(FileSystem ignored, JobConf job,
                      String name, Progressable progress)
      throws IOException {
    // get the path of the temporary output file
    Path file = FileOutputFormat.getTaskOutputPath(job, name);

    FileSystem fs = file.getFileSystem(job);
    CompressionCodec codec = null;
    CompressionType compressionType = CompressionType.NONE;
    if (getCompressOutput(job)) {
      // find the kind of compression to do
      compressionType = getOutputCompressionType(job);

      // find the right codec
      Class<? extends CompressionCodec> codecClass =
        getOutputCompressorClass(job, DefaultCodec.class);
      codec = ReflectionUtils.newInstance(codecClass, job);
    }
    final SequenceFile.Writer out =
      SequenceFile.createWriter(fs, job, file,
                                getSequenceFileOutputKeyClass(job),
                                getSequenceFileOutputValueClass(job),
                                compressionType,
                                codec,
                                progress);

    return new RecordWriter<BytesWritable, BytesWritable>() {

      private WritableValueBytes wvaluebytes = new WritableValueBytes();

      public void write(BytesWritable bkey, BytesWritable bvalue)
          throws IOException {
        // wrap the value and append the key's raw bytes directly,
        // bypassing normal serialization
        wvaluebytes.reset(bvalue);
        out.appendRaw(bkey.getBytes(), 0, bkey.getLength(), wvaluebytes);
        wvaluebytes.reset(null);
      }

      public void close(Reporter reporter) throws IOException {
        out.close();
      }

    };
  }
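  // Hedged sketch of how a reduce() might feed the writer returned above;
  // the already-serialized byte arrays are placeholders, not produced here.
  //
  //   byte[] rawKey = ...;    // key bytes, serialized by the caller
  //   byte[] rawValue = ...;  // value bytes, serialized by the caller
  //   output.collect(new BytesWritable(rawKey), new BytesWritable(rawValue));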

  @Override
  public void checkOutputSpecs(FileSystem ignored, JobConf job)
      throws IOException {
    super.checkOutputSpecs(ignored, job);
    if (getCompressOutput(job) &&
        getOutputCompressionType(job) == CompressionType.RECORD) {
      throw new InvalidJobConfException("SequenceFileAsBinaryOutputFormat "
          + "doesn't support Record Compression");
    }
  }
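  // Hedged configuration sketch: since RECORD compression is rejected above,
  // callers wanting compressed output would typically request BLOCK
  // compression instead, e.g.:
  //
  //   FileOutputFormat.setCompressOutput(job, true);
  //   SequenceFileOutputFormat.setOutputCompressionType(
  //       job, CompressionType.BLOCK);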

}