/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.output;

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.ValueBytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes keys and
 * values to {@link SequenceFile}s in binary (raw) format.
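 *
 * <p>A typical job setup looks like the following sketch; the job name and
 * the {@code Text} key/value classes are illustrative choices, and
 * {@code conf} is assumed to be an existing {@code Configuration}:</p>
 *
 * <pre>{@code
 * Job job = Job.getInstance(conf, "binary-output");
 * job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
 * // Record Text as the key/value types in the SequenceFile header, even
 * // though the tasks emit raw BytesWritable pairs.
 * SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job, Text.class);
 * SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job, Text.class);
 * }</pre>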
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsBinaryOutputFormat
    extends SequenceFileOutputFormat<BytesWritable, BytesWritable> {
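  // Configuration keys that record the key/value classes to be written
  // into the SequenceFile header, overriding the job's output classes.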
  public static final String KEY_CLASS =
      "mapreduce.output.seqbinaryoutputformat.key.class";
  public static final String VALUE_CLASS =
      "mapreduce.output.seqbinaryoutputformat.value.class";

  /**
   * Inner class that wraps a {@link BytesWritable} as a
   * {@link SequenceFile.ValueBytes} so it can be passed to
   * {@link SequenceFile.Writer#appendRaw}.
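   *
   * <p>For example, assuming {@code writer} is an already-open
   * {@link SequenceFile.Writer} and {@code key}/{@code value} are the raw
   * {@link BytesWritable} pair to append (an illustrative sketch):</p>
   *
   * <pre>{@code
   * WritableValueBytes vb = new WritableValueBytes(value);
   * writer.appendRaw(key.getBytes(), 0, key.getLength(), vb);
   * }</pre>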
   */
  public static class WritableValueBytes implements ValueBytes {
    private BytesWritable value;

    public WritableValueBytes() {
      this.value = null;
    }

    public WritableValueBytes(BytesWritable value) {
      this.value = value;
    }

    public void reset(BytesWritable value) {
      this.value = value;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
        throws IOException {
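      // Write only the valid region of the backing array; getBytes() may
      // return a buffer larger than the logical length.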
      outStream.write(value.getBytes(), 0, value.getLength());
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
        throws IllegalArgumentException, IOException {
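      // Values are handed to the writer verbatim, so per-record compression
      // cannot be applied here; checkOutputSpecs rejects RECORD compression
      // up front for the same reason.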
      throw new UnsupportedOperationException(
          "WritableValueBytes doesn't support RECORD compression");
    }

    @Override
    public int getSize() {
      return value.getLength();
    }
  }

  /**
   * Set the key class for the {@link SequenceFile}.
   * <p>This allows the user to specify the key class to be different
   * from the actual class ({@link BytesWritable}) used for writing.</p>
   *
   * @param job the {@link Job} to modify
   * @param theClass the SequenceFile output key class.
   */
  public static void setSequenceFileOutputKeyClass(Job job,
      Class<?> theClass) {
    job.getConfiguration().setClass(KEY_CLASS, theClass, Object.class);
  }

  /**
   * Set the value class for the {@link SequenceFile}.
   * <p>This allows the user to specify the value class to be different
   * from the actual class ({@link BytesWritable}) used for writing.</p>
   *
   * @param job the {@link Job} to modify
   * @param theClass the SequenceFile output value class.
   */
  public static void setSequenceFileOutputValueClass(Job job,
      Class<?> theClass) {
    job.getConfiguration().setClass(VALUE_CLASS, theClass, Object.class);
  }

  /**
   * Get the key class for the {@link SequenceFile}.
   *
   * @param job the {@link JobContext} to query
   * @return the key class of the {@link SequenceFile}, defaulting to the
   *         job's output key class when none has been set explicitly
   */
  public static Class<? extends WritableComparable>
      getSequenceFileOutputKeyClass(JobContext job) {
    return job.getConfiguration().getClass(KEY_CLASS,
        job.getOutputKeyClass().asSubclass(WritableComparable.class),
        WritableComparable.class);
  }

  /**
   * Get the value class for the {@link SequenceFile}.
   *
   * @param job the {@link JobContext} to query
   * @return the value class of the {@link SequenceFile}, defaulting to the
   *         job's output value class when none has been set explicitly
   */
  public static Class<? extends Writable> getSequenceFileOutputValueClass(
      JobContext job) {
    return job.getConfiguration().getClass(VALUE_CLASS,
        job.getOutputValueClass().asSubclass(Writable.class), Writable.class);
  }

  @Override
  public RecordWriter<BytesWritable, BytesWritable> getRecordWriter(
      TaskAttemptContext context) throws IOException {
    final SequenceFile.Writer out = getSequenceWriter(context,
        getSequenceFileOutputKeyClass(context),
        getSequenceFileOutputValueClass(context));

    return new RecordWriter<BytesWritable, BytesWritable>() {
      private final WritableValueBytes wvaluebytes = new WritableValueBytes();

      @Override
      public void write(BytesWritable bkey, BytesWritable bvalue)
          throws IOException {
        wvaluebytes.reset(bvalue);
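        // Append the raw key bytes directly; the value is streamed through
        // the ValueBytes wrapper without an intermediate serialization.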
        out.appendRaw(bkey.getBytes(), 0, bkey.getLength(), wvaluebytes);
        wvaluebytes.reset(null);
      }

      @Override
      public void close(TaskAttemptContext context) throws IOException {
        out.close();
      }
    };
  }

  @Override
  public void checkOutputSpecs(JobContext job) throws IOException {
    super.checkOutputSpecs(job);
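    // WritableValueBytes cannot produce compressed record bytes, so a job
    // configured for RECORD compression is rejected before any task runs.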
    if (getCompressOutput(job) &&
        getOutputCompressionType(job) == CompressionType.RECORD) {
      throw new InvalidJobConfException("SequenceFileAsBinaryOutputFormat "
          + "doesn't support Record Compression");
    }
  }
}