001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.io; 019 020import java.io.DataInput; 021import java.io.DataOutput; 022import java.io.IOException; 023import java.util.Map; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.atomic.AtomicReference; 026 027import org.apache.hadoop.classification.InterfaceAudience; 028import org.apache.hadoop.classification.InterfaceStability; 029import org.apache.hadoop.conf.Configurable; 030import org.apache.hadoop.conf.Configuration; 031 032import com.google.common.annotations.VisibleForTesting; 033 034/** 035 * Abstract base class for MapWritable and SortedMapWritable 036 * 037 * Unlike org.apache.nutch.crawl.MapWritable, this class allows creation of 038 * MapWritable<Writable, MapWritable> so the CLASS_TO_ID and ID_TO_CLASS 039 * maps travel with the class instead of being static. 040 * 041 * Class ids range from 1 to 127 so there can be at most 127 distinct classes 042 * in any specific map instance. 043 */ 044@InterfaceAudience.Public 045@InterfaceStability.Stable 046public abstract class AbstractMapWritable implements Writable, Configurable { 047 private AtomicReference<Configuration> conf; 048 049 /* Class to id mappings */ 050 @VisibleForTesting 051 Map<Class<?>, Byte> classToIdMap = new ConcurrentHashMap<Class<?>, Byte>(); 052 053 /* Id to Class mappings */ 054 @VisibleForTesting 055 Map<Byte, Class<?>> idToClassMap = new ConcurrentHashMap<Byte, Class<?>>(); 056 057 /* The number of new classes (those not established by the constructor) */ 058 private volatile byte newClasses = 0; 059 060 /** @return the number of known classes */ 061 byte getNewClasses() { 062 return newClasses; 063 } 064 065 /** 066 * Used to add "predefined" classes and by Writable to copy "new" classes. 067 */ 068 private synchronized void addToMap(Class<?> clazz, byte id) { 069 if (classToIdMap.containsKey(clazz)) { 070 byte b = classToIdMap.get(clazz); 071 if (b != id) { 072 throw new IllegalArgumentException ("Class " + clazz.getName() + 073 " already registered but maps to " + b + " and not " + id); 074 } 075 } 076 if (idToClassMap.containsKey(id)) { 077 Class<?> c = idToClassMap.get(id); 078 if (!c.equals(clazz)) { 079 throw new IllegalArgumentException("Id " + id + " exists but maps to " + 080 c.getName() + " and not " + clazz.getName()); 081 } 082 } 083 classToIdMap.put(clazz, id); 084 idToClassMap.put(id, clazz); 085 } 086 087 /** Add a Class to the maps if it is not already present. */ 088 protected synchronized void addToMap(Class<?> clazz) { 089 if (classToIdMap.containsKey(clazz)) { 090 return; 091 } 092 if (newClasses + 1 > Byte.MAX_VALUE) { 093 throw new IndexOutOfBoundsException("adding an additional class would" + 094 " exceed the maximum number allowed"); 095 } 096 byte id = ++newClasses; 097 addToMap(clazz, id); 098 } 099 100 /** @return the Class class for the specified id */ 101 protected Class<?> getClass(byte id) { 102 return idToClassMap.get(id); 103 } 104 105 /** @return the id for the specified Class */ 106 protected byte getId(Class<?> clazz) { 107 return classToIdMap.containsKey(clazz) ? classToIdMap.get(clazz) : -1; 108 } 109 110 /** Used by child copy constructors. */ 111 protected synchronized void copy(Writable other) { 112 if (other != null) { 113 try { 114 DataOutputBuffer out = new DataOutputBuffer(); 115 other.write(out); 116 DataInputBuffer in = new DataInputBuffer(); 117 in.reset(out.getData(), out.getLength()); 118 readFields(in); 119 120 } catch (IOException e) { 121 throw new IllegalArgumentException("map cannot be copied: " + 122 e.getMessage()); 123 } 124 125 } else { 126 throw new IllegalArgumentException("source map cannot be null"); 127 } 128 } 129 130 /** constructor. */ 131 protected AbstractMapWritable() { 132 this.conf = new AtomicReference<Configuration>(); 133 134 addToMap(ArrayWritable.class, (byte)-127); 135 addToMap(BooleanWritable.class, (byte)-126); 136 addToMap(BytesWritable.class, (byte)-125); 137 addToMap(FloatWritable.class, (byte)-124); 138 addToMap(IntWritable.class, (byte)-123); 139 addToMap(LongWritable.class, (byte)-122); 140 addToMap(MapWritable.class, (byte)-121); 141 addToMap(MD5Hash.class, (byte)-120); 142 addToMap(NullWritable.class, (byte)-119); 143 addToMap(ObjectWritable.class, (byte)-118); 144 addToMap(SortedMapWritable.class, (byte)-117); 145 addToMap(Text.class, (byte)-116); 146 addToMap(TwoDArrayWritable.class, (byte)-115); 147 148 // UTF8 is deprecated so we don't support it 149 150 addToMap(VIntWritable.class, (byte)-114); 151 addToMap(VLongWritable.class, (byte)-113); 152 } 153 154 /** @return the conf */ 155 @Override 156 public Configuration getConf() { 157 return conf.get(); 158 } 159 160 /** @param conf the conf to set */ 161 @Override 162 public void setConf(Configuration conf) { 163 this.conf.set(conf); 164 } 165 166 @Override 167 public void write(DataOutput out) throws IOException { 168 169 // First write out the size of the class table and any classes that are 170 // "unknown" classes 171 172 out.writeByte(newClasses); 173 174 for (byte i = 1; i <= newClasses; i++) { 175 out.writeByte(i); 176 out.writeUTF(getClass(i).getName()); 177 } 178 } 179 180 @Override 181 public void readFields(DataInput in) throws IOException { 182 183 // Get the number of "unknown" classes 184 newClasses = in.readByte(); 185 186 // Use the classloader of the current thread to load classes instead of the 187 // system-classloader so as to support both client-only and inside-a-MR-job 188 // use-cases. The context-loader by default eventually falls back to the 189 // system one, so there should be no cases where changing this is an issue. 190 ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); 191 192 // Then read in the class names and add them to our tables 193 for (int i = 0; i < newClasses; i++) { 194 byte id = in.readByte(); 195 String className = in.readUTF(); 196 try { 197 addToMap(classLoader.loadClass(className), id); 198 } catch (ClassNotFoundException e) { 199 throw new IOException(e); 200 } 201 } 202 } 203}