【翻译】Flink Table Api & SQL — 配置
本文翻译自官网:Configuration https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
默认情况下,Table&SQL API已预先配置为产生具有可接受性能的准确结果。
根据表程序的要求,可能需要调整某些参数以进行优化。例如,无界流程序可能需要确保所需的状态大小是有上限的(请参阅流概念)。
总览
在每个表环境中,TableConfig
提供了用于配置当前会话的选项。
对于常见或重要的配置选项,TableConfig
提供了具有详细内联文档的getter和setter方法。
对于更高级的配置,用户可以直接访问基础键值映射。以下各节列出了可用于调整Flink Table和SQL API程序的所有可用选项。
注意:由于执行操作时会在不同的时间点读取选项,因此建议在实例化表环境后尽早设置配置选项。
// instantiate table environment val tEnv: TableEnvironment = ... // access flink configuration val configuration = tEnv.getConfig().getConfiguration() // set low-level key-value options configuration.setString("table.exec.mini-batch.enabled", "true") configuration.setString("table.exec.mini-batch.allow-latency", "5 s") configuration.setString("table.exec.mini-batch.size", "5000")
主要:当前仅 Blink planner 支持键值对的配置选项
执行配置选项
以下选项可用于调整查询执行的性能。
Key | Default | Description |
---|---|---|
table.exec.async-lookup.buffer-capacityBatch Streaming | 100 | async lookup join 可以触发的最大 async i/o 操作的数量 |
table.exec.async-lookup.timeoutBatch Streaming | "3 min" | 异步操作完成的 超时时间 |
table.exec.disabled-operatorsBatch | (none) | 主要用于测试. 以逗号分隔的运算符名称列表,每个名称代表一种禁用的运算符。 可以禁用的运算符包括“ NestedLoopJoin”,“ ShuffleHashJoin”,“ BroadcastHashJoin”, “ SortMergeJoin”,“ HashAgg”,“ SortAgg”。 默认情况下,未禁用任何运算符. |
table.exec.mini-batch.allow-latencyStreaming | "-1 ms" | 最大等待时间可用于MiniBatch缓冲输入记录。 MiniBatch是用于缓冲输入记录以减少状态访问的优化。 MiniBatch以允许的等待时间间隔以及达到最大缓冲记录数触发。 注意:如果将table.exec.mini-batch.enabled设置为true,则其值必须大于零. |
table.exec.mini-batch.enabledStreaming | false | 指定是否启用MiniBatch优化。 MiniBatch是用于缓冲输入记录以减少状态访问的优化。 默认情况下禁用此功能。 要启用此功能,用户应将此配置设置为true。 注意:如果启用了mini batch 处理, 则必须设置“ table.exec.mini-batch.allow-latency”和“ table.exec.mini-batch.size”. |
table.exec.mini-batch.sizeStreaming | -1 | 可以为MiniBatch缓冲最大输入记录数。 MiniBatch是用于缓冲输入记录以减少状态访问的优化。 MiniBatch以允许的等待时间间隔以及达到最大缓冲记录数触发。 注意:MiniBatch当前仅适用于非窗口聚合。 如果将table.exec.mini-batch.enabled设置为true,则其值必须为正. |
table.exec.resource.default-parallelismBatch Streaming | -1 | 为所有运算符(例如聚合,联接,过滤器)设置默认并行度以与并行实例一起运行。 此配置比StreamExecutionEnvironment的并行性具有更高的优先级 (实际上,此配置优先于StreamExecutionEnvironment的并行性)。 值-1表示未设置默认的并行性,则使用StreamExecutionEnvironment的并行性将回退. |
table.exec.resource.external-buffer-memoryBatch | "10 mb" | 设置在排序合并联接和嵌套联接以及窗口上使用的外部缓冲存储器大小. |
table.exec.resource.hash-agg.memoryBatch | "128 mb" | 设置哈希聚合运算符的托管内存大小. |
table.exec.resource.hash-join.memoryBatch | "128 mb" | 设置哈希联接运算符的托管内存。 定义下限. |
table.exec.resource.sort.memoryBatch | "128 mb" | 设置排序运算符的托管缓冲区内存大小. |
table.exec.shuffle-modeBatch | "batch" | 设置执行 shuffle 模式。 只能设置 batch 或 pipeline。 batch:工作将逐步进行。 pipeline:作业将以流模式运行,但是当发送方拥有资源等待将数据发送到接收方时, 接收方等待资源启动可能会导致资源死锁. |
table.exec.sort.async-merge-enabledBatch | true | 是否异步合并排序的溢出文件. |
table.exec.sort.default-limitBatch | -1 | 用户 order 后未设置限制时的默认限制。 -1表示此配置被忽略. |
table.exec.sort.max-num-file-handlesBatch | 128 | 外部合并排序的最大扇入。 它限制了每个运算符的文件句柄数。 如果太小,可能会导致中间合并。 但是,如果太大,将导致同时打开太多文件,占用内存并导致随机读取. |
table.exec.source.idle-timeoutStreaming | "-1 ms" | 当 source 在超时时间内未收到任何元素时,它将被标记为临时空闲。 这样,下游任务就可以前进其水印,而无需在空闲时等待来自该源的水印. |
table.exec.spill-compression.block-sizeBatch | "64 kb" | 溢出数据时用于压缩的内存大小。 内存越大,压缩率越高,但是作业将消耗更多的内存资源. |
table.exec.spill-compression.enabledBatch | true | 是否压缩溢出的数据。 目前,我们仅支持对sort和hash-agg和hash-join运算符压缩溢出的数据. |
table.exec.window-agg.buffer-size-limitBatch | 100000 | 设置组窗口agg运算符中使用的窗口元素缓冲区大小限制。 |
优化器选项
以下选项可用于调整查询优化器的行为,以获得更好的执行计划。
Key | Default | Description |
---|---|---|
table.optimizer.agg-phase-strategyBatch Streaming | "AUTO" | 汇总阶段的策略。 只能设置AUTO,TWO_PHASE或ONE_PHASE。 自动:聚合阶段没有特殊的执行器。 选择两阶段汇总还是一阶段汇总取决于成本。 TWO_PHASE:强制使用具有localAggregate和globalAggregate的两阶段聚合。 请注意,如果聚合调用不支持分为两阶段的优化,我们仍将使用一级聚合。 ONE_PHASE:强制使用仅具有CompleteGlobalAggregate的一级聚合. |
table.optimizer.distinct-agg.split.bucket-numStreaming | 1024 | 拆分独立聚合时配置存储桶数。 该数字在第一级聚合中用于计算存储区密钥“ hash_code(distinct_key)%BUCKET_NUM”,该存储区密钥在拆分后用作附加组密钥. |
table.optimizer.distinct-agg.split.enabledStreaming | false | 告诉优化程序是否将不同的聚合(例如COUNT(DISTINCT col),SUM(DISTINCT col))分成两个级别。 第一次聚合被一个附加 key shuffle,该附加 key 使用distinct_key的哈希码和存储桶数计算得出。 当不同的聚合中存在数据倾斜时,此优化非常有用,并且可以扩大工作量。 默认为false. |
table.optimizer.join-reorder-enabledBatch Streaming | false | 在优化器中启用联接重新排序。 默认为禁用. |
table.optimizer.join.broadcast-thresholdBatch | 1048576 | 配置表的最大大小(以字节为单位),该表在执行联接时将广播到所有工作程序节点。 通过将此值设置为-1以禁用广播. |
table.optimizer.reuse-source-enabledBatch Streaming | true | 如果为true,则优化器将尝试找出重复的表源并重新使用它们。 仅当启用table.optimizer.reuse-sub-plan为true时,此方法才有效. |
table.optimizer.reuse-sub-plan-enabledBatch Streaming | true | 当为 true 时,优化器将尝试找出重复的子计划并重用它们。 |
table.optimizer.source.predicate-pushdown-enabledBatch Streaming | true | 如果为true,则优化器会将谓词下推到FilterableTableSource中。 默认值为true. |
欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文