001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.metrics.spi; 019 020import java.io.IOException; 021import java.lang.reflect.InvocationHandler; 022import java.lang.reflect.Method; 023import java.lang.reflect.Proxy; 024import java.util.ArrayList; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028 029import org.apache.hadoop.classification.InterfaceAudience; 030import org.apache.hadoop.classification.InterfaceStability; 031import org.apache.hadoop.metrics.ContextFactory; 032import org.apache.hadoop.metrics.MetricsContext; 033import org.apache.hadoop.metrics.MetricsRecord; 034import org.apache.hadoop.metrics.MetricsUtil; 035import org.apache.hadoop.metrics.Updater; 036 037@InterfaceAudience.Public 038@InterfaceStability.Evolving 039public class CompositeContext extends AbstractMetricsContext { 040 041 private static final Log LOG = LogFactory.getLog(CompositeContext.class); 042 private static final String ARITY_LABEL = "arity"; 043 private static final String SUB_FMT = "%s.sub%d"; 044 private final ArrayList<MetricsContext> subctxt = 045 new ArrayList<MetricsContext>(); 046 047 @InterfaceAudience.Private 048 public CompositeContext() { 049 } 050 051 @InterfaceAudience.Private 052 public void init(String contextName, ContextFactory factory) { 053 super.init(contextName, factory); 054 int nKids; 055 try { 056 String sKids = getAttribute(ARITY_LABEL); 057 nKids = Integer.valueOf(sKids); 058 } catch (Exception e) { 059 LOG.error("Unable to initialize composite metric " + contextName + 060 ": could not init arity", e); 061 return; 062 } 063 for (int i = 0; i < nKids; ++i) { 064 MetricsContext ctxt = MetricsUtil.getContext( 065 String.format(SUB_FMT, contextName, i), contextName); 066 if (null != ctxt) { 067 subctxt.add(ctxt); 068 } 069 } 070 } 071 072 @InterfaceAudience.Private 073 @Override 074 public MetricsRecord newRecord(String recordName) { 075 return (MetricsRecord) Proxy.newProxyInstance( 076 MetricsRecord.class.getClassLoader(), 077 new Class[] { MetricsRecord.class }, 078 new MetricsRecordDelegator(recordName, subctxt)); 079 } 080 081 @InterfaceAudience.Private 082 @Override 083 protected void emitRecord(String contextName, String recordName, 084 OutputRecord outRec) throws IOException { 085 for (MetricsContext ctxt : subctxt) { 086 try { 087 ((AbstractMetricsContext)ctxt).emitRecord( 088 contextName, recordName, outRec); 089 if (contextName == null || recordName == null || outRec == null) { 090 throw new IOException(contextName + ":" + recordName + ":" + outRec); 091 } 092 } catch (IOException e) { 093 LOG.warn("emitRecord failed: " + ctxt.getContextName(), e); 094 } 095 } 096 } 097 098 @InterfaceAudience.Private 099 @Override 100 protected void flush() throws IOException { 101 for (MetricsContext ctxt : subctxt) { 102 try { 103 ((AbstractMetricsContext)ctxt).flush(); 104 } catch (IOException e) { 105 LOG.warn("flush failed: " + ctxt.getContextName(), e); 106 } 107 } 108 } 109 110 @InterfaceAudience.Private 111 @Override 112 public void startMonitoring() throws IOException { 113 for (MetricsContext ctxt : subctxt) { 114 try { 115 ctxt.startMonitoring(); 116 } catch (IOException e) { 117 LOG.warn("startMonitoring failed: " + ctxt.getContextName(), e); 118 } 119 } 120 } 121 122 @InterfaceAudience.Private 123 @Override 124 public void stopMonitoring() { 125 for (MetricsContext ctxt : subctxt) { 126 ctxt.stopMonitoring(); 127 } 128 } 129 130 /** 131 * Return true if all subcontexts are monitoring. 132 */ 133 @InterfaceAudience.Private 134 @Override 135 public boolean isMonitoring() { 136 boolean ret = true; 137 for (MetricsContext ctxt : subctxt) { 138 ret &= ctxt.isMonitoring(); 139 } 140 return ret; 141 } 142 143 @InterfaceAudience.Private 144 @Override 145 public void close() { 146 for (MetricsContext ctxt : subctxt) { 147 ctxt.close(); 148 } 149 } 150 151 @InterfaceAudience.Private 152 @Override 153 public void registerUpdater(Updater updater) { 154 for (MetricsContext ctxt : subctxt) { 155 ctxt.registerUpdater(updater); 156 } 157 } 158 159 @InterfaceAudience.Private 160 @Override 161 public void unregisterUpdater(Updater updater) { 162 for (MetricsContext ctxt : subctxt) { 163 ctxt.unregisterUpdater(updater); 164 } 165 } 166 167 private static class MetricsRecordDelegator implements InvocationHandler { 168 private static final Method m_getRecordName = initMethod(); 169 private static Method initMethod() { 170 try { 171 return MetricsRecord.class.getMethod("getRecordName", new Class[0]); 172 } catch (Exception e) { 173 throw new RuntimeException("Internal error", e); 174 } 175 } 176 177 private final String recordName; 178 private final ArrayList<MetricsRecord> subrecs; 179 180 MetricsRecordDelegator(String recordName, ArrayList<MetricsContext> ctxts) { 181 this.recordName = recordName; 182 this.subrecs = new ArrayList<MetricsRecord>(ctxts.size()); 183 for (MetricsContext ctxt : ctxts) { 184 subrecs.add(ctxt.createRecord(recordName)); 185 } 186 } 187 188 public Object invoke(Object p, Method m, Object[] args) throws Throwable { 189 if (m_getRecordName.equals(m)) { 190 return recordName; 191 } 192 assert Void.TYPE.equals(m.getReturnType()); 193 for (MetricsRecord rec : subrecs) { 194 m.invoke(rec, args); 195 } 196 return null; 197 } 198 } 199 200}