001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred.join;
020
021 import java.io.DataInput;
022 import java.io.DataOutput;
023 import java.io.IOException;
024 import java.util.HashSet;
025
026 import org.apache.hadoop.classification.InterfaceAudience;
027 import org.apache.hadoop.classification.InterfaceStability;
028 import org.apache.hadoop.io.Text;
029 import org.apache.hadoop.io.WritableUtils;
030 import org.apache.hadoop.mapred.InputSplit;
031 import org.apache.hadoop.util.ReflectionUtils;
032
033 /**
034 * This InputSplit contains a set of child InputSplits. Any InputSplit inserted
035 * into this collection must have a public default constructor.
036 */
037 @InterfaceAudience.Public
038 @InterfaceStability.Stable
039 public class CompositeInputSplit implements InputSplit {
040
041 private int fill = 0;
042 private long totsize = 0L;
043 private InputSplit[] splits;
044
045 public CompositeInputSplit() { }
046
047 public CompositeInputSplit(int capacity) {
048 splits = new InputSplit[capacity];
049 }
050
051 /**
052 * Add an InputSplit to this collection.
053 * @throws IOException If capacity was not specified during construction
054 * or if capacity has been reached.
055 */
056 public void add(InputSplit s) throws IOException {
057 if (null == splits) {
058 throw new IOException("Uninitialized InputSplit");
059 }
060 if (fill == splits.length) {
061 throw new IOException("Too many splits");
062 }
063 splits[fill++] = s;
064 totsize += s.getLength();
065 }
066
067 /**
068 * Get ith child InputSplit.
069 */
070 public InputSplit get(int i) {
071 return splits[i];
072 }
073
074 /**
075 * Return the aggregate length of all child InputSplits currently added.
076 */
077 public long getLength() throws IOException {
078 return totsize;
079 }
080
081 /**
082 * Get the length of ith child InputSplit.
083 */
084 public long getLength(int i) throws IOException {
085 return splits[i].getLength();
086 }
087
088 /**
089 * Collect a set of hosts from all child InputSplits.
090 */
091 public String[] getLocations() throws IOException {
092 HashSet<String> hosts = new HashSet<String>();
093 for (InputSplit s : splits) {
094 String[] hints = s.getLocations();
095 if (hints != null && hints.length > 0) {
096 for (String host : hints) {
097 hosts.add(host);
098 }
099 }
100 }
101 return hosts.toArray(new String[hosts.size()]);
102 }
103
104 /**
105 * getLocations from ith InputSplit.
106 */
107 public String[] getLocation(int i) throws IOException {
108 return splits[i].getLocations();
109 }
110
111 /**
112 * Write splits in the following format.
113 * {@code
114 * <count><class1><class2>...<classn><split1><split2>...<splitn>
115 * }
116 */
117 public void write(DataOutput out) throws IOException {
118 WritableUtils.writeVInt(out, splits.length);
119 for (InputSplit s : splits) {
120 Text.writeString(out, s.getClass().getName());
121 }
122 for (InputSplit s : splits) {
123 s.write(out);
124 }
125 }
126
127 /**
128 * {@inheritDoc}
129 * @throws IOException If the child InputSplit cannot be read, typically
130 * for faliing access checks.
131 */
132 @SuppressWarnings("unchecked") // Generic array assignment
133 public void readFields(DataInput in) throws IOException {
134 int card = WritableUtils.readVInt(in);
135 if (splits == null || splits.length != card) {
136 splits = new InputSplit[card];
137 }
138 Class<? extends InputSplit>[] cls = new Class[card];
139 try {
140 for (int i = 0; i < card; ++i) {
141 cls[i] =
142 Class.forName(Text.readString(in)).asSubclass(InputSplit.class);
143 }
144 for (int i = 0; i < card; ++i) {
145 splits[i] = ReflectionUtils.newInstance(cls[i], null);
146 splits[i].readFields(in);
147 }
148 } catch (ClassNotFoundException e) {
149 throw (IOException)new IOException("Failed split init").initCause(e);
150 }
151 }
152
153 }