001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.io;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.IOException;
023import java.util.Map;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.atomic.AtomicReference;
026
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceStability;
029import org.apache.hadoop.conf.Configurable;
030import org.apache.hadoop.conf.Configuration;
031
032import com.google.common.annotations.VisibleForTesting;
033
034/**
035 * Abstract base class for MapWritable and SortedMapWritable
036 * 
037 * Unlike org.apache.nutch.crawl.MapWritable, this class allows creation of
038 * MapWritable<Writable, MapWritable> so the CLASS_TO_ID and ID_TO_CLASS
039 * maps travel with the class instead of being static.
040 * 
041 * Class ids range from 1 to 127 so there can be at most 127 distinct classes
042 * in any specific map instance.
043 */
044@InterfaceAudience.Public
045@InterfaceStability.Stable
046public abstract class AbstractMapWritable implements Writable, Configurable {
047  private AtomicReference<Configuration> conf;
048  
049  /* Class to id mappings */
050  @VisibleForTesting
051  Map<Class<?>, Byte> classToIdMap = new ConcurrentHashMap<Class<?>, Byte>();
052  
053  /* Id to Class mappings */
054  @VisibleForTesting
055  Map<Byte, Class<?>> idToClassMap = new ConcurrentHashMap<Byte, Class<?>>();
056  
057  /* The number of new classes (those not established by the constructor) */
058  private volatile byte newClasses = 0;
059  
060  /** @return the number of known classes */
061  byte getNewClasses() {
062    return newClasses;
063  }
064
065  /**
066   * Used to add "predefined" classes and by Writable to copy "new" classes.
067   */
068  private synchronized void addToMap(Class<?> clazz, byte id) {
069    if (classToIdMap.containsKey(clazz)) {
070      byte b = classToIdMap.get(clazz);
071      if (b != id) {
072        throw new IllegalArgumentException ("Class " + clazz.getName() +
073          " already registered but maps to " + b + " and not " + id);
074      }
075    }
076    if (idToClassMap.containsKey(id)) {
077      Class<?> c = idToClassMap.get(id);
078      if (!c.equals(clazz)) {
079        throw new IllegalArgumentException("Id " + id + " exists but maps to " +
080            c.getName() + " and not " + clazz.getName());
081      }
082    }
083    classToIdMap.put(clazz, id);
084    idToClassMap.put(id, clazz);
085  }
086  
087  /** Add a Class to the maps if it is not already present. */ 
088  protected synchronized void addToMap(Class<?> clazz) {
089    if (classToIdMap.containsKey(clazz)) {
090      return;
091    }
092    if (newClasses + 1 > Byte.MAX_VALUE) {
093      throw new IndexOutOfBoundsException("adding an additional class would" +
094      " exceed the maximum number allowed");
095    }
096    byte id = ++newClasses;
097    addToMap(clazz, id);
098  }
099
100  /** @return the Class class for the specified id */
101  protected Class<?> getClass(byte id) {
102    return idToClassMap.get(id);
103  }
104
105  /** @return the id for the specified Class */
106  protected byte getId(Class<?> clazz) {
107    return classToIdMap.containsKey(clazz) ? classToIdMap.get(clazz) : -1;
108  }
109
110  /** Used by child copy constructors. */
111  protected synchronized void copy(Writable other) {
112    if (other != null) {
113      try {
114        DataOutputBuffer out = new DataOutputBuffer();
115        other.write(out);
116        DataInputBuffer in = new DataInputBuffer();
117        in.reset(out.getData(), out.getLength());
118        readFields(in);
119
120      } catch (IOException e) {
121        throw new IllegalArgumentException("map cannot be copied: " +
122            e.getMessage());
123      }
124
125    } else {
126      throw new IllegalArgumentException("source map cannot be null");
127    }
128  }
129
130  /** constructor. */
131  protected AbstractMapWritable() {
132    this.conf = new AtomicReference<Configuration>();
133
134    addToMap(ArrayWritable.class, (byte)-127);
135    addToMap(BooleanWritable.class, (byte)-126);
136    addToMap(BytesWritable.class, (byte)-125);
137    addToMap(FloatWritable.class, (byte)-124);
138    addToMap(IntWritable.class, (byte)-123);
139    addToMap(LongWritable.class, (byte)-122);
140    addToMap(MapWritable.class, (byte)-121);
141    addToMap(MD5Hash.class, (byte)-120);
142    addToMap(NullWritable.class, (byte)-119);
143    addToMap(ObjectWritable.class, (byte)-118);
144    addToMap(SortedMapWritable.class, (byte)-117);
145    addToMap(Text.class, (byte)-116);
146    addToMap(TwoDArrayWritable.class, (byte)-115);
147
148    // UTF8 is deprecated so we don't support it
149
150    addToMap(VIntWritable.class, (byte)-114);
151    addToMap(VLongWritable.class, (byte)-113);
152  }
153
154  /** @return the conf */
155  @Override
156  public Configuration getConf() {
157    return conf.get();
158  }
159
160  /** @param conf the conf to set */
161  @Override
162  public void setConf(Configuration conf) {
163    this.conf.set(conf);
164  }
165  
166  @Override
167  public void write(DataOutput out) throws IOException {
168    
169    // First write out the size of the class table and any classes that are
170    // "unknown" classes
171    
172    out.writeByte(newClasses);
173
174    for (byte i = 1; i <= newClasses; i++) {
175      out.writeByte(i);
176      out.writeUTF(getClass(i).getName());
177    }
178  }
179  
180  @Override
181  public void readFields(DataInput in) throws IOException {
182    
183    // Get the number of "unknown" classes
184    newClasses = in.readByte();
185
186    // Use the classloader of the current thread to load classes instead of the
187    // system-classloader so as to support both client-only and inside-a-MR-job
188    // use-cases. The context-loader by default eventually falls back to the
189    // system one, so there should be no cases where changing this is an issue.
190    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
191
192    // Then read in the class names and add them to our tables
193    for (int i = 0; i < newClasses; i++) {
194      byte id = in.readByte();
195      String className = in.readUTF();
196      try {
197        addToMap(classLoader.loadClass(className), id);
198      } catch (ClassNotFoundException e) {
199        throw new IOException(e);
200      }
201    }
202  }    
203}