A Look at Storm's GraphiteStormReporter
Preface
This article takes a look at Storm's GraphiteStormReporter.
GraphiteStormReporter
storm-core-1.2.2-sources.jar!/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java

    public class GraphiteStormReporter extends ScheduledStormReporter {
        private final static Logger LOG = LoggerFactory.getLogger(GraphiteStormReporter.class);

        public static final String GRAPHITE_PREFIXED_WITH = "graphite.prefixed.with";
        public static final String GRAPHITE_HOST = "graphite.host";
        public static final String GRAPHITE_PORT = "graphite.port";
        public static final String GRAPHITE_TRANSPORT = "graphite.transport";

        @Override
        public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) {
            LOG.debug("Preparing...");
            GraphiteReporter.Builder builder = GraphiteReporter.forRegistry(metricsRegistry);

            TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf);
            if (durationUnit != null) {
                builder.convertDurationsTo(durationUnit);
            }

            TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
            if (rateUnit != null) {
                builder.convertRatesTo(rateUnit);
            }

            StormMetricsFilter filter = getMetricsFilter(reporterConf);
            if (filter != null) {
                builder.filter(filter);
            }

            String prefix = getMetricsPrefixedWith(reporterConf);
            if (prefix != null) {
                builder.prefixedWith(prefix);
            }

            //defaults to 10
            reportingPeriod = getReportPeriod(reporterConf);

            //defaults to seconds
            reportingPeriodUnit = getReportPeriodUnit(reporterConf);

            // Not exposed:
            // * withClock(Clock)

            String host = getMetricsTargetHost(reporterConf);
            Integer port = getMetricsTargetPort(reporterConf);
            String transport = getMetricsTargetTransport(reporterConf);
            GraphiteSender sender = null;
            if (transport.equalsIgnoreCase("udp")) {
                sender = new GraphiteUDP(host, port);
            } else {
                sender = new Graphite(host, port);
            }
            reporter = builder.build(sender);
        }

        private static String getMetricsPrefixedWith(Map reporterConf) {
            return Utils.getString(reporterConf.get(GRAPHITE_PREFIXED_WITH), null);
        }

        private static String getMetricsTargetHost(Map reporterConf) {
            return Utils.getString(reporterConf.get(GRAPHITE_HOST), null);
        }

        private static Integer getMetricsTargetPort(Map reporterConf) {
            return Utils.getInt(reporterConf.get(GRAPHITE_PORT), null);
        }

        private static String getMetricsTargetTransport(Map reporterConf) {
            return Utils.getString(reporterConf.get(GRAPHITE_TRANSPORT), "tcp");
        }
    }

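- GraphiteStormReporter extends ScheduledStormReporter and implements the prepare method.
- prepare builds a com.codahale.metrics.graphite.GraphiteSender from the reporter configuration (GraphiteUDP when graphite.transport is "udp", otherwise the TCP-based Graphite, which is the default), then uses that sender to create a com.codahale.metrics.graphite.GraphiteReporter.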
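
The configuration lookups in prepare can be sketched without any Storm or Dropwizard dependency. The class below is a minimal illustration only: GraphiteConfSketch, getString, getInt, and senderFor are hypothetical names, and the two helpers are simplified stand-ins for Utils.getString and Utils.getInt rather than copies of them. Because the quoted code reads host and port with a null default, the sketch fails fast when either is missing; that check is a choice made here, not something the excerpt does.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-alone sketch of the lookups done in prepare(); not Storm's code. */
final class GraphiteConfSketch {
    static final String GRAPHITE_HOST = "graphite.host";
    static final String GRAPHITE_PORT = "graphite.port";
    static final String GRAPHITE_TRANSPORT = "graphite.transport";

    /** Hypothetical stand-in for Utils.getString: null falls back to the default. */
    static String getString(Object value, String defaultValue) {
        return value == null ? defaultValue : value.toString();
    }

    /** Hypothetical stand-in for Utils.getInt, limited to Number and numeric String values. */
    static Integer getInt(Object value, Integer defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        return Integer.valueOf(value.toString().trim());
    }

    /** Names the sender class that the transport check would select for this configuration. */
    static String senderFor(Map<String, Object> reporterConf) {
        String host = getString(reporterConf.get(GRAPHITE_HOST), null);
        Integer port = getInt(reporterConf.get(GRAPHITE_PORT), null);
        String transport = getString(reporterConf.get(GRAPHITE_TRANSPORT), "tcp");
        // Assumption of this sketch: reject a missing host or port up front.
        if (host == null || port == null) {
            throw new IllegalArgumentException("graphite.host and graphite.port must be set");
        }
        // Same case-insensitive comparison as in the quoted prepare method.
        String sender = transport.equalsIgnoreCase("udp") ? "GraphiteUDP" : "Graphite";
        return sender + "(" + host + ", " + port + ")";
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(GRAPHITE_HOST, "localhost");
        conf.put(GRAPHITE_PORT, 2003);
        System.out.println(senderFor(conf)); // transport omitted, so the TCP sender is chosen
        conf.put(GRAPHITE_TRANSPORT, "UDP");
        System.out.println(senderFor(conf)); // "UDP" matches "udp" regardless of case
    }
}
```

Any transport value other than "udp" (in any letter case) ends up on the TCP branch, so a typo in graphite.transport silently selects TCP.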
ScheduledStormReporter
storm-core-1.2.2-sources.jar!/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java

    public abstract class ScheduledStormReporter implements StormReporter{
        private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class);
        protected ScheduledReporter reporter;
        protected long reportingPeriod;
        protected TimeUnit reportingPeriodUnit;

        @Override
        public void start() {
            if (reporter != null) {
                LOG.debug("Starting...");
                reporter.start(reportingPeriod, reportingPeriodUnit);
            } else {
                throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
            }
        }

        @Override
        public void stop() {
            if (reporter != null) {
                LOG.debug("Stopping...");
                reporter.stop();
            } else {
                throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
            }
        }

        public static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) {
            TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS);
            return unit == null ? TimeUnit.SECONDS : unit;
        }

        private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) {
            String rateUnitString = Utils.getString(reporterConf.get(configName), null);
            if (rateUnitString != null) {
                return TimeUnit.valueOf(rateUnitString);
            }
            return null;
        }

        public static long getReportPeriod(Map reporterConf) {
            return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue();
        }

        public static StormMetricsFilter getMetricsFilter(Map reporterConf){
            StormMetricsFilter filter = null;
            Map<String, Object> filterConf = (Map)reporterConf.get("filter");
            if(filterConf != null) {
                String clazz = (String) filterConf.get("class");
                if (clazz != null) {
                    filter = Utils.newInstance(clazz);
                    filter.prepare(filterConf);
                }
            }
            return filter;
        }
    }

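- ScheduledStormReporter encapsulates lifecycle control of the reporter: start is called on startup and stop on shutdown, and both throw IllegalStateException if prepare has not yet set the reporter.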
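
The prepare/start/stop contract and the period defaults can be illustrated with a small stand-alone class. This is a sketch under stated assumptions, not Storm's implementation: LifecycleSketch, schedule, and isRunning are hypothetical names, a plain Object stands in for the Dropwizard ScheduledReporter, and the two configuration key strings are assumed because the excerpt only shows the constant names REPORT_PERIOD and REPORT_PERIOD_UNITS.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/** Stand-alone sketch of the prepare/start/stop contract; not Storm's code. */
final class LifecycleSketch {
    // Assumed key strings; the quoted excerpt does not show the constants' values.
    static final String REPORT_PERIOD = "report.period";
    static final String REPORT_PERIOD_UNITS = "report.period.units";

    private Object reporter;              // stands in for the ScheduledReporter built in prepare()
    private long reportingPeriod;
    private TimeUnit reportingPeriodUnit;
    private boolean running;

    void prepare(Map<String, Object> reporterConf) {
        Object period = reporterConf.get(REPORT_PERIOD);
        // Same defaults as the quoted getters: a period of 10, in seconds.
        long resolvedPeriod = period == null ? 10L : ((Number) period).longValue();
        Object unit = reporterConf.get(REPORT_PERIOD_UNITS);
        // TimeUnit.valueOf needs the exact enum constant name, e.g. "SECONDS";
        // a lower-case "seconds" raises IllegalArgumentException.
        TimeUnit resolvedUnit = unit == null ? TimeUnit.SECONDS : TimeUnit.valueOf(unit.toString());
        reportingPeriod = resolvedPeriod;
        reportingPeriodUnit = resolvedUnit;
        reporter = new Object();
    }

    void start() {
        if (reporter == null) {
            throw new IllegalStateException("Attempt to start without preparing");
        }
        running = true;
    }

    void stop() {
        if (reporter == null) {
            throw new IllegalStateException("Attempt to stop without preparing");
        }
        running = false;
    }

    String schedule() {
        return reportingPeriod + " " + reportingPeriodUnit;
    }

    boolean isRunning() {
        return running;
    }

    public static void main(String[] args) {
        LifecycleSketch sketch = new LifecycleSketch();
        sketch.prepare(new HashMap<String, Object>()); // empty config, so the defaults apply
        sketch.start();
        System.out.println(sketch.schedule() + ", running=" + sketch.isRunning());
        sketch.stop();
    }
}
```

Guarding start and stop on a non-null reporter turns a forgotten prepare call into an immediate, descriptive failure instead of a NullPointerException later on.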
Summary
- Starting with version 1.2, Storm introduced a new metrics system, metrics2, which is built on Dropwizard Metrics.
- Out of the box it provides the Console Reporter, CSV Reporter, Ganglia Reporter, Graphite Reporter, and JMX Reporter.