001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.metrics.spi;
019
020 import java.io.IOException;
021 import java.lang.reflect.InvocationHandler;
022 import java.lang.reflect.Method;
023 import java.lang.reflect.Proxy;
024 import java.util.ArrayList;
025
026 import org.apache.commons.logging.Log;
027 import org.apache.commons.logging.LogFactory;
028
029 import org.apache.hadoop.classification.InterfaceAudience;
030 import org.apache.hadoop.classification.InterfaceStability;
031 import org.apache.hadoop.metrics.ContextFactory;
032 import org.apache.hadoop.metrics.MetricsContext;
033 import org.apache.hadoop.metrics.MetricsRecord;
034 import org.apache.hadoop.metrics.MetricsUtil;
035 import org.apache.hadoop.metrics.Updater;
036
037 @InterfaceAudience.Public
038 @InterfaceStability.Evolving
039 public class CompositeContext extends AbstractMetricsContext {
040
041 private static final Log LOG = LogFactory.getLog(CompositeContext.class);
042 private static final String ARITY_LABEL = "arity";
043 private static final String SUB_FMT = "%s.sub%d";
044 private final ArrayList<MetricsContext> subctxt =
045 new ArrayList<MetricsContext>();
046
047 @InterfaceAudience.Private
048 public CompositeContext() {
049 }
050
051 @InterfaceAudience.Private
052 public void init(String contextName, ContextFactory factory) {
053 super.init(contextName, factory);
054 int nKids;
055 try {
056 String sKids = getAttribute(ARITY_LABEL);
057 nKids = Integer.valueOf(sKids);
058 } catch (Exception e) {
059 LOG.error("Unable to initialize composite metric " + contextName +
060 ": could not init arity", e);
061 return;
062 }
063 for (int i = 0; i < nKids; ++i) {
064 MetricsContext ctxt = MetricsUtil.getContext(
065 String.format(SUB_FMT, contextName, i), contextName);
066 if (null != ctxt) {
067 subctxt.add(ctxt);
068 }
069 }
070 }
071
072 @InterfaceAudience.Private
073 @Override
074 public MetricsRecord newRecord(String recordName) {
075 return (MetricsRecord) Proxy.newProxyInstance(
076 MetricsRecord.class.getClassLoader(),
077 new Class[] { MetricsRecord.class },
078 new MetricsRecordDelegator(recordName, subctxt));
079 }
080
081 @InterfaceAudience.Private
082 @Override
083 protected void emitRecord(String contextName, String recordName,
084 OutputRecord outRec) throws IOException {
085 for (MetricsContext ctxt : subctxt) {
086 try {
087 ((AbstractMetricsContext)ctxt).emitRecord(
088 contextName, recordName, outRec);
089 if (contextName == null || recordName == null || outRec == null) {
090 throw new IOException(contextName + ":" + recordName + ":" + outRec);
091 }
092 } catch (IOException e) {
093 LOG.warn("emitRecord failed: " + ctxt.getContextName(), e);
094 }
095 }
096 }
097
098 @InterfaceAudience.Private
099 @Override
100 protected void flush() throws IOException {
101 for (MetricsContext ctxt : subctxt) {
102 try {
103 ((AbstractMetricsContext)ctxt).flush();
104 } catch (IOException e) {
105 LOG.warn("flush failed: " + ctxt.getContextName(), e);
106 }
107 }
108 }
109
110 @InterfaceAudience.Private
111 @Override
112 public void startMonitoring() throws IOException {
113 for (MetricsContext ctxt : subctxt) {
114 try {
115 ctxt.startMonitoring();
116 } catch (IOException e) {
117 LOG.warn("startMonitoring failed: " + ctxt.getContextName(), e);
118 }
119 }
120 }
121
122 @InterfaceAudience.Private
123 @Override
124 public void stopMonitoring() {
125 for (MetricsContext ctxt : subctxt) {
126 ctxt.stopMonitoring();
127 }
128 }
129
130 /**
131 * Return true if all subcontexts are monitoring.
132 */
133 @InterfaceAudience.Private
134 @Override
135 public boolean isMonitoring() {
136 boolean ret = true;
137 for (MetricsContext ctxt : subctxt) {
138 ret &= ctxt.isMonitoring();
139 }
140 return ret;
141 }
142
143 @InterfaceAudience.Private
144 @Override
145 public void close() {
146 for (MetricsContext ctxt : subctxt) {
147 ctxt.close();
148 }
149 }
150
151 @InterfaceAudience.Private
152 @Override
153 public void registerUpdater(Updater updater) {
154 for (MetricsContext ctxt : subctxt) {
155 ctxt.registerUpdater(updater);
156 }
157 }
158
159 @InterfaceAudience.Private
160 @Override
161 public void unregisterUpdater(Updater updater) {
162 for (MetricsContext ctxt : subctxt) {
163 ctxt.unregisterUpdater(updater);
164 }
165 }
166
167 private static class MetricsRecordDelegator implements InvocationHandler {
168 private static final Method m_getRecordName = initMethod();
169 private static Method initMethod() {
170 try {
171 return MetricsRecord.class.getMethod("getRecordName", new Class[0]);
172 } catch (Exception e) {
173 throw new RuntimeException("Internal error", e);
174 }
175 }
176
177 private final String recordName;
178 private final ArrayList<MetricsRecord> subrecs;
179
180 MetricsRecordDelegator(String recordName, ArrayList<MetricsContext> ctxts) {
181 this.recordName = recordName;
182 this.subrecs = new ArrayList<MetricsRecord>(ctxts.size());
183 for (MetricsContext ctxt : ctxts) {
184 subrecs.add(ctxt.createRecord(recordName));
185 }
186 }
187
188 public Object invoke(Object p, Method m, Object[] args) throws Throwable {
189 if (m_getRecordName.equals(m)) {
190 return recordName;
191 }
192 assert Void.TYPE.equals(m.getReturnType());
193 for (MetricsRecord rec : subrecs) {
194 m.invoke(rec, args);
195 }
196 return null;
197 }
198 }
199
200 }