001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.mapreduce.lib.join;
019    
020    import java.io.ByteArrayInputStream;
021    import java.io.ByteArrayOutputStream;
022    import java.io.DataInputStream;
023    import java.io.DataOutputStream;
024    import java.io.IOException;
025    
026    import org.apache.hadoop.classification.InterfaceAudience;
027    import org.apache.hadoop.classification.InterfaceStability;
028    import org.apache.hadoop.io.Writable;
029    
030    /**
031     * This class provides an implementation of ResetableIterator. This
032     * implementation uses a byte array to store elements added to it.
033     */
034    @InterfaceAudience.Public
035    @InterfaceStability.Stable
036    public class StreamBackedIterator<X extends Writable>
037        implements ResetableIterator<X> {
038    
039      private static class ReplayableByteInputStream extends ByteArrayInputStream {
040        public ReplayableByteInputStream(byte[] arr) {
041          super(arr);
042        }
043        public void resetStream() {
044          mark = 0;
045          reset();
046        }
047      }
048    
049      private ByteArrayOutputStream outbuf = new ByteArrayOutputStream();
050      private DataOutputStream outfbuf = new DataOutputStream(outbuf);
051      private ReplayableByteInputStream inbuf;
052      private DataInputStream infbuf;
053    
054      public StreamBackedIterator() { }
055    
056      public boolean hasNext() {
057        return infbuf != null && inbuf.available() > 0;
058      }
059    
060      public boolean next(X val) throws IOException {
061        if (hasNext()) {
062          inbuf.mark(0);
063          val.readFields(infbuf);
064          return true;
065        }
066        return false;
067      }
068    
069      public boolean replay(X val) throws IOException {
070        inbuf.reset();
071        if (0 == inbuf.available())
072          return false;
073        val.readFields(infbuf);
074        return true;
075      }
076    
077      public void reset() {
078        if (null != outfbuf) {
079          inbuf = new ReplayableByteInputStream(outbuf.toByteArray());
080          infbuf =  new DataInputStream(inbuf);
081          outfbuf = null;
082        }
083        inbuf.resetStream();
084      }
085    
086      public void add(X item) throws IOException {
087        item.write(outfbuf);
088      }
089    
090      public void close() throws IOException {
091        if (null != infbuf)
092          infbuf.close();
093        if (null != outfbuf)
094          outfbuf.close();
095      }
096    
097      public void clear() {
098        if (null != inbuf)
099          inbuf.resetStream();
100        outbuf.reset();
101        outfbuf = new DataOutputStream(outbuf);
102      }
103    }