A Look at Flink's Window Operations
Preface
This article takes a look at Flink's window operations.
window
DataStream
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
    public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return windowAll(TumblingProcessingTimeWindows.of(size));
        } else {
            return windowAll(TumblingEventTimeWindows.of(size));
        }
    }

    public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return windowAll(SlidingProcessingTimeWindows.of(size, slide));
        } else {
            return windowAll(SlidingEventTimeWindows.of(size, slide));
        }
    }

    public AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
        return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }

    public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
        return windowAll(GlobalWindows.create())
                .evictor(CountEvictor.of(size))
                .trigger(CountTrigger.of(slide));
    }

    @PublicEvolving
    public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
        return new AllWindowedStream<>(this, assigner);
    }
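- For a non-keyed stream, DataStream offers timeWindowAll, countWindowAll and windowAll. windowAll is the core operation: timeWindowAll and countWindowAll both delegate to it. It takes a WindowAssigner, returns an AllWindowedStream, and runs with a parallelism of 1.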
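The two countWindowAll variants above are built from GlobalWindows plus a trigger (and, for the sliding form, an evictor). The following is a minimal plain-Java sketch of how I read that combination; it does not use Flink, and the class CountWindowModel and its method names are hypothetical, introduced only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical plain-Java model of countWindowAll; this is not Flink code.
class CountWindowModel {

    // Models countWindowAll(size): GlobalWindows + PurgingTrigger.of(CountTrigger.of(size)).
    // The buffer fires once it holds `size` elements and is then purged.
    static List<List<Integer>> tumbling(List<Integer> input, int size) {
        List<List<Integer>> fired = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        for (int value : input) {
            buffer.add(value);
            if (buffer.size() == size) {
                fired.add(new ArrayList<>(buffer));
                buffer.clear(); // the purge step
            }
        }
        return fired;
    }

    // Models countWindowAll(size, slide): GlobalWindows + CountEvictor.of(size) + CountTrigger.of(slide).
    // The trigger fires every `slide` elements; before emitting, the evictor keeps only the last `size`.
    static List<List<Integer>> sliding(List<Integer> input, int size, int slide) {
        List<List<Integer>> fired = new ArrayList<>();
        Deque<Integer> buffer = new ArrayDeque<>();
        int sinceLastFire = 0;
        for (int value : input) {
            buffer.addLast(value);
            sinceLastFire++;
            if (sinceLastFire == slide) {
                while (buffer.size() > size) {
                    buffer.removeFirst(); // evict the oldest elements
                }
                fired.add(new ArrayList<>(buffer));
                sinceLastFire = 0; // no purge: remaining elements stay buffered
            }
        }
        return fired;
    }
}
```

For the input 1..6 with size 4 and slide 2, the sliding model emits [1, 2], then [1, 2, 3, 4], then [3, 4, 5, 6]: the first firings hold fewer than `size` elements because nothing is purged and nothing needs evicting yet.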
KeyedStream
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return window(TumblingProcessingTimeWindows.of(size));
        } else {
            return window(TumblingEventTimeWindows.of(size));
        }
    }

    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return window(SlidingProcessingTimeWindows.of(size, slide));
        } else {
            return window(SlidingEventTimeWindows.of(size, slide));
        }
    }

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
        return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
        return window(GlobalWindows.create())
                .evictor(CountEvictor.of(size))
                .trigger(CountTrigger.of(slide));
    }

    @PublicEvolving
    public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
        return new WindowedStream<>(this, assigner);
    }
- KeyedStream inherits the window-related operations of DataStream, but the ones it mainly uses are timeWindow, countWindow and window. window is the core operation: it also takes a WindowAssigner, and it returns a WindowedStream.
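To make the difference from windowAll concrete, here is a minimal plain-Java sketch of tumbling time windows on a keyed stream, where every key gets its own set of windows. It does not use Flink; KeyedTumblingModel, its methods and the "key@windowStart" naming are hypothetical, and it assumes a window offset of 0 and non-negative timestamps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of keyBy(...).timeWindow(size); this is not Flink code.
class KeyedTumblingModel {

    // Start of the tumbling window that contains `timestamp`
    // (assumes offset 0 and a non-negative timestamp).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Groups (key, timestamp) events into per-key tumbling windows.
    // The map key is "key@windowStart"; the value lists the timestamps in that window.
    static Map<String, List<Long>> assign(String[] keys, long[] timestamps, long size) {
        Map<String, List<Long>> windows = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            String id = keys[i] + "@" + windowStart(timestamps[i], size);
            windows.computeIfAbsent(id, k -> new ArrayList<>()).add(timestamps[i]);
        }
        return windows;
    }
}
```

With a 5000 ms window, events (a, 1000), (b, 1500), (a, 4999) and (a, 5000) end up in three windows: a@0 holds 1000 and 4999, b@0 holds 1500, and a@5000 holds 5000. Under windowAll there would be no key dimension, so the first three events would share one window.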
WindowedStream
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/WindowedStream.java
    @Public
    public class WindowedStream<T, K, W extends Window> {

        /** The keyed data stream that is windowed by this stream. */
        private final KeyedStream<T, K> input;

        /** The window assigner. */
        private final WindowAssigner<? super T, W> windowAssigner;

        /** The trigger that is used for window evaluation/emission. */
        private Trigger<? super T, ? super W> trigger;

        /** The evictor that is used for evicting elements before window evaluation. */
        private Evictor<? super T, ? super W> evictor;

        /** The user-specified allowed lateness. */
        private long allowedLateness = 0L;

        /**
         * Side output {@code OutputTag} for late data. If no tag is set late data will simply be
         * dropped.
         */
        private OutputTag<T> lateDataOutputTag;

        @PublicEvolving
        public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {
            this.input = input;
            this.windowAssigner = windowAssigner;
            this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
        }

        @PublicEvolving
        public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
            if (windowAssigner instanceof MergingWindowAssigner && !trigger.canMerge()) {
                throw new UnsupportedOperationException("A merging window assigner cannot be used with a trigger that does not support merging.");
            }

            if (windowAssigner instanceof BaseAlignedWindowAssigner) {
                throw new UnsupportedOperationException("Cannot use a " + windowAssigner.getClass().getSimpleName() + " with a custom trigger.");
            }

            this.trigger = trigger;
            return this;
        }

        @PublicEvolving
        public WindowedStream<T, K, W> allowedLateness(Time lateness) {
            final long millis = lateness.toMilliseconds();
            checkArgument(millis >= 0, "The allowed lateness cannot be negative.");

            this.allowedLateness = millis;
            return this;
        }

        @PublicEvolving
        public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
            Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
            this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag);
            return this;
        }

        @PublicEvolving
        public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
            if (windowAssigner instanceof BaseAlignedWindowAssigner) {
                throw new UnsupportedOperationException("Cannot use a " + windowAssigner.getClass().getSimpleName() + " with an Evictor.");
            }
            this.evictor = evictor;
            return this;
        }

        // ------------------------------------------------------------------------
        //  Operations on the keyed windows
        // ------------------------------------------------------------------------

        //......
    }
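- A WindowedStream carries several settings. The constructor requires input and windowAssigner; Trigger, Evictor, allowedLateness and OutputTag are optional (the trigger defaults to windowAssigner.getDefaultTrigger, and allowedLateness defaults to 0). An operation function must also be set, mainly one of ReduceFunction, AggregateFunction, FoldFunction (deprecated) or ProcessWindowFunction.
- windowAssigner decides how elements are assigned to windows. The main assigners are TumblingEventTimeWindows/TumblingProcessingTimeWindows, SlidingEventTimeWindows/SlidingProcessingTimeWindows, EventTimeSessionWindows/ProcessingTimeSessionWindows and GlobalWindows.
- Trigger decides when a window fires. Evictor removes elements from a window when it fires. allowedLateness sets the maximum time an element may lag behind the watermark; elements that are later than that are dropped (this only applies to event-time windows). OutputTag routes late data to a side output, which can be read with SingleOutputStreamOperator.getSideOutput(OutputTag).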
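The interplay of watermark, allowedLateness and late data can be sketched as a small decision function. This is a plain-Java model, not Flink code: LatenessModel and its members are hypothetical, and the rule it encodes (a window [start, end) counts as expired once the watermark reaches end - 1 + allowedLateness) is my reading of how the window operator treats event-time windows, so take it as an assumption rather than a specification.

```java
// Hypothetical plain-Java model of the lateness check for event-time windows; this is not Flink code.
class LatenessModel {

    enum Outcome { ACCEPTED, LATE }

    // A window [start, end) has the max timestamp end - 1. It is treated as expired
    // once the watermark reaches maxTimestamp + allowedLateness (assumed rule).
    static boolean isWindowLate(long windowEnd, long allowedLateness, long watermark) {
        long cleanupTime = (windowEnd - 1) + allowedLateness;
        return cleanupTime <= watermark;
    }

    // Classifies one element of a tumbling window of the given size (offset 0,
    // non-negative timestamps). LATE elements are the ones that would be dropped,
    // or sent to the side output when an OutputTag is set.
    static Outcome classify(long timestamp, long size, long allowedLateness, long watermark) {
        long start = timestamp - (timestamp % size);
        long end = start + size;
        return isWindowLate(end, allowedLateness, watermark) ? Outcome.LATE : Outcome.ACCEPTED;
    }
}
```

For an element with timestamp 4000 in a 5000 ms window, a watermark of 4999 already makes it late when allowedLateness is 0. With an allowedLateness of 2000 the same element is still accepted at watermark 6000 and only becomes late once the watermark reaches 6999.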
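Summary
- Window operations are at the core of processing unbounded data streams: they split the stream into buckets of finite size, and computations are then applied to that finite data. Flink's window operations fall into two groups: window on a KeyedStream, and windowAll on a non-keyed stream.
- A window operation has several parameters. WindowAssigner is mandatory; the main assigners are TumblingEventTimeWindows/TumblingProcessingTimeWindows, SlidingEventTimeWindows/SlidingProcessingTimeWindows, EventTimeSessionWindows/ProcessingTimeSessionWindows and GlobalWindows. An operation function must also be set, mainly one of ReduceFunction, AggregateFunction, FoldFunction (deprecated) or ProcessWindowFunction.
- Trigger, Evictor, allowedLateness and OutputTag are optional. Trigger decides when a window fires. Evictor removes elements from a window when it fires. allowedLateness sets the maximum time an element may lag behind the watermark; elements that are later than that are dropped (this only applies to event-time windows). OutputTag routes late data to a side output, which can be read with SingleOutputStreamOperator.getSideOutput(OutputTag).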
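Of the operation functions listed above, AggregateFunction is the one whose contract is least obvious from its name, so here is a minimal plain-Java sketch of incremental aggregation in that style. It does not use Flink: the nested Aggregate interface is a local stand-in that only mirrors the method names createAccumulator, add, getResult and merge, and IncrementalAverage, AVERAGE and averageOf are hypothetical names.

```java
import java.util.List;

// Hypothetical plain-Java model of incremental window aggregation; this is not Flink code.
class IncrementalAverage {

    // Local stand-in that mirrors the method names of an AggregateFunction.
    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // The accumulator is {sum, count}; only these two numbers are kept per window,
    // instead of buffering every element.
    static final Aggregate<Long, long[], Double> AVERAGE = new Aggregate<Long, long[], Double>() {
        public long[] createAccumulator() {
            return new long[] {0L, 0L};
        }

        public long[] add(Long value, long[] acc) {
            acc[0] += value;
            acc[1]++;
            return acc;
        }

        public Double getResult(long[] acc) {
            return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
        }

        public long[] merge(long[] a, long[] b) {
            return new long[] {a[0] + b[0], a[1] + b[1]};
        }
    };

    // Feeds one window's elements through the aggregate, one at a time.
    static double averageOf(List<Long> windowElements) {
        long[] acc = AVERAGE.createAccumulator();
        for (Long value : windowElements) {
            acc = AVERAGE.add(value, acc);
        }
        return AVERAGE.getResult(acc);
    }
}
```

The design point is the trade-off between the function types: reduce-style and aggregate-style functions can fold each element into a small accumulator as it arrives, while a ProcessWindowFunction sees all elements of the window at once and therefore needs them buffered.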