001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.lib.join; 020 021import java.io.DataInput; 022import java.io.DataInputStream; 023import java.io.DataOutput; 024import java.io.DataOutputStream; 025import java.io.IOException; 026import java.util.HashSet; 027 028import org.apache.hadoop.classification.InterfaceAudience; 029import org.apache.hadoop.classification.InterfaceStability; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.io.Text; 032import org.apache.hadoop.io.Writable; 033import org.apache.hadoop.io.WritableUtils; 034import org.apache.hadoop.io.serializer.*; 035import org.apache.hadoop.mapreduce.InputSplit; 036import org.apache.hadoop.util.ReflectionUtils; 037 038/** 039 * This InputSplit contains a set of child InputSplits. Any InputSplit inserted 040 * into this collection must have a public default constructor. 041 */ 042@InterfaceAudience.Public 043@InterfaceStability.Stable 044public class CompositeInputSplit extends InputSplit implements Writable { 045 046 private int fill = 0; 047 private long totsize = 0L; 048 private InputSplit[] splits; 049 private Configuration conf = new Configuration(); 050 051 public CompositeInputSplit() { } 052 053 public CompositeInputSplit(int capacity) { 054 splits = new InputSplit[capacity]; 055 } 056 057 /** 058 * Add an InputSplit to this collection. 059 * @throws IOException If capacity was not specified during construction 060 * or if capacity has been reached. 061 */ 062 public void add(InputSplit s) throws IOException, InterruptedException { 063 if (null == splits) { 064 throw new IOException("Uninitialized InputSplit"); 065 } 066 if (fill == splits.length) { 067 throw new IOException("Too many splits"); 068 } 069 splits[fill++] = s; 070 totsize += s.getLength(); 071 } 072 073 /** 074 * Get ith child InputSplit. 075 */ 076 public InputSplit get(int i) { 077 return splits[i]; 078 } 079 080 /** 081 * Return the aggregate length of all child InputSplits currently added. 082 */ 083 public long getLength() throws IOException { 084 return totsize; 085 } 086 087 /** 088 * Get the length of ith child InputSplit. 089 */ 090 public long getLength(int i) throws IOException, InterruptedException { 091 return splits[i].getLength(); 092 } 093 094 /** 095 * Collect a set of hosts from all child InputSplits. 096 */ 097 public String[] getLocations() throws IOException, InterruptedException { 098 HashSet<String> hosts = new HashSet<String>(); 099 for (InputSplit s : splits) { 100 String[] hints = s.getLocations(); 101 if (hints != null && hints.length > 0) { 102 for (String host : hints) { 103 hosts.add(host); 104 } 105 } 106 } 107 return hosts.toArray(new String[hosts.size()]); 108 } 109 110 /** 111 * getLocations from ith InputSplit. 112 */ 113 public String[] getLocation(int i) throws IOException, InterruptedException { 114 return splits[i].getLocations(); 115 } 116 117 /** 118 * Write splits in the following format. 119 * {@code 120 * <count><class1><class2>...<classn><split1><split2>...<splitn> 121 * } 122 */ 123 @SuppressWarnings("unchecked") 124 public void write(DataOutput out) throws IOException { 125 WritableUtils.writeVInt(out, splits.length); 126 for (InputSplit s : splits) { 127 Text.writeString(out, s.getClass().getName()); 128 } 129 for (InputSplit s : splits) { 130 SerializationFactory factory = new SerializationFactory(conf); 131 Serializer serializer = 132 factory.getSerializer(s.getClass()); 133 serializer.open((DataOutputStream)out); 134 serializer.serialize(s); 135 } 136 } 137 138 /** 139 * {@inheritDoc} 140 * @throws IOException If the child InputSplit cannot be read, typically 141 * for failing access checks. 142 */ 143 @SuppressWarnings("unchecked") // Generic array assignment 144 public void readFields(DataInput in) throws IOException { 145 int card = WritableUtils.readVInt(in); 146 if (splits == null || splits.length != card) { 147 splits = new InputSplit[card]; 148 } 149 Class<? extends InputSplit>[] cls = new Class[card]; 150 try { 151 for (int i = 0; i < card; ++i) { 152 cls[i] = 153 Class.forName(Text.readString(in)).asSubclass(InputSplit.class); 154 } 155 for (int i = 0; i < card; ++i) { 156 splits[i] = ReflectionUtils.newInstance(cls[i], null); 157 SerializationFactory factory = new SerializationFactory(conf); 158 Deserializer deserializer = factory.getDeserializer(cls[i]); 159 deserializer.open((DataInputStream)in); 160 splits[i] = (InputSplit)deserializer.deserialize(splits[i]); 161 } 162 } catch (ClassNotFoundException e) { 163 throw new IOException("Failed split init", e); 164 } 165 } 166}