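Using Flume's message-processing monitoring metrics
Flume is used more and more widely, and sooner or later you will run into performance bottlenecks or lost messages. When that happens, the first instinct is to analyze the process with the JDK's built-in command-line tools and to dig through logs to locate the problem.
Flume also maintains many built-in counters while it processes messages, and it can publish them through JMX, Ganglia, JSON (over HTTP), and other channels. When you need them, add the following options to the startup script: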
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545
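After restarting the agent with these options, open http://188.1.186.XXX:34545/metrics in a browser to get the monitoring data as JSON.
On Linux you can query it directly with curl -XGET '188.1.186.XXX:34545/metrics'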
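The same request can also be made from Java, which is handy if you want to poll the endpoint from your own tooling. The following is a minimal sketch, assuming Java 11 or later (for java.net.http.HttpClient); the class name FlumeMetricsFetcher and the timeout values are hypothetical choices, not part of Flume.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper, not part of Flume: fetches the raw JSON from /metrics.
class FlumeMetricsFetcher {

    static String fetchMetrics(String host, int port)
            throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(5))   // assumed timeout
                .build();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://" + host + ":" + port + "/metrics"))
                .timeout(Duration.ofSeconds(10))         // assumed timeout
                .GET()
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Unexpected HTTP status " + response.statusCode());
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // Pass the agent's host and the value of flume.monitoring.port.
        System.out.println(fetchMetrics(args[0], Integer.parseInt(args[1])));
    }
}
```

Run it with the agent's host and monitoring port as the two arguments; it prints the same JSON that curl returns.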
The response comes back as a single line. To make it readable, paste it into a .json file in Eclipse and press Ctrl+Shift+F to format it:
{
    "SINK.k1": {
        "ConnectionCreatedCount": "1",
        "ConnectionClosedCount": "0",
        "Type": "SINK",
        "BatchCompleteCount": "0",
        "BatchEmptyCount": "0",
        "EventDrainAttemptCount": "7908340",
        "StartTime": "1514878638909",
        "EventDrainSuccessCount": "7657343",
        "BatchUnderflowCount": "250997",
        "StopTime": "0",
        "ConnectionFailedCount": "0"
    },
    "CHANNEL.c1": {
        "ChannelCapacity": "1000000",
        "ChannelFillPercentage": "0.0",
        "Type": "CHANNEL",
        "ChannelSize": "0",
        "EventTakeSuccessCount": "7908340",
        "EventTakeAttemptCount": "7908466",
        "StartTime": "1514878638906",
        "EventPutAttemptCount": "7908340",
        "EventPutSuccessCount": "7908340",
        "StopTime": "0"
    },
    "SOURCE.r1": {
        "KafkaEventGetTimer": "6468875",
        "AppendBatchAcceptedCount": "0",
        "EventAcceptedCount": "7908340",
        "AppendReceivedCount": "0",
        "StartTime": "1514878643588",
        "AppendBatchReceivedCount": "0",
        "KafkaCommitTimer": "156254",
        "EventReceivedCount": "7908340",
        "Type": "SOURCE",
        "AppendAcceptedCount": "0",
        "OpenConnectionCount": "0",
        "KafkaEmptyCount": "0",
        "StopTime": "0"
    }
}
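Comparing the source, channel, and sink counters shows where the pipeline is bottlenecked. In the sample above, for example, the sink's EventDrainAttemptCount (7908340) exceeds its EventDrainSuccessCount (7657343) by 250997 events, so the sink is the first place to look.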
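This kind of comparison is easy to automate. The sketch below is a minimal example, assuming the metrics JSON always has the two-level shape shown in the sample (component name, then counters with string values). The class FlumeMetricsGaps and its methods are hypothetical, and the regex-based parsing is only a stand-in to keep the example dependency-free; a real JSON library would be the more robust choice.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flume.
class FlumeMetricsGaps {

    // Matches one component block, e.g. "SINK.k1": { ... } (no nested braces assumed).
    private static final Pattern COMPONENT =
            Pattern.compile("\"([^\"]+)\"\\s*:\\s*\\{([^{}]*)\\}");
    // Matches one counter inside a block, e.g. "ChannelSize": "0".
    private static final Pattern FIELD =
            Pattern.compile("\"([^\"]+)\"\\s*:\\s*\"([^\"]*)\"");

    /** Parses the metrics JSON into component name -> (counter name -> value). */
    static Map<String, Map<String, String>> parse(String json) {
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        Matcher component = COMPONENT.matcher(json);
        while (component.find()) {
            Map<String, String> counters = new LinkedHashMap<>();
            Matcher field = FIELD.matcher(component.group(2));
            while (field.find()) {
                counters.put(field.group(1), field.group(2));
            }
            result.put(component.group(1), counters);
        }
        return result;
    }

    /** Returns attempts minus successes for one component's counters. */
    static long gap(Map<String, String> counters, String attemptKey, String successKey) {
        return Long.parseLong(counters.get(attemptKey))
                - Long.parseLong(counters.get(successKey));
    }

    public static void main(String[] args) {
        // A shortened copy of the sample metrics.
        String json = "{ \"SINK.k1\": { \"EventDrainAttemptCount\": \"7908340\", "
                + "\"EventDrainSuccessCount\": \"7657343\" }, "
                + "\"CHANNEL.c1\": { \"EventTakeAttemptCount\": \"7908466\", "
                + "\"EventTakeSuccessCount\": \"7908340\" } }";
        Map<String, Map<String, String>> metrics = parse(json);
        System.out.println(gap(metrics.get("SINK.k1"),
                "EventDrainAttemptCount", "EventDrainSuccessCount"));   // prints 250997
        System.out.println(gap(metrics.get("CHANNEL.c1"),
                "EventTakeAttemptCount", "EventTakeSuccessCount"));     // prints 126
    }
}
```

A non-zero gap is not proof of lost messages on its own, since attempts may be retried or still in flight. It is a hint about which component deserves a closer look.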
If you develop your own plugin, you can use the same counters to publish its statistics, for example:
public class ElasticSearchSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchSink.class);
    private BulkProcessor bulkProcessor;
    private SinkCounter sinkCounter;

    @Override
    public void configure(Context context) {
        ...
        buildIndexBuilder(context);
        buildSerializer(context);
        // Create the counter once, named after this sink so it appears as SINK.<name>.
        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }
        bulkProcessor = new BulkProcessorBuilder().buildBulkProcessor(context, client);
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            Event event = channel.take();
            if (event != null) {
                String body = new String(event.getBody(), Charsets.UTF_8);
                // Every event taken from the channel counts as a drain attempt.
                sinkCounter.incrementEventDrainAttemptCount();
                if (!Strings.isNullOrEmpty(body)) {
                    String index = indexBuilder.getIndex(event);
                    String type = indexBuilder.getType(event);
                    String id = indexBuilder.getId(event);
                    XContentBuilder xContentBuilder = serializer.serialize(event);
                    if (index != null && xContentBuilder != null) {
                        if (!StringUtil.isNullOrEmpty(id)) {
                            bulkProcessor.add(new IndexRequest(index, type, id)
                                    .source(xContentBuilder));
                            // Counted as drained once handed to the bulk processor.
                            sinkCounter.incrementEventDrainSuccessCount();
                            ...

    @Override
    public synchronized void start() {
        // Start the counter before the sink so it is registered for monitoring.
        sinkCounter.start();
        sinkCounter.incrementConnectionCreatedCount();
        super.start();
    }
    ...
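Summary:
1. When you hit a Flume performance problem, add -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 to the startup script to enable the HTTP monitoring endpoint.
2. Fetch the metrics from http://<hostname>:<port>/metrics
3. When developing a custom plugin, you can use Flume's existing counters to record its statistics.
Note: for more details, see the official documentation at http://flume.apache.org/FlumeUserGuide.html#monitoring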