001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.io;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.IOException;
023import java.util.Map;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.atomic.AtomicReference;
026
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceStability;
029import org.apache.hadoop.conf.Configurable;
030import org.apache.hadoop.conf.Configuration;
031
032import com.google.common.annotations.VisibleForTesting;
033
034/**
035 * Abstract base class for MapWritable and SortedMapWritable
036 * 
037 * Unlike org.apache.nutch.crawl.MapWritable, this class allows creation of
038 * MapWritable<Writable, MapWritable> so the CLASS_TO_ID and ID_TO_CLASS
039 * maps travel with the class instead of being static.
040 * 
041 * Class ids range from 1 to 127 so there can be at most 127 distinct classes
042 * in any specific map instance.
043 */
044@InterfaceAudience.Public
045@InterfaceStability.Stable
046public abstract class AbstractMapWritable implements Writable, Configurable {
047  private AtomicReference<Configuration> conf;
048  
049  /* Class to id mappings */
050  @VisibleForTesting
051  Map<Class<?>, Byte> classToIdMap = new ConcurrentHashMap<Class<?>, Byte>();
052  
053  /* Id to Class mappings */
054  @VisibleForTesting
055  Map<Byte, Class<?>> idToClassMap = new ConcurrentHashMap<Byte, Class<?>>();
056  
057  /* The number of new classes (those not established by the constructor) */
058  private volatile byte newClasses = 0;
059  
060  /** @return the number of known classes */
061  byte getNewClasses() {
062    return newClasses;
063  }
064
065  /**
066   * Used to add "predefined" classes and by Writable to copy "new" classes.
067   */
068  private synchronized void addToMap(Class<?> clazz, byte id) {
069    if (classToIdMap.containsKey(clazz)) {
070      byte b = classToIdMap.get(clazz);
071      if (b != id) {
072        throw new IllegalArgumentException ("Class " + clazz.getName() +
073          " already registered but maps to " + b + " and not " + id);
074      }
075    }
076    if (idToClassMap.containsKey(id)) {
077      Class<?> c = idToClassMap.get(id);
078      if (!c.equals(clazz)) {
079        throw new IllegalArgumentException("Id " + id + " exists but maps to " +
080            c.getName() + " and not " + clazz.getName());
081      }
082    }
083    classToIdMap.put(clazz, id);
084    idToClassMap.put(id, clazz);
085  }
086  
087  /** Add a Class to the maps if it is not already present. */ 
088  protected synchronized void addToMap(Class<?> clazz) {
089    if (classToIdMap.containsKey(clazz)) {
090      return;
091    }
092    if (newClasses + 1 > Byte.MAX_VALUE) {
093      throw new IndexOutOfBoundsException("adding an additional class would" +
094      " exceed the maximum number allowed");
095    }
096    byte id = ++newClasses;
097    addToMap(clazz, id);
098  }
099
100  /** @return the Class class for the specified id */
101  protected Class<?> getClass(byte id) {
102    return idToClassMap.get(id);
103  }
104
105  /** @return the id for the specified Class */
106  protected byte getId(Class<?> clazz) {
107    return classToIdMap.containsKey(clazz) ? classToIdMap.get(clazz) : -1;
108  }
109
110  /** Used by child copy constructors. */
111  protected synchronized void copy(Writable other) {
112    if (other != null) {
113      try {
114        DataOutputBuffer out = new DataOutputBuffer();
115        other.write(out);
116        DataInputBuffer in = new DataInputBuffer();
117        in.reset(out.getData(), out.getLength());
118        readFields(in);
119        
120      } catch (IOException e) {
121        throw new IllegalArgumentException("map cannot be copied: " +
122            e.getMessage());
123      }
124      
125    } else {
126      throw new IllegalArgumentException("source map cannot be null");
127    }
128  }
129  
130  /** constructor. */
131  protected AbstractMapWritable() {
132    this.conf = new AtomicReference<Configuration>();
133
134    addToMap(ArrayWritable.class,
135        Byte.valueOf(Integer.valueOf(-127).byteValue())); 
136    addToMap(BooleanWritable.class,
137        Byte.valueOf(Integer.valueOf(-126).byteValue()));
138    addToMap(BytesWritable.class,
139        Byte.valueOf(Integer.valueOf(-125).byteValue()));
140    addToMap(FloatWritable.class,
141        Byte.valueOf(Integer.valueOf(-124).byteValue()));
142    addToMap(IntWritable.class,
143        Byte.valueOf(Integer.valueOf(-123).byteValue()));
144    addToMap(LongWritable.class,
145        Byte.valueOf(Integer.valueOf(-122).byteValue()));
146    addToMap(MapWritable.class,
147        Byte.valueOf(Integer.valueOf(-121).byteValue()));
148    addToMap(MD5Hash.class,
149        Byte.valueOf(Integer.valueOf(-120).byteValue()));
150    addToMap(NullWritable.class,
151        Byte.valueOf(Integer.valueOf(-119).byteValue()));
152    addToMap(ObjectWritable.class,
153        Byte.valueOf(Integer.valueOf(-118).byteValue()));
154    addToMap(SortedMapWritable.class,
155        Byte.valueOf(Integer.valueOf(-117).byteValue()));
156    addToMap(Text.class,
157        Byte.valueOf(Integer.valueOf(-116).byteValue()));
158    addToMap(TwoDArrayWritable.class,
159        Byte.valueOf(Integer.valueOf(-115).byteValue()));
160    
161    // UTF8 is deprecated so we don't support it
162
163    addToMap(VIntWritable.class,
164        Byte.valueOf(Integer.valueOf(-114).byteValue()));
165    addToMap(VLongWritable.class,
166        Byte.valueOf(Integer.valueOf(-113).byteValue()));
167
168  }
169
170  /** @return the conf */
171  @Override
172  public Configuration getConf() {
173    return conf.get();
174  }
175
176  /** @param conf the conf to set */
177  @Override
178  public void setConf(Configuration conf) {
179    this.conf.set(conf);
180  }
181  
182  @Override
183  public void write(DataOutput out) throws IOException {
184    
185    // First write out the size of the class table and any classes that are
186    // "unknown" classes
187    
188    out.writeByte(newClasses);
189
190    for (byte i = 1; i <= newClasses; i++) {
191      out.writeByte(i);
192      out.writeUTF(getClass(i).getName());
193    }
194  }
195  
196  @Override
197  public void readFields(DataInput in) throws IOException {
198    
199    // Get the number of "unknown" classes
200    newClasses = in.readByte();
201
202    // Use the classloader of the current thread to load classes instead of the
203    // system-classloader so as to support both client-only and inside-a-MR-job
204    // use-cases. The context-loader by default eventually falls back to the
205    // system one, so there should be no cases where changing this is an issue.
206    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
207
208    // Then read in the class names and add them to our tables
209    for (int i = 0; i < newClasses; i++) {
210      byte id = in.readByte();
211      String className = in.readUTF();
212      try {
213        addToMap(classLoader.loadClass(className), id);
214      } catch (ClassNotFoundException e) {
215        throw new IOException(e);
216      }
217    }
218  }    
219}