A Look at Flink's MetricQueryServiceGateway
Preface
This article takes a look at Flink's MetricQueryServiceGateway.
MetricQueryServiceGateway
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java
public interface MetricQueryServiceGateway {

    CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout);

    String getAddress();
}
- MetricQueryServiceGateway defines two methods, queryMetrics and getAddress; its implementation class is AkkaQueryServiceGateway.
AkkaQueryServiceGateway
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java
public class AkkaQueryServiceGateway implements MetricQueryServiceGateway {

    private final ActorRef queryServiceActorRef;

    public AkkaQueryServiceGateway(ActorRef queryServiceActorRef) {
        this.queryServiceActorRef = Preconditions.checkNotNull(queryServiceActorRef);
    }

    @Override
    public CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout) {
        return FutureUtils.toJava(
            Patterns.ask(queryServiceActorRef, MetricQueryService.getCreateDump(), timeout.toMilliseconds())
                .mapTo(ClassTag$.MODULE$.apply(MetricDumpSerialization.MetricSerializationResult.class))
        );
    }

    @Override
    public String getAddress() {
        return queryServiceActorRef.path().toString();
    }
}
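- AkkaQueryServiceGateway implements the MetricQueryServiceGateway interface, and its constructor requires a queryServiceActorRef. The queryMetrics method uses Patterns.ask to send the message returned by MetricQueryService.getCreateDump() (the CreateDump singleton) with the given timeout, and maps the reply to a MetricSerializationResult; the getAddress method returns queryServiceActorRef.path().toString().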
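The request/reply shape of queryMetrics can be sketched with the JDK alone. This is only an illustration, not Flink's or Akka's code: DumpGateway, MailboxGateway, the byte[] payload, and the address string are hypothetical stand-ins, a single-threaded executor plays the role of the actor mailbox, and orTimeout (Java 9+) approximates the timeout passed to Patterns.ask.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class AskSketch {

    /** Hypothetical stand-in for MetricQueryServiceGateway; the payload is simplified to byte[]. */
    interface DumpGateway {
        CompletableFuture<byte[]> queryMetrics(long timeoutMillis);

        String getAddress();
    }

    /** Hypothetical implementation: a single-threaded executor acts as the actor's mailbox. */
    static final class MailboxGateway implements DumpGateway {
        private final String address;
        private final ExecutorService mailbox;
        private final Supplier<byte[]> dumpSource;

        MailboxGateway(String address, ExecutorService mailbox, Supplier<byte[]> dumpSource) {
            this.address = address;
            this.mailbox = mailbox;
            this.dumpSource = dumpSource;
        }

        @Override
        public CompletableFuture<byte[]> queryMetrics(long timeoutMillis) {
            // Submit the request asynchronously and fail the future if no reply
            // arrives within the timeout, similar in spirit to an ask with a timeout.
            return CompletableFuture.supplyAsync(dumpSource, mailbox)
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        }

        @Override
        public String getAddress() {
            return address;
        }
    }

    public static void main(String[] args) {
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        try {
            // The address below is a made-up example value.
            DumpGateway gateway = new MailboxGateway(
                "akka://flink/user/MetricQueryService", mailbox, () -> new byte[] {1, 2, 3});
            byte[] dump = gateway.queryMetrics(1000).join();
            System.out.println(gateway.getAddress() + " returned " + dump.length + " bytes");
        } finally {
            mailbox.shutdown();
        }
    }
}
```

The caller only sees a future and an address, which is what lets the gateway hide whether the reply comes from a local or a remote actor.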
MetricQueryService
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
public class MetricQueryService extends UntypedActor {
    private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);

    public static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";
    private static final String SIZE_EXCEEDED_LOG_TEMPLATE = "{} will not be reported as the metric dump would exceed the maximum size of {} bytes.";

    private static final CharacterFilter FILTER = new CharacterFilter() {
        @Override
        public String filterCharacters(String input) {
            return replaceInvalidChars(input);
        }
    };

    private final MetricDumpSerializer serializer = new MetricDumpSerializer();

    private final Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
    private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
    private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
    private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();

    private final long messageSizeLimit;

    //......

    @Override
    public void onReceive(Object message) {
        try {
            if (message instanceof AddMetric) {
                AddMetric added = (AddMetric) message;

                String metricName = added.metricName;
                Metric metric = added.metric;
                AbstractMetricGroup group = added.group;

                QueryScopeInfo info = group.getQueryServiceMetricInfo(FILTER);

                if (metric instanceof Counter) {
                    counters.put((Counter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
                } else if (metric instanceof Gauge) {
                    gauges.put((Gauge<?>) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
                } else if (metric instanceof Histogram) {
                    histograms.put((Histogram) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
                } else if (metric instanceof Meter) {
                    meters.put((Meter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
                }
            } else if (message instanceof RemoveMetric) {
                Metric metric = (((RemoveMetric) message).metric);
                if (metric instanceof Counter) {
                    this.counters.remove(metric);
                } else if (metric instanceof Gauge) {
                    this.gauges.remove(metric);
                } else if (metric instanceof Histogram) {
                    this.histograms.remove(metric);
                } else if (metric instanceof Meter) {
                    this.meters.remove(metric);
                }
            } else if (message instanceof CreateDump) {
                MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);

                dump = enforceSizeLimit(dump);

                getSender().tell(dump, getSelf());
            } else {
                LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
                getSender().tell(new Status.Failure(new IOException("MetricQueryServiceActor received an invalid message. " + message.toString())), getSelf());
            }
        } catch (Exception e) {
            LOG.warn("An exception occurred while processing a message.", e);
        }
    }

    public static Object getCreateDump() {
        return CreateDump.INSTANCE;
    }

    private static class CreateDump implements Serializable {
        private static final CreateDump INSTANCE = new CreateDump();
    }

    //......
}
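- MetricQueryService extends UntypedActor, and its onReceive method dispatches on the message type. AddMetric and RemoveMetric messages register and unregister a metric in the counters, gauges, histograms, or meters map. For a CreateDump message, it calls MetricDumpSerialization.MetricDumpSerializer.serialize(counters, gauges, histograms, meters) to serialize the metrics into a MetricDumpSerialization.MetricSerializationResult, applies enforceSizeLimit to the result, and then replies with getSender().tell(dump, getSelf()). Any other message is logged and answered with a Status.Failure.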
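The dispatch logic in onReceive can be illustrated without Akka. The following is a minimal sketch under simplifying assumptions: DispatchSketch and its message classes are hypothetical, only counters are modeled (as LongSupplier), the dump is a plain Map instead of a serialized result, and the reply is returned from the method rather than sent with getSender().tell(...).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

public class DispatchSketch {

    /** Hypothetical message: register a counter under a name. */
    static final class AddMetric {
        final String name;
        final LongSupplier metric;

        AddMetric(String name, LongSupplier metric) {
            this.name = name;
            this.metric = metric;
        }
    }

    /** Hypothetical message: unregister a previously added counter. */
    static final class RemoveMetric {
        final LongSupplier metric;

        RemoveMetric(LongSupplier metric) {
            this.metric = metric;
        }
    }

    /** Singleton request message, mirroring the CreateDump.INSTANCE idea. */
    enum CreateDump { INSTANCE }

    // Keyed by the metric object itself, as in the maps of the original actor.
    private final Map<LongSupplier, String> counters = new HashMap<>();

    /** Returns the reply for the message, or null when the message needs no reply. */
    Object onReceive(Object message) {
        if (message instanceof AddMetric) {
            AddMetric added = (AddMetric) message;
            counters.put(added.metric, added.name);
            return null;
        } else if (message instanceof RemoveMetric) {
            counters.remove(((RemoveMetric) message).metric);
            return null;
        } else if (message instanceof CreateDump) {
            // Snapshot the current value of every registered counter.
            Map<String, Long> dump = new HashMap<>();
            counters.forEach((metric, name) -> dump.put(name, metric.getAsLong()));
            return dump;
        } else {
            // Unknown messages are answered with a failure object.
            return new IllegalArgumentException("Received an invalid message. " + message);
        }
    }

    public static void main(String[] args) {
        DispatchSketch service = new DispatchSketch();
        LongSupplier recordsIn = () -> 42L;
        service.onReceive(new AddMetric("numRecordsIn", recordsIn));
        System.out.println(service.onReceive(CreateDump.INSTANCE));
        service.onReceive(new RemoveMetric(recordsIn));
        System.out.println(service.onReceive(CreateDump.INSTANCE));
    }
}
```

Because all state changes and reads go through one message handler, an actor processing one message at a time needs no locking around the maps; this sketch does not add any thread safety of its own.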
MetricDumpSerialization
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
public class MetricDumpSerialization {
    //......

    public static class MetricSerializationResult implements Serializable {

        private static final long serialVersionUID = 6928770855951536906L;

        public final byte[] serializedCounters;
        public final byte[] serializedGauges;
        public final byte[] serializedMeters;
        public final byte[] serializedHistograms;

        public final int numCounters;
        public final int numGauges;
        public final int numMeters;
        public final int numHistograms;

        public MetricSerializationResult(
            byte[] serializedCounters,
            byte[] serializedGauges,
            byte[] serializedMeters,
            byte[] serializedHistograms,
            int numCounters,
            int numGauges,
            int numMeters,
            int numHistograms) {

            Preconditions.checkNotNull(serializedCounters);
            Preconditions.checkNotNull(serializedGauges);
            Preconditions.checkNotNull(serializedMeters);
            Preconditions.checkNotNull(serializedHistograms);
            Preconditions.checkArgument(numCounters >= 0);
            Preconditions.checkArgument(numGauges >= 0);
            Preconditions.checkArgument(numMeters >= 0);
            Preconditions.checkArgument(numHistograms >= 0);
            this.serializedCounters = serializedCounters;
            this.serializedGauges = serializedGauges;
            this.serializedMeters = serializedMeters;
            this.serializedHistograms = serializedHistograms;
            this.numCounters = numCounters;
            this.numGauges = numGauges;
            this.numMeters = numMeters;
            this.numHistograms = numHistograms;
        }
    }

    public static class MetricDumpSerializer {

        private DataOutputSerializer countersBuffer = new DataOutputSerializer(1024 * 8);
        private DataOutputSerializer gaugesBuffer = new DataOutputSerializer(1024 * 8);
        private DataOutputSerializer metersBuffer = new DataOutputSerializer(1024 * 8);
        private DataOutputSerializer histogramsBuffer = new DataOutputSerializer(1024 * 8);

        public MetricSerializationResult serialize(
            Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
            Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
            Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
            Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {

            countersBuffer.clear();
            int numCounters = 0;
            for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
                try {
                    serializeCounter(countersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numCounters++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize counter.", e);
                }
            }

            gaugesBuffer.clear();
            int numGauges = 0;
            for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
                try {
                    serializeGauge(gaugesBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numGauges++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize gauge.", e);
                }
            }

            histogramsBuffer.clear();
            int numHistograms = 0;
            for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
                try {
                    serializeHistogram(histogramsBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numHistograms++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize histogram.", e);
                }
            }

            metersBuffer.clear();
            int numMeters = 0;
            for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
                try {
                    serializeMeter(metersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numMeters++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize meter.", e);
                }
            }

            return new MetricSerializationResult(
                countersBuffer.getCopyOfBuffer(),
                gaugesBuffer.getCopyOfBuffer(),
                metersBuffer.getCopyOfBuffer(),
                histogramsBuffer.getCopyOfBuffer(),
                numCounters,
                numGauges,
                numMeters,
                numHistograms);
        }

        public void close() {
            countersBuffer = null;
            gaugesBuffer = null;
            metersBuffer = null;
            histogramsBuffer = null;
        }
    }

    //......
}
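- MetricDumpSerialization contains several static nested classes: MetricSerializationResult, MetricDumpSerializer, and MetricDumpDeserializer. MetricDumpSerializer provides the serialize method, which serializes the counters, gauges, histograms, and meters into a MetricSerializationResult; a metric that fails to serialize is logged at debug level and skipped, so each num* field counts only the metrics that were actually written.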
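The "reusable buffer plus entry count" pattern used by serialize can be sketched with JDK streams. This is a simplified illustration rather than Flink's wire format: CounterDumpSketch, Result, and the name-then-value layout are assumptions, only counters are handled, and ByteArrayOutputStream stands in for DataOutputSerializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

public class CounterDumpSketch {

    /** Hypothetical, counters-only analogue of MetricSerializationResult. */
    static final class Result {
        final byte[] serializedCounters;
        final int numCounters;

        Result(byte[] serializedCounters, int numCounters) {
            this.serializedCounters = serializedCounters;
            this.numCounters = numCounters;
        }
    }

    // Reused across calls and reset at the start of each serialize, like countersBuffer.
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(1024 * 8);

    Result serialize(Map<String, LongSupplier> counters) {
        buffer.reset();
        DataOutputStream out = new DataOutputStream(buffer);
        int numCounters = 0;
        for (Map.Entry<String, LongSupplier> entry : counters.entrySet()) {
            try {
                // Read the value first so a failing metric leaves no partial bytes behind.
                long value = entry.getValue().getAsLong();
                out.writeUTF(entry.getKey());
                out.writeLong(value);
                numCounters++;
            } catch (Exception e) {
                // A broken metric is skipped; the count reflects only written entries.
            }
        }
        // toByteArray returns a copy, so the buffer can be reused safely.
        return new Result(buffer.toByteArray(), numCounters);
    }

    /** The reader needs numCounters to know how many entries to decode. */
    static Map<String, Long> deserialize(Result result) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(result.serializedCounters));
        Map<String, Long> counters = new LinkedHashMap<>();
        try {
            for (int i = 0; i < result.numCounters; i++) {
                String name = in.readUTF();
                counters.put(name, in.readLong());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return counters;
    }

    public static void main(String[] args) {
        Map<String, LongSupplier> counters = new LinkedHashMap<>();
        counters.put("numRecordsIn", () -> 10L);
        counters.put("broken", () -> { throw new IllegalStateException("boom"); });
        Result result = new CounterDumpSketch().serialize(counters);
        System.out.println(result.numCounters + " counter(s): " + deserialize(result));
    }
}
```

Shipping the count alongside the bytes is what allows the receiving side to decode the array without any per-entry delimiter.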
Summary
- MetricQueryServiceGateway defines two methods, queryMetrics and getAddress; its implementation class is AkkaQueryServiceGateway.
- AkkaQueryServiceGateway implements the MetricQueryServiceGateway interface, and its constructor requires a queryServiceActorRef. The queryMetrics method asks the actor with the MetricQueryService.CreateDump message; the getAddress method returns queryServiceActorRef.path().toString().
- MetricQueryService extends UntypedActor, and its onReceive method dispatches on the message type. For a CreateDump message, it calls MetricDumpSerialization.MetricDumpSerializer.serialize(counters, gauges, histograms, meters) to serialize the metrics into a MetricDumpSerialization.MetricSerializationResult, then replies with getSender().tell(dump, getSelf()). MetricDumpSerialization contains the static nested classes MetricSerializationResult, MetricDumpSerializer, and MetricDumpDeserializer; MetricDumpSerializer provides the serialize method that turns the counters, gauges, histograms, and meters into a MetricSerializationResult.