聊聊flink的MetricQueryServiceGateway

本文主要研究一下flink的MetricQueryServiceGateway

MetricQueryServiceGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java

public interface MetricQueryServiceGateway {

    CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout);

    String getAddress();
}
  • MetricQueryServiceGateway定义了两个方法,一个是queryMetrics,一个是getAddress;它有一个实现类为AkkaQueryServiceGateway

AkkaQueryServiceGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaQueryServiceGateway.java

public class AkkaQueryServiceGateway implements MetricQueryServiceGateway {

    private final ActorRef queryServiceActorRef;

    public AkkaQueryServiceGateway(ActorRef queryServiceActorRef) {
        this.queryServiceActorRef = Preconditions.checkNotNull(queryServiceActorRef);
    }

    @Override
    public CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout) {
        return FutureUtils.toJava(
            Patterns.ask(queryServiceActorRef, MetricQueryService.getCreateDump(), timeout.toMilliseconds())
                .mapTo(ClassTag$.MODULE$.apply(MetricDumpSerialization.MetricSerializationResult.class))
        );
    }

    @Override
    public String getAddress() {
        return queryServiceActorRef.path().toString();
    }
}
  • AkkaQueryServiceGateway实现了MetricQueryServiceGateway接口,它的构造器要求传入queryServiceActorRef;queryMetrics方法ask的消息类型为MetricQueryService.CreateDump;getAddress方法返回的是queryServiceActorRef.path()

MetricQueryService

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java

public class MetricQueryService extends UntypedActor {
    private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);

    public static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";
    private static final String SIZE_EXCEEDED_LOG_TEMPLATE =  "{} will not be reported as the metric dump would exceed the maximum size of {} bytes.";

    private static final CharacterFilter FILTER = new CharacterFilter() {
        @Override
        public String filterCharacters(String input) {
            return replaceInvalidChars(input);
        }
    };

    private final MetricDumpSerializer serializer = new MetricDumpSerializer();

    private final Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
    private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
    private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
    private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();

    private final long messageSizeLimit;

    //......

    @Override
    public void onReceive(Object message) {
        try {
            if (message instanceof AddMetric) {
                AddMetric added = (AddMetric) message;

                String metricName = added.metricName;
                Metric metric = added.metric;
                AbstractMetricGroup group = added.group;

                QueryScopeInfo info = group.getQueryServiceMetricInfo(FILTER);

                if (metric instanceof Counter) {
                    counters.put((Counter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
                } else if (metric instanceof Gauge) {
                    gauges.put((Gauge<?>) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
                } else if (metric instanceof Histogram) {
                    histograms.put((Histogram) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
                } else if (metric instanceof Meter) {
                    meters.put((Meter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName)));
                }
            } else if (message instanceof RemoveMetric) {
                Metric metric = (((RemoveMetric) message).metric);
                if (metric instanceof Counter) {
                    this.counters.remove(metric);
                } else if (metric instanceof Gauge) {
                    this.gauges.remove(metric);
                } else if (metric instanceof Histogram) {
                    this.histograms.remove(metric);
                } else if (metric instanceof Meter) {
                    this.meters.remove(metric);
                }
            } else if (message instanceof CreateDump) {
                MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);

                dump = enforceSizeLimit(dump);

                getSender().tell(dump, getSelf());
            } else {
                LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
                getSender().tell(new Status.Failure(new IOException("MetricQueryServiceActor received an invalid message. " + message.toString())), getSelf());
            }
        } catch (Exception e) {
            LOG.warn("An exception occurred while processing a message.", e);
        }
    }

    public static Object getCreateDump() {
        return CreateDump.INSTANCE;
    }

    private static class CreateDump implements Serializable {
        private static final CreateDump INSTANCE = new CreateDump();
    }
    //......
}
  • MetricQueryService继承了UntypedActor,它的onReceive方法判断message类型,如果为CreateDump的话,则调用MetricDumpSerialization.MetricDumpSerializer.serialize(counters, gauges, histograms, meters)方法来序列化metrics得到MetricDumpSerialization.MetricSerializationResult,然后使用getSender().tell(dump, getSelf())返回数据

MetricDumpSerialization

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java

public class MetricDumpSerialization {
    //......

    public static class MetricSerializationResult implements Serializable {

        private static final long serialVersionUID = 6928770855951536906L;

        public final byte[] serializedCounters;
        public final byte[] serializedGauges;
        public final byte[] serializedMeters;
        public final byte[] serializedHistograms;

        public final int numCounters;
        public final int numGauges;
        public final int numMeters;
        public final int numHistograms;

        public MetricSerializationResult(
            byte[] serializedCounters,
            byte[] serializedGauges,
            byte[] serializedMeters,
            byte[] serializedHistograms,
            int numCounters,
            int numGauges,
            int numMeters,
            int numHistograms) {

            Preconditions.checkNotNull(serializedCounters);
            Preconditions.checkNotNull(serializedGauges);
            Preconditions.checkNotNull(serializedMeters);
            Preconditions.checkNotNull(serializedHistograms);
            Preconditions.checkArgument(numCounters >= 0);
            Preconditions.checkArgument(numGauges >= 0);
            Preconditions.checkArgument(numMeters >= 0);
            Preconditions.checkArgument(numHistograms >= 0);
            this.serializedCounters = serializedCounters;
            this.serializedGauges = serializedGauges;
            this.serializedMeters = serializedMeters;
            this.serializedHistograms = serializedHistograms;
            this.numCounters = numCounters;
            this.numGauges = numGauges;
            this.numMeters = numMeters;
            this.numHistograms = numHistograms;
        }
    }

    public static class MetricDumpSerializer {

        private DataOutputSerializer countersBuffer = new DataOutputSerializer(1024 * 8);
        private DataOutputSerializer gaugesBuffer = new DataOutputSerializer(1024 * 8);
        private DataOutputSerializer metersBuffer = new DataOutputSerializer(1024 * 8);
        private DataOutputSerializer histogramsBuffer = new DataOutputSerializer(1024 * 8);

        public MetricSerializationResult serialize(
            Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
            Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
            Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
            Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {

            countersBuffer.clear();
            int numCounters = 0;
            for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
                try {
                    serializeCounter(countersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numCounters++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize counter.", e);
                }
            }

            gaugesBuffer.clear();
            int numGauges = 0;
            for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
                try {
                    serializeGauge(gaugesBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numGauges++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize gauge.", e);
                }
            }

            histogramsBuffer.clear();
            int numHistograms = 0;
            for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
                try {
                    serializeHistogram(histogramsBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numHistograms++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize histogram.", e);
                }
            }

            metersBuffer.clear();
            int numMeters = 0;
            for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
                try {
                    serializeMeter(metersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                    numMeters++;
                } catch (Exception e) {
                    LOG.debug("Failed to serialize meter.", e);
                }
            }

            return new MetricSerializationResult(
                countersBuffer.getCopyOfBuffer(),
                gaugesBuffer.getCopyOfBuffer(),
                metersBuffer.getCopyOfBuffer(),
                histogramsBuffer.getCopyOfBuffer(),
                numCounters,
                numGauges,
                numMeters,
                numHistograms);
        }

        public void close() {
            countersBuffer = null;
            gaugesBuffer = null;
            metersBuffer = null;
            histogramsBuffer = null;
        }
    }

    //......
}
  • MetricDumpSerialization有几个静态类分别是MetricSerializationResult、MetricDumpSerializer、MetricDumpDeserializer;MetricDumpSerializer提供了serialize方法用于将counters、gauges、histograms、meters指标序列化为MetricSerializationResult

小结

  • MetricQueryServiceGateway定义了两个方法,一个是queryMetrics,一个是getAddress;它有一个实现类为AkkaQueryServiceGateway
  • AkkaQueryServiceGateway实现了MetricQueryServiceGateway接口,它的构造器要求传入queryServiceActorRef;queryMetrics方法ask的消息类型为MetricQueryService.CreateDump;getAddress方法返回的是queryServiceActorRef.path()
  • MetricQueryService继承了UntypedActor,它的onReceive方法判断message类型,如果为CreateDump的话,则调用MetricDumpSerialization.MetricDumpSerializer.serialize(counters, gauges, histograms, meters)方法来序列化metrics得到MetricDumpSerialization.MetricSerializationResult,然后使用getSender().tell(dump, getSelf())返回数据;MetricDumpSerialization有几个静态类分别是MetricSerializationResult、MetricDumpSerializer、MetricDumpDeserializer;MetricDumpSerializer提供了serialize方法用于将counters、gauges、histograms、meters指标序列化为MetricSerializationResult

doc

相关推荐