001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.io.serializer; 020 021 import java.io.DataInputStream; 022 import java.io.DataOutputStream; 023 import java.io.IOException; 024 import java.io.InputStream; 025 import java.io.OutputStream; 026 import org.apache.hadoop.classification.InterfaceAudience; 027 import org.apache.hadoop.classification.InterfaceStability; 028 import org.apache.hadoop.conf.Configuration; 029 import org.apache.hadoop.conf.Configured; 030 import org.apache.hadoop.io.Writable; 031 import org.apache.hadoop.util.ReflectionUtils; 032 033 /** 034 * A {@link Serialization} for {@link Writable}s that delegates to 035 * {@link Writable#write(java.io.DataOutput)} and 036 * {@link Writable#readFields(java.io.DataInput)}. 037 */ 038 @InterfaceAudience.Public 039 @InterfaceStability.Evolving 040 public class WritableSerialization extends Configured 041 implements Serialization<Writable> { 042 static class WritableDeserializer extends Configured 043 implements Deserializer<Writable> { 044 045 private Class<?> writableClass; 046 private DataInputStream dataIn; 047 048 public WritableDeserializer(Configuration conf, Class<?> c) { 049 setConf(conf); 050 this.writableClass = c; 051 } 052 053 @Override 054 public void open(InputStream in) { 055 if (in instanceof DataInputStream) { 056 dataIn = (DataInputStream) in; 057 } else { 058 dataIn = new DataInputStream(in); 059 } 060 } 061 062 @Override 063 public Writable deserialize(Writable w) throws IOException { 064 Writable writable; 065 if (w == null) { 066 writable 067 = (Writable) ReflectionUtils.newInstance(writableClass, getConf()); 068 } else { 069 writable = w; 070 } 071 writable.readFields(dataIn); 072 return writable; 073 } 074 075 @Override 076 public void close() throws IOException { 077 dataIn.close(); 078 } 079 080 } 081 082 static class WritableSerializer extends Configured implements 083 Serializer<Writable> { 084 085 private DataOutputStream dataOut; 086 087 @Override 088 public void open(OutputStream out) { 089 if (out instanceof DataOutputStream) { 090 dataOut = (DataOutputStream) out; 091 } else { 092 dataOut = new DataOutputStream(out); 093 } 094 } 095 096 @Override 097 public void serialize(Writable w) throws IOException { 098 w.write(dataOut); 099 } 100 101 @Override 102 public void close() throws IOException { 103 dataOut.close(); 104 } 105 106 } 107 108 @InterfaceAudience.Private 109 @Override 110 public boolean accept(Class<?> c) { 111 return Writable.class.isAssignableFrom(c); 112 } 113 114 @InterfaceAudience.Private 115 @Override 116 public Serializer<Writable> getSerializer(Class<Writable> c) { 117 return new WritableSerializer(); 118 } 119 120 @InterfaceAudience.Private 121 @Override 122 public Deserializer<Writable> getDeserializer(Class<Writable> c) { 123 return new WritableDeserializer(getConf(), c); 124 } 125 126 }