001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.join;
020    
021    import java.io.DataOutput;
022    import java.io.DataInput;
023    import java.io.IOException;
024    import java.util.BitSet;
025    import java.util.Iterator;
026    import java.util.NoSuchElementException;
027    
028    import org.apache.hadoop.classification.InterfaceAudience;
029    import org.apache.hadoop.classification.InterfaceStability;
030    import org.apache.hadoop.io.NullWritable;
031    import org.apache.hadoop.io.Text;
032    import org.apache.hadoop.io.Writable;
033    import org.apache.hadoop.io.WritableUtils;
034    
035    /**
036     * Writable type storing multiple {@link org.apache.hadoop.io.Writable}s.
037     *
038     * This is *not* a general-purpose tuple type. In almost all cases, users are
039     * encouraged to implement their own serializable types, which can perform
040     * better validation and provide more efficient encodings than this class is
041     * capable. TupleWritable relies on the join framework for type safety and
042     * assumes its instances will rarely be persisted, assumptions not only
043     * incompatible with, but contrary to the general case.
044     *
045     * @see org.apache.hadoop.io.Writable
046     */
047    @InterfaceAudience.Public
048    @InterfaceStability.Stable
049    public class TupleWritable implements Writable, Iterable<Writable> {
050    
051      protected BitSet written;
052      private Writable[] values;
053    
054      /**
055       * Create an empty tuple with no allocated storage for writables.
056       */
057      public TupleWritable() {
058        written = new BitSet(0);
059      }
060    
061      /**
062       * Initialize tuple with storage; unknown whether any of them contain
063       * &quot;written&quot; values.
064       */
065      public TupleWritable(Writable[] vals) {
066        written = new BitSet(vals.length);
067        values = vals;
068      }
069    
070      /**
071       * Return true if tuple has an element at the position provided.
072       */
073      public boolean has(int i) {
074        return written.get(i);
075      }
076    
077      /**
078       * Get ith Writable from Tuple.
079       */
080      public Writable get(int i) {
081        return values[i];
082      }
083    
084      /**
085       * The number of children in this Tuple.
086       */
087      public int size() {
088        return values.length;
089      }
090    
091      /**
092       * {@inheritDoc}
093       */
094      public boolean equals(Object other) {
095        if (other instanceof TupleWritable) {
096          TupleWritable that = (TupleWritable)other;
097          if (!this.written.equals(that.written)) {
098            return false;
099          }
100          for (int i = 0; i < values.length; ++i) {
101            if (!has(i)) continue;
102            if (!values[i].equals(that.get(i))) {
103              return false;
104            }
105          }
106          return true;
107        }
108        return false;
109      }
110    
111      public int hashCode() {
112        assert false : "hashCode not designed";
113        return written.hashCode();
114      }
115    
116      /**
117       * Return an iterator over the elements in this tuple.
118       * Note that this doesn't flatten the tuple; one may receive tuples
119       * from this iterator.
120       */
121      public Iterator<Writable> iterator() {
122        final TupleWritable t = this;
123        return new Iterator<Writable>() {
124          int bitIndex = written.nextSetBit(0);
125          public boolean hasNext() {
126            return bitIndex >= 0;
127          }
128          public Writable next() {
129            int returnIndex = bitIndex;
130            if (returnIndex < 0)
131              throw new NoSuchElementException();
132            bitIndex = written.nextSetBit(bitIndex+1);
133            return t.get(returnIndex);
134          }
135          public void remove() {
136            if (!written.get(bitIndex)) {
137              throw new IllegalStateException(
138                "Attempt to remove non-existent val");
139            }
140            written.clear(bitIndex);
141          }
142        };
143      }
144    
145      /**
146       * Convert Tuple to String as in the following.
147       * <tt>[<child1>,<child2>,...,<childn>]</tt>
148       */
149      public String toString() {
150        StringBuffer buf = new StringBuffer("[");
151        for (int i = 0; i < values.length; ++i) {
152          buf.append(has(i) ? values[i].toString() : "");
153          buf.append(",");
154        }
155        if (values.length != 0)
156          buf.setCharAt(buf.length() - 1, ']');
157        else
158          buf.append(']');
159        return buf.toString();
160      }
161    
162      // Writable
163    
164      /** Writes each Writable to <code>out</code>.
165       * TupleWritable format:
166       * {@code
167       *  <count><type1><type2>...<typen><obj1><obj2>...<objn>
168       * }
169       */
170      public void write(DataOutput out) throws IOException {
171        WritableUtils.writeVInt(out, values.length);
172        writeBitSet(out, values.length, written);
173        for (int i = 0; i < values.length; ++i) {
174          Text.writeString(out, values[i].getClass().getName());
175        }
176        for (int i = 0; i < values.length; ++i) {
177          if (has(i)) {
178            values[i].write(out);
179          }
180        }
181      }
182    
183      /**
184       * {@inheritDoc}
185       */
186      @SuppressWarnings("unchecked") // No static typeinfo on Tuples
187      public void readFields(DataInput in) throws IOException {
188        int card = WritableUtils.readVInt(in);
189        values = new Writable[card];
190        readBitSet(in, card, written);
191        Class<? extends Writable>[] cls = new Class[card];
192        try {
193          for (int i = 0; i < card; ++i) {
194            cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
195          }
196          for (int i = 0; i < card; ++i) {
197            if (cls[i].equals(NullWritable.class)) {
198              values[i] = NullWritable.get();
199            } else {
200              values[i] = cls[i].newInstance();
201            }
202            if (has(i)) {
203              values[i].readFields(in);
204            }
205          }
206        } catch (ClassNotFoundException e) {
207          throw new IOException("Failed tuple init", e);
208        } catch (IllegalAccessException e) {
209          throw new IOException("Failed tuple init", e);
210        } catch (InstantiationException e) {
211          throw new IOException("Failed tuple init", e);
212        }
213      }
214    
215      /**
216       * Record that the tuple contains an element at the position provided.
217       */
218      void setWritten(int i) {
219        written.set(i);
220      }
221    
222      /**
223       * Record that the tuple does not contain an element at the position
224       * provided.
225       */
226      void clearWritten(int i) {
227        written.clear(i);
228      }
229    
230      /**
231       * Clear any record of which writables have been written to, without
232       * releasing storage.
233       */
234      void clearWritten() {
235        written.clear();
236      }
237    
238      /**
239       * Writes the bit set to the stream. The first 64 bit-positions of the bit
240       * set are written as a VLong for backwards-compatibility with older 
241       * versions of TupleWritable. All bit-positions >= 64 are encoded as a byte
242       * for every 8 bit-positions.
243       */
244      private static final void writeBitSet(DataOutput stream, int nbits,
245          BitSet bitSet) throws IOException {
246        long bits = 0L;
247            
248        int bitSetIndex = bitSet.nextSetBit(0);
249        for (;bitSetIndex >= 0 && bitSetIndex < Long.SIZE;
250                bitSetIndex=bitSet.nextSetBit(bitSetIndex+1)) {
251          bits |= 1L << bitSetIndex;
252        }
253        WritableUtils.writeVLong(stream,bits);
254        
255        if (nbits > Long.SIZE) {
256          bits = 0L;
257          for (int lastWordWritten = 0; bitSetIndex >= 0 && bitSetIndex < nbits; 
258                  bitSetIndex = bitSet.nextSetBit(bitSetIndex+1)) {
259            int bitsIndex = bitSetIndex % Byte.SIZE;
260            int word = (bitSetIndex-Long.SIZE) / Byte.SIZE;
261            if (word > lastWordWritten) {
262              stream.writeByte((byte)bits);
263              bits = 0L;
264              for (lastWordWritten++;lastWordWritten<word;lastWordWritten++) {
265                stream.writeByte((byte)bits);
266              }
267            }
268            bits |= 1L << bitsIndex;
269          }
270          stream.writeByte((byte)bits);
271        }
272      }
273    
274      /**
275       * Reads a bitset from the stream that has been written with
276       * {@link #writeBitSet(DataOutput, int, BitSet)}.
277       */
278      private static final void readBitSet(DataInput stream, int nbits, 
279          BitSet bitSet) throws IOException {
280        bitSet.clear();
281        long initialBits = WritableUtils.readVLong(stream);
282        long last = 0L;
283        while (0L != initialBits) {
284          last = Long.lowestOneBit(initialBits);
285          initialBits ^= last;
286          bitSet.set(Long.numberOfTrailingZeros(last));
287        }
288        
289        for (int offset=Long.SIZE; offset < nbits; offset+=Byte.SIZE) {
290          byte bits = stream.readByte();
291          while (0 != bits) {
292            last = Long.lowestOneBit(bits);
293            bits ^= last;
294            bitSet.set(Long.numberOfTrailingZeros(last) + offset);
295          }
296        }
297      }
298    }