001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.mapreduce.lib.join;
019
020 import java.io.ByteArrayInputStream;
021 import java.io.ByteArrayOutputStream;
022 import java.io.DataInputStream;
023 import java.io.DataOutputStream;
024 import java.io.IOException;
025
026 import org.apache.hadoop.classification.InterfaceAudience;
027 import org.apache.hadoop.classification.InterfaceStability;
028 import org.apache.hadoop.io.Writable;
029
030 /**
031 * This class provides an implementation of ResetableIterator. This
032 * implementation uses a byte array to store elements added to it.
033 */
034 @InterfaceAudience.Public
035 @InterfaceStability.Stable
036 public class StreamBackedIterator<X extends Writable>
037 implements ResetableIterator<X> {
038
039 private static class ReplayableByteInputStream extends ByteArrayInputStream {
040 public ReplayableByteInputStream(byte[] arr) {
041 super(arr);
042 }
043 public void resetStream() {
044 mark = 0;
045 reset();
046 }
047 }
048
049 private ByteArrayOutputStream outbuf = new ByteArrayOutputStream();
050 private DataOutputStream outfbuf = new DataOutputStream(outbuf);
051 private ReplayableByteInputStream inbuf;
052 private DataInputStream infbuf;
053
054 public StreamBackedIterator() { }
055
056 public boolean hasNext() {
057 return infbuf != null && inbuf.available() > 0;
058 }
059
060 public boolean next(X val) throws IOException {
061 if (hasNext()) {
062 inbuf.mark(0);
063 val.readFields(infbuf);
064 return true;
065 }
066 return false;
067 }
068
069 public boolean replay(X val) throws IOException {
070 inbuf.reset();
071 if (0 == inbuf.available())
072 return false;
073 val.readFields(infbuf);
074 return true;
075 }
076
077 public void reset() {
078 if (null != outfbuf) {
079 inbuf = new ReplayableByteInputStream(outbuf.toByteArray());
080 infbuf = new DataInputStream(inbuf);
081 outfbuf = null;
082 }
083 inbuf.resetStream();
084 }
085
086 public void add(X item) throws IOException {
087 item.write(outfbuf);
088 }
089
090 public void close() throws IOException {
091 if (null != infbuf)
092 infbuf.close();
093 if (null != outfbuf)
094 outfbuf.close();
095 }
096
097 public void clear() {
098 if (null != inbuf)
099 inbuf.resetStream();
100 outbuf.reset();
101 outfbuf = new DataOutputStream(outbuf);
102 }
103 }