001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred.lib.aggregate;
020    
021    import java.io.IOException;
022    import java.util.ArrayList;
023    
024    import org.apache.hadoop.classification.InterfaceAudience;
025    import org.apache.hadoop.classification.InterfaceStability;
026    import org.apache.hadoop.io.Text;
027    import org.apache.hadoop.io.Writable;
028    import org.apache.hadoop.io.WritableComparable;
029    import org.apache.hadoop.mapred.JobConf;
030    import org.apache.hadoop.mapred.Mapper;
031    import org.apache.hadoop.mapred.Reducer;
032    
033    /**
034     * This abstract class implements some common functionalities of the
035     * the generic mapper, reducer and combiner classes of Aggregate.
036     */
037    @InterfaceAudience.Public
038    @InterfaceStability.Stable
039    public abstract class ValueAggregatorJobBase<K1 extends WritableComparable,
040                                                 V1 extends Writable>
041      implements Mapper<K1, V1, Text, Text>, Reducer<Text, Text, Text, Text> {
042    
043      protected ArrayList<ValueAggregatorDescriptor> aggregatorDescriptorList = null;
044    
045      public void configure(JobConf job) {
046        this.initializeMySpec(job);
047        this.logSpec();
048      }
049    
050      private static ValueAggregatorDescriptor getValueAggregatorDescriptor(
051          String spec, JobConf job) {
052        if (spec == null)
053          return null;
054        String[] segments = spec.split(",", -1);
055        String type = segments[0];
056        if (type.compareToIgnoreCase("UserDefined") == 0) {
057          String className = segments[1];
058          return new UserDefinedValueAggregatorDescriptor(className, job);
059        }
060        return null;
061      }
062    
063      private static ArrayList<ValueAggregatorDescriptor> getAggregatorDescriptors(JobConf job) {
064        String advn = "aggregator.descriptor";
065        int num = job.getInt(advn + ".num", 0);
066        ArrayList<ValueAggregatorDescriptor> retv = new ArrayList<ValueAggregatorDescriptor>(num);
067        for (int i = 0; i < num; i++) {
068          String spec = job.get(advn + "." + i);
069          ValueAggregatorDescriptor ad = getValueAggregatorDescriptor(spec, job);
070          if (ad != null) {
071            retv.add(ad);
072          }
073        }
074        return retv;
075      }
076    
077      private void initializeMySpec(JobConf job) {
078        this.aggregatorDescriptorList = getAggregatorDescriptors(job);
079        if (this.aggregatorDescriptorList.size() == 0) {
080          this.aggregatorDescriptorList
081              .add(new UserDefinedValueAggregatorDescriptor(
082                  ValueAggregatorBaseDescriptor.class.getCanonicalName(), job));
083        }
084      }
085    
086      protected void logSpec() {
087    
088      }
089    
090      public void close() throws IOException {
091      }
092    }