/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

import com.google.common.annotations.VisibleForTesting;

/**
 * Abstract base class for MapWritable and SortedMapWritable.
 *
 * Unlike org.apache.nutch.crawl.MapWritable, this class allows creation of
 * MapWritable&lt;Writable, MapWritable&gt; so the CLASS_TO_ID and ID_TO_CLASS
 * maps travel with the class instead of being static.
 *
 * Class ids range from 1 to 127 so there can be at most 127 distinct classes
 * in any specific map instance.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class AbstractMapWritable implements Writable, Configurable {
  private AtomicReference<Configuration> conf;

  /* Class to id mappings */
  @VisibleForTesting
  Map<Class<?>, Byte> classToIdMap = new ConcurrentHashMap<Class<?>, Byte>();

  /* Id to Class mappings */
  @VisibleForTesting
  Map<Byte, Class<?>> idToClassMap = new ConcurrentHashMap<Byte, Class<?>>();

  /* The number of new classes (those not established by the constructor) */
  private volatile byte newClasses = 0;

  /**
   * @return the number of new classes, i.e. those registered in addition to
   *         the predefined ones established by the constructor
   */
  byte getNewClasses() {
    return newClasses;
  }

  /**
   * Used to add "predefined" classes and by Writable to copy "new" classes.
   *
   * Registering a pair that is already present is a no-op; registering a
   * class or an id that is already bound to a different partner is rejected.
   *
   * @param clazz the class to register
   * @param id the id to bind to <code>clazz</code>
   * @throws IllegalArgumentException if <code>clazz</code> is already
   *         registered under a different id, or <code>id</code> is already
   *         bound to a different class
   */
  private synchronized void addToMap(Class<?> clazz, byte id) {
    Byte existingId = classToIdMap.get(clazz);
    if (existingId != null && existingId.byteValue() != id) {
      throw new IllegalArgumentException("Class " + clazz.getName() +
          " already registered but maps to " + existingId + " and not " + id);
    }
    Class<?> existingClass = idToClassMap.get(id);
    if (existingClass != null && !existingClass.equals(clazz)) {
      throw new IllegalArgumentException("Id " + id + " exists but maps to " +
          existingClass.getName() + " and not " + clazz.getName());
    }
    classToIdMap.put(clazz, id);
    idToClassMap.put(id, clazz);
  }

  /**
   * Add a Class to the maps if it is not already present.
   *
   * A class that is not yet known is assigned the next free positive id.
   *
   * @param clazz the class to register
   * @throws IndexOutOfBoundsException if {@link Byte#MAX_VALUE} new classes
   *         have already been registered
   */
  protected synchronized void addToMap(Class<?> clazz) {
    if (classToIdMap.containsKey(clazz)) {
      return;
    }
    if (newClasses + 1 > Byte.MAX_VALUE) {
      throw new IndexOutOfBoundsException("adding an additional class would" +
          " exceed the maximum number allowed");
    }
    byte id = ++newClasses;
    addToMap(clazz, id);
  }

  /**
   * @param id the id to look up
   * @return the Class class for the specified id, or <code>null</code> if
   *         no class is registered under that id
   */
  protected Class<?> getClass(byte id) {
    return idToClassMap.get(id);
  }

  /**
   * @param clazz the class to look up
   * @return the id for the specified Class, or -1 if the class is not
   *         registered
   */
  protected byte getId(Class<?> clazz) {
    Byte id = classToIdMap.get(clazz);
    return id != null ? id.byteValue() : (byte) -1;
  }

  /**
   * Used by child copy constructors.
   *
   * Serializes <code>other</code> into an in-memory buffer and reads that
   * buffer back into this instance through {@link #readFields(DataInput)}.
   *
   * @param other the Writable to copy from
   * @throws IllegalArgumentException if <code>other</code> is null or the
   *         serialization round trip fails
   */
  protected synchronized void copy(Writable other) {
    if (other == null) {
      throw new IllegalArgumentException("source map cannot be null");
    }
    try {
      DataOutputBuffer out = new DataOutputBuffer();
      other.write(out);
      DataInputBuffer in = new DataInputBuffer();
      in.reset(out.getData(), out.getLength());
      readFields(in);
    } catch (IOException e) {
      // Keep the underlying exception as the cause so the failure can be
      // diagnosed from the stack trace.
      throw new IllegalArgumentException("map cannot be copied: " +
          e.getMessage(), e);
    }
  }

  /**
   * constructor.
   *
   * Registers the predefined classes under fixed negative ids, which keeps
   * them apart from the positive ids handed out to new classes.
   */
  protected AbstractMapWritable() {
    this.conf = new AtomicReference<Configuration>();

    addToMap(ArrayWritable.class, (byte) -127);
    addToMap(BooleanWritable.class, (byte) -126);
    addToMap(BytesWritable.class, (byte) -125);
    addToMap(FloatWritable.class, (byte) -124);
    addToMap(IntWritable.class, (byte) -123);
    addToMap(LongWritable.class, (byte) -122);
    addToMap(MapWritable.class, (byte) -121);
    addToMap(MD5Hash.class, (byte) -120);
    addToMap(NullWritable.class, (byte) -119);
    addToMap(ObjectWritable.class, (byte) -118);
    addToMap(SortedMapWritable.class, (byte) -117);
    addToMap(Text.class, (byte) -116);
    addToMap(TwoDArrayWritable.class, (byte) -115);

    // UTF8 is deprecated so we don't support it

    addToMap(VIntWritable.class, (byte) -114);
    addToMap(VLongWritable.class, (byte) -113);
  }

  /** @return the conf */
  @Override
  public Configuration getConf() {
    return conf.get();
  }

  /** @param conf the conf to set */
  @Override
  public void setConf(Configuration conf) {
    this.conf.set(conf);
  }

  /**
   * Writes the table of new classes: a one-byte count followed by, for each
   * new class, its one-byte id and its fully qualified name.
   *
   * @param out the output to write to
   * @throws IOException if writing to <code>out</code> fails
   */
  @Override
  public void write(DataOutput out) throws IOException {

    // First write out the size of the class table and any classes that are
    // "unknown" classes

    final int count = newClasses;
    out.writeByte(count);

    // The index must be wider than byte: with the maximum of 127 new classes
    // a byte counter would wrap to -128 and never end the loop.
    for (int id = 1; id <= count; id++) {
      out.writeByte(id);
      out.writeUTF(getClass((byte) id).getName());
    }
  }

  /**
   * Reads a table of new classes as produced by {@link #write(DataOutput)}
   * and registers each entry under the id found in the stream.
   *
   * @param in the input to read from
   * @throws IOException if reading from <code>in</code> fails or a class
   *         named in the stream cannot be loaded
   */
  @Override
  public void readFields(DataInput in) throws IOException {

    // Get the number of "unknown" classes
    final byte count = in.readByte();
    newClasses = count;

    // Use the classloader of the current thread to load classes instead of the
    // system-classloader so as to support both client-only and inside-a-MR-job
    // use-cases. The context-loader by default eventually falls back to the
    // system one, so there should be no cases where changing this is an issue.
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

    // Then read in the class names and add them to our tables
    for (int i = 0; i < count; i++) {
      byte id = in.readByte();
      String className = in.readUTF();
      try {
        addToMap(classLoader.loadClass(className), id);
      } catch (ClassNotFoundException e) {
        throw new IOException(e);
      }
    }
  }
}