001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.join;
020
021import java.io.DataOutput;
022import java.io.DataInput;
023import java.io.IOException;
024import java.util.BitSet;
025import java.util.Iterator;
026import java.util.NoSuchElementException;
027
028import org.apache.hadoop.classification.InterfaceAudience;
029import org.apache.hadoop.classification.InterfaceStability;
030import org.apache.hadoop.io.NullWritable;
031import org.apache.hadoop.io.Text;
032import org.apache.hadoop.io.Writable;
033import org.apache.hadoop.io.WritableUtils;
034
035/**
036 * Writable type storing multiple {@link org.apache.hadoop.io.Writable}s.
037 *
038 * This is *not* a general-purpose tuple type. In almost all cases, users are
039 * encouraged to implement their own serializable types, which can perform
040 * better validation and provide more efficient encodings than this class is
041 * capable. TupleWritable relies on the join framework for type safety and
042 * assumes its instances will rarely be persisted, assumptions not only
043 * incompatible with, but contrary to the general case.
044 *
045 * @see org.apache.hadoop.io.Writable
046 */
047@InterfaceAudience.Public
048@InterfaceStability.Stable
049public class TupleWritable implements Writable, Iterable<Writable> {
050
051  protected BitSet written;
052  private Writable[] values;
053
054  /**
055   * Create an empty tuple with no allocated storage for writables.
056   */
057  public TupleWritable() {
058    written = new BitSet(0);
059  }
060
061  /**
062   * Initialize tuple with storage; unknown whether any of them contain
063   * &quot;written&quot; values.
064   */
065  public TupleWritable(Writable[] vals) {
066    written = new BitSet(vals.length);
067    values = vals;
068  }
069
070  /**
071   * Return true if tuple has an element at the position provided.
072   */
073  public boolean has(int i) {
074    return written.get(i);
075  }
076
077  /**
078   * Get ith Writable from Tuple.
079   */
080  public Writable get(int i) {
081    return values[i];
082  }
083
084  /**
085   * The number of children in this Tuple.
086   */
087  public int size() {
088    return values.length;
089  }
090
091  /**
092   * {@inheritDoc}
093   */
094  public boolean equals(Object other) {
095    if (other instanceof TupleWritable) {
096      TupleWritable that = (TupleWritable)other;
097      if (!this.written.equals(that.written)) {
098        return false;
099      }
100      for (int i = 0; i < values.length; ++i) {
101        if (!has(i)) continue;
102        if (!values[i].equals(that.get(i))) {
103          return false;
104        }
105      }
106      return true;
107    }
108    return false;
109  }
110
111  public int hashCode() {
112    assert false : "hashCode not designed";
113    return written.hashCode();
114  }
115
116  /**
117   * Return an iterator over the elements in this tuple.
118   * Note that this doesn't flatten the tuple; one may receive tuples
119   * from this iterator.
120   */
121  public Iterator<Writable> iterator() {
122    final TupleWritable t = this;
123    return new Iterator<Writable>() {
124      int bitIndex = written.nextSetBit(0);
125      public boolean hasNext() {
126        return bitIndex >= 0;
127      }
128      public Writable next() {
129        int returnIndex = bitIndex;
130        if (returnIndex < 0)
131          throw new NoSuchElementException();
132        bitIndex = written.nextSetBit(bitIndex+1);
133        return t.get(returnIndex);
134      }
135      public void remove() {
136        if (!written.get(bitIndex)) {
137          throw new IllegalStateException(
138            "Attempt to remove non-existent val");
139        }
140        written.clear(bitIndex);
141      }
142    };
143  }
144
145  /**
146   * Convert Tuple to String as in the following.
147   * <tt>[&lt;child1&gt;,&lt;child2&gt;,...,&lt;childn&gt;]</tt>
148   */
149  public String toString() {
150    StringBuffer buf = new StringBuffer("[");
151    for (int i = 0; i < values.length; ++i) {
152      buf.append(has(i) ? values[i].toString() : "");
153      buf.append(",");
154    }
155    if (values.length != 0)
156      buf.setCharAt(buf.length() - 1, ']');
157    else
158      buf.append(']');
159    return buf.toString();
160  }
161
162  // Writable
163
164  /** Writes each Writable to <code>out</code>.
165   * TupleWritable format:
166   * {@code
167   *  <count><type1><type2>...<typen><obj1><obj2>...<objn>
168   * }
169   */
170  public void write(DataOutput out) throws IOException {
171    WritableUtils.writeVInt(out, values.length);
172    writeBitSet(out, values.length, written);
173    for (int i = 0; i < values.length; ++i) {
174      Text.writeString(out, values[i].getClass().getName());
175    }
176    for (int i = 0; i < values.length; ++i) {
177      if (has(i)) {
178        values[i].write(out);
179      }
180    }
181  }
182
183  /**
184   * {@inheritDoc}
185   */
186  @SuppressWarnings("unchecked") // No static typeinfo on Tuples
187  public void readFields(DataInput in) throws IOException {
188    int card = WritableUtils.readVInt(in);
189    values = new Writable[card];
190    readBitSet(in, card, written);
191    Class<? extends Writable>[] cls = new Class[card];
192    try {
193      for (int i = 0; i < card; ++i) {
194        cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
195      }
196      for (int i = 0; i < card; ++i) {
197        if (cls[i].equals(NullWritable.class)) {
198          values[i] = NullWritable.get();
199        } else {
200          values[i] = cls[i].newInstance();
201        }
202        if (has(i)) {
203          values[i].readFields(in);
204        }
205      }
206    } catch (ClassNotFoundException e) {
207      throw new IOException("Failed tuple init", e);
208    } catch (IllegalAccessException e) {
209      throw new IOException("Failed tuple init", e);
210    } catch (InstantiationException e) {
211      throw new IOException("Failed tuple init", e);
212    }
213  }
214
215  /**
216   * Record that the tuple contains an element at the position provided.
217   */
218  void setWritten(int i) {
219    written.set(i);
220  }
221
222  /**
223   * Record that the tuple does not contain an element at the position
224   * provided.
225   */
226  void clearWritten(int i) {
227    written.clear(i);
228  }
229
230  /**
231   * Clear any record of which writables have been written to, without
232   * releasing storage.
233   */
234  void clearWritten() {
235    written.clear();
236  }
237
238  /**
239   * Writes the bit set to the stream. The first 64 bit-positions of the bit
240   * set are written as a VLong for backwards-compatibility with older 
241   * versions of TupleWritable. All bit-positions >= 64 are encoded as a byte
242   * for every 8 bit-positions.
243   */
244  private static final void writeBitSet(DataOutput stream, int nbits,
245      BitSet bitSet) throws IOException {
246    long bits = 0L;
247        
248    int bitSetIndex = bitSet.nextSetBit(0);
249    for (;bitSetIndex >= 0 && bitSetIndex < Long.SIZE;
250            bitSetIndex=bitSet.nextSetBit(bitSetIndex+1)) {
251      bits |= 1L << bitSetIndex;
252    }
253    WritableUtils.writeVLong(stream,bits);
254    
255    if (nbits > Long.SIZE) {
256      bits = 0L;
257      for (int lastWordWritten = 0; bitSetIndex >= 0 && bitSetIndex < nbits; 
258              bitSetIndex = bitSet.nextSetBit(bitSetIndex+1)) {
259        int bitsIndex = bitSetIndex % Byte.SIZE;
260        int word = (bitSetIndex-Long.SIZE) / Byte.SIZE;
261        if (word > lastWordWritten) {
262          stream.writeByte((byte)bits);
263          bits = 0L;
264          for (lastWordWritten++;lastWordWritten<word;lastWordWritten++) {
265            stream.writeByte((byte)bits);
266          }
267        }
268        bits |= 1L << bitsIndex;
269      }
270      stream.writeByte((byte)bits);
271    }
272  }
273
274  /**
275   * Reads a bitset from the stream that has been written with
276   * {@link #writeBitSet(DataOutput, int, BitSet)}.
277   */
278  private static final void readBitSet(DataInput stream, int nbits, 
279      BitSet bitSet) throws IOException {
280    bitSet.clear();
281    long initialBits = WritableUtils.readVLong(stream);
282    long last = 0L;
283    while (0L != initialBits) {
284      last = Long.lowestOneBit(initialBits);
285      initialBits ^= last;
286      bitSet.set(Long.numberOfTrailingZeros(last));
287    }
288    
289    for (int offset=Long.SIZE; offset < nbits; offset+=Byte.SIZE) {
290      byte bits = stream.readByte();
291      while (0 != bits) {
292        last = Long.lowestOneBit(bits);
293        bits ^= last;
294        bitSet.set(Long.numberOfTrailingZeros(last) + offset);
295      }
296    }
297  }
298}