001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.metrics.spi;
019
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics.ContextFactory;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
036
037@InterfaceAudience.Public
038@InterfaceStability.Evolving
039public class CompositeContext extends AbstractMetricsContext {
040
041  private static final Log LOG = LogFactory.getLog(CompositeContext.class);
042  private static final String ARITY_LABEL = "arity";
043  private static final String SUB_FMT = "%s.sub%d";
044  private final ArrayList<MetricsContext> subctxt =
045    new ArrayList<MetricsContext>();
046
047  @InterfaceAudience.Private
048  public CompositeContext() {
049  }
050
051  @InterfaceAudience.Private
052  public void init(String contextName, ContextFactory factory) {
053    super.init(contextName, factory);
054    int nKids;
055    try {
056      String sKids = getAttribute(ARITY_LABEL);
057      nKids = Integer.parseInt(sKids);
058    } catch (Exception e) {
059      LOG.error("Unable to initialize composite metric " + contextName +
060                ": could not init arity", e);
061      return;
062    }
063    for (int i = 0; i < nKids; ++i) {
064      MetricsContext ctxt = MetricsUtil.getContext(
065          String.format(SUB_FMT, contextName, i), contextName);
066      if (null != ctxt) {
067        subctxt.add(ctxt);
068      }
069    }
070  }
071
072  @InterfaceAudience.Private
073  @Override
074  public MetricsRecord newRecord(String recordName) {
075    return (MetricsRecord) Proxy.newProxyInstance(
076        MetricsRecord.class.getClassLoader(),
077        new Class[] { MetricsRecord.class },
078        new MetricsRecordDelegator(recordName, subctxt));
079  }
080
081  @InterfaceAudience.Private
082  @Override
083  protected void emitRecord(String contextName, String recordName,
084      OutputRecord outRec) throws IOException {
085    for (MetricsContext ctxt : subctxt) {
086      try {
087        ((AbstractMetricsContext)ctxt).emitRecord(
088          contextName, recordName, outRec);
089        if (contextName == null || recordName == null || outRec == null) {
090          throw new IOException(contextName + ":" + recordName + ":" + outRec);
091        }
092      } catch (IOException e) {
093        LOG.warn("emitRecord failed: " + ctxt.getContextName(), e);
094      }
095    }
096  }
097
098  @InterfaceAudience.Private
099  @Override
100  protected void flush() throws IOException {
101    for (MetricsContext ctxt : subctxt) {
102      try {
103        ((AbstractMetricsContext)ctxt).flush();
104      } catch (IOException e) {
105        LOG.warn("flush failed: " + ctxt.getContextName(), e);
106      }
107    }
108  }
109
110  @InterfaceAudience.Private
111  @Override
112  public void startMonitoring() throws IOException {
113    for (MetricsContext ctxt : subctxt) {
114      try {
115        ctxt.startMonitoring();
116      } catch (IOException e) {
117        LOG.warn("startMonitoring failed: " + ctxt.getContextName(), e);
118      }
119    }
120  }
121
122  @InterfaceAudience.Private
123  @Override
124  public void stopMonitoring() {
125    for (MetricsContext ctxt : subctxt) {
126      ctxt.stopMonitoring();
127    }
128  }
129
130  /**
131   * Return true if all subcontexts are monitoring.
132   */
133  @InterfaceAudience.Private
134  @Override
135  public boolean isMonitoring() {
136    boolean ret = true;
137    for (MetricsContext ctxt : subctxt) {
138      ret &= ctxt.isMonitoring();
139    }
140    return ret;
141  }
142
143  @InterfaceAudience.Private
144  @Override
145  public void close() {
146    for (MetricsContext ctxt : subctxt) {
147      ctxt.close();
148    }
149  }
150
151  @InterfaceAudience.Private
152  @Override
153  public void registerUpdater(Updater updater) {
154    for (MetricsContext ctxt : subctxt) {
155      ctxt.registerUpdater(updater);
156    }
157  }
158
159  @InterfaceAudience.Private
160  @Override
161  public void unregisterUpdater(Updater updater) {
162    for (MetricsContext ctxt : subctxt) {
163      ctxt.unregisterUpdater(updater);
164    }
165  }
166
167  private static class MetricsRecordDelegator implements InvocationHandler {
168    private static final Method m_getRecordName = initMethod();
169    private static Method initMethod() {
170      try {
171        return MetricsRecord.class.getMethod("getRecordName", new Class[0]);
172      } catch (Exception e) {
173        throw new RuntimeException("Internal error", e);
174      }
175    }
176
177    private final String recordName;
178    private final ArrayList<MetricsRecord> subrecs;
179
180    MetricsRecordDelegator(String recordName, ArrayList<MetricsContext> ctxts) {
181      this.recordName = recordName;
182      this.subrecs = new ArrayList<MetricsRecord>(ctxts.size());
183      for (MetricsContext ctxt : ctxts) {
184        subrecs.add(ctxt.createRecord(recordName));
185      }
186    }
187
188    public Object invoke(Object p, Method m, Object[] args) throws Throwable {
189      if (m_getRecordName.equals(m)) {
190        return recordName;
191      }
192      assert Void.TYPE.equals(m.getReturnType());
193      for (MetricsRecord rec : subrecs) {
194        m.invoke(rec, args);
195      }
196      return null;
197    }
198  }
199
200}