A Look at Flink's Global Window
Preface
This article takes a look at Flink's Global Window.
GlobalWindow
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
    @PublicEvolving
    public class GlobalWindow extends Window {

        private static final GlobalWindow INSTANCE = new GlobalWindow();

        private GlobalWindow() { }

        public static GlobalWindow get() {
            return INSTANCE;
        }

        @Override
        public long maxTimestamp() {
            return Long.MAX_VALUE;
        }

        @Override
        public boolean equals(Object o) {
            return this == o || !(o == null || getClass() != o.getClass());
        }

        @Override
        public int hashCode() {
            return 0;
        }

        @Override
        public String toString() {
            return "GlobalWindow";
        }

        /**
         * A {@link TypeSerializer} for {@link GlobalWindow}.
         */
        public static class Serializer extends TypeSerializerSingleton<GlobalWindow> {
            private static final long serialVersionUID = 1L;

            @Override
            public boolean isImmutableType() {
                return true;
            }

            @Override
            public GlobalWindow createInstance() {
                return GlobalWindow.INSTANCE;
            }

            @Override
            public GlobalWindow copy(GlobalWindow from) {
                return from;
            }

            @Override
            public GlobalWindow copy(GlobalWindow from, GlobalWindow reuse) {
                return from;
            }

            @Override
            public int getLength() {
                return 0;
            }

            @Override
            public void serialize(GlobalWindow record, DataOutputView target) throws IOException {
                target.writeByte(0);
            }

            @Override
            public GlobalWindow deserialize(DataInputView source) throws IOException {
                source.readByte();
                return GlobalWindow.INSTANCE;
            }

            @Override
            public GlobalWindow deserialize(GlobalWindow reuse, DataInputView source) throws IOException {
                source.readByte();
                return GlobalWindow.INSTANCE;
            }

            @Override
            public void copy(DataInputView source, DataOutputView target) throws IOException {
                source.readByte();
                target.writeByte(0);
            }

            @Override
            public boolean canEqual(Object obj) {
                return obj instanceof Serializer;
            }
        }
    }
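- GlobalWindow extends Window. Its maxTimestamp method differs from TimeWindow's: TimeWindow has start and end fields and its maxTimestamp returns end - 1, whereas GlobalWindow's maxTimestamp returns Long.MAX_VALUE. GlobalWindow is a singleton obtained through get(), and it defines its own Serializer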
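To make the maxTimestamp contrast concrete, here is a minimal sketch that does not depend on Flink. `SketchWindow`, `SketchTimeWindow` and `SketchGlobalWindow` are hypothetical stand-ins invented for this illustration; they copy only the maxTimestamp, equals and hashCode logic described above and are not the Flink classes themselves.

```java
// Hypothetical stand-ins, not the Flink classes: they mirror only the logic discussed above.
abstract class SketchWindow {
    abstract long maxTimestamp();
}

final class SketchTimeWindow extends SketchWindow {
    private final long start;
    private final long end;

    SketchTimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    long getStart() {
        return start;
    }

    // The window covers [start, end), so the largest timestamp that still belongs to it is end - 1.
    @Override
    long maxTimestamp() {
        return end - 1;
    }
}

final class SketchGlobalWindow extends SketchWindow {
    private static final SketchGlobalWindow INSTANCE = new SketchGlobalWindow();

    private SketchGlobalWindow() { }

    static SketchGlobalWindow get() {
        return INSTANCE;
    }

    // The global window has no end, so it reports the largest possible timestamp.
    @Override
    long maxTimestamp() {
        return Long.MAX_VALUE;
    }

    // Same shape as the quoted source: any object of the same class counts as equal.
    @Override
    public boolean equals(Object o) {
        return this == o || !(o == null || getClass() != o.getClass());
    }

    @Override
    public int hashCode() {
        return 0;
    }
}

class WindowSketchDemo {
    public static void main(String[] args) {
        System.out.println(new SketchTimeWindow(0L, 5000L).maxTimestamp());        // 4999
        System.out.println(SketchGlobalWindow.get().maxTimestamp());               // 9223372036854775807
        System.out.println(SketchGlobalWindow.get() == SketchGlobalWindow.get());  // true
    }
}
```

Because a time-based window is normally considered complete once time passes its maxTimestamp, a window whose maxTimestamp is Long.MAX_VALUE is never complete on time alone.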
GlobalWindows
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
    @PublicEvolving
    public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
        private static final long serialVersionUID = 1L;

        private GlobalWindows() {}

        @Override
        public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            return Collections.singletonList(GlobalWindow.get());
        }

        @Override
        public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return new NeverTrigger();
        }

        @Override
        public String toString() {
            return "GlobalWindows()";
        }

        /**
         * Creates a new {@code GlobalWindows} {@link WindowAssigner} that assigns
         * all elements to the same {@link GlobalWindow}.
         *
         * @return The global window policy.
         */
        public static GlobalWindows create() {
            return new GlobalWindows();
        }

        /**
         * A trigger that never fires, as default Trigger for GlobalWindows.
         */
        @Internal
        public static class NeverTrigger extends Trigger<Object, GlobalWindow> {
            private static final long serialVersionUID = 1L;

            @Override
            public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
                return TriggerResult.CONTINUE;
            }

            @Override
            public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
                return TriggerResult.CONTINUE;
            }

            @Override
            public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
                return TriggerResult.CONTINUE;
            }

            @Override
            public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}

            @Override
            public void onMerge(GlobalWindow window, OnMergeContext ctx) {
            }
        }

        @Override
        public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new GlobalWindow.Serializer();
        }

        @Override
        public boolean isEventTime() {
            return false;
        }
    }
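- GlobalWindows extends WindowAssigner<Object, GlobalWindow>: the element type it accepts is Object and the window type is GlobalWindow
- assignWindows always returns a singleton list containing GlobalWindow.get(), regardless of the element or its timestamp; getDefaultTrigger returns a NeverTrigger; getWindowSerializer returns a new GlobalWindow.Serializer; isEventTime returns false
- NeverTrigger extends Trigger; its onElement, onEventTime and onProcessingTime methods all return TriggerResult.CONTINUE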
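The assigner and default trigger behaviour can be sketched without Flink on the classpath. Everything in the block below (`SketchTriggerResult`, `OnlyWindow`, `SketchGlobalAssigner`, `SketchNeverTrigger`) is a hypothetical, simplified stand-in with reduced method signatures; it only illustrates that every element is assigned to the same window and that no trigger callback ever asks for a firing.

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical, Flink-free stand-ins that mirror the shape of the classes quoted above.
enum SketchTriggerResult { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

final class OnlyWindow {
    static final OnlyWindow INSTANCE = new OnlyWindow();

    private OnlyWindow() { }
}

final class SketchGlobalAssigner {
    // Element and timestamp are ignored: everything lands in the one shared window.
    Collection<OnlyWindow> assignWindows(Object element, long timestamp) {
        return Collections.singletonList(OnlyWindow.INSTANCE);
    }
}

final class SketchNeverTrigger {
    SketchTriggerResult onElement(Object element, long timestamp) {
        return SketchTriggerResult.CONTINUE;
    }

    SketchTriggerResult onEventTime(long time) {
        return SketchTriggerResult.CONTINUE;
    }

    SketchTriggerResult onProcessingTime(long time) {
        return SketchTriggerResult.CONTINUE;
    }
}

class NeverTriggerDemo {
    public static void main(String[] args) {
        SketchGlobalAssigner assigner = new SketchGlobalAssigner();
        SketchNeverTrigger trigger = new SketchNeverTrigger();

        OnlyWindow first = null;
        boolean sameWindow = true;
        int firings = 0;
        long timestamp = 0L;

        for (Object element : new Object[] {"a", 42, 3.14}) {
            OnlyWindow window = assigner.assignWindows(element, timestamp).iterator().next();
            if (first == null) {
                first = window;
            }
            sameWindow &= (window == first);
            // Anything other than CONTINUE would count as a firing or purge request.
            if (trigger.onElement(element, timestamp) != SketchTriggerResult.CONTINUE) {
                firings++;
            }
            timestamp += 1000L;
        }

        System.out.println("same window: " + sameWindow); // same window: true
        System.out.println("firings: " + firings);        // firings: 0
    }
}
```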
Summary
- GlobalWindows extends WindowAssigner<Object, GlobalWindow>: the element type is Object and the window type is GlobalWindow. GlobalWindow extends Window; unlike TimeWindow, which has start and end fields and whose maxTimestamp returns end - 1, GlobalWindow's maxTimestamp returns Long.MAX_VALUE. GlobalWindow also defines its own Serializer
- GlobalWindows' assignWindows always returns a singleton list containing GlobalWindow.get(); getDefaultTrigger returns a NeverTrigger; getWindowSerializer returns a new GlobalWindow.Serializer; isEventTime returns false
- NeverTrigger extends Trigger; its onElement, onEventTime and onProcessingTime methods all return TriggerResult.CONTINUE, which means the window never fires on its own. To get any output, set a custom trigger when defining the window operation so that it overrides GlobalWindows' default NeverTrigger
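The effect of replacing the default trigger can be sketched as a small Flink-free model. `Decision`, `ElementTrigger`, `CountingTrigger` and `GlobalBuffer` are hypothetical names invented for this illustration, and the count-based rule is just one possible custom trigger, chosen as an assumption: the buffer plays the role of the single global window, and the trigger decides when its contents are emitted and cleared.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of "one global window plus a pluggable trigger".
enum Decision { CONTINUE, FIRE_AND_PURGE }

interface ElementTrigger<T> {
    Decision onElement(T element);
}

// Illustrative custom trigger: fire and purge after every maxCount elements.
final class CountingTrigger<T> implements ElementTrigger<T> {
    private final long maxCount;
    private long seen;

    CountingTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public Decision onElement(T element) {
        seen++;
        if (seen >= maxCount) {
            seen = 0;
            return Decision.FIRE_AND_PURGE;
        }
        return Decision.CONTINUE;
    }
}

// Plays the role of the single global window: all elements accumulate here.
final class GlobalBuffer<T> {
    private final ElementTrigger<T> trigger;
    private final List<T> contents = new ArrayList<>();
    private final List<List<T>> emitted = new ArrayList<>();

    GlobalBuffer(ElementTrigger<T> trigger) {
        this.trigger = trigger;
    }

    void add(T element) {
        contents.add(element);
        if (trigger.onElement(element) == Decision.FIRE_AND_PURGE) {
            emitted.add(new ArrayList<>(contents)); // emit a snapshot of the window contents
            contents.clear();                       // purge the window state
        }
    }

    List<List<T>> emitted() {
        return emitted;
    }

    int pending() {
        return contents.size();
    }
}

class CustomTriggerDemo {
    public static void main(String[] args) {
        // A trigger that always says CONTINUE behaves like the default NeverTrigger.
        GlobalBuffer<Integer> never = new GlobalBuffer<>(element -> Decision.CONTINUE);
        GlobalBuffer<Integer> everyThree = new GlobalBuffer<>(new CountingTrigger<>(3));

        for (int i = 1; i <= 7; i++) {
            never.add(i);
            everyThree.add(i);
        }

        System.out.println(never.emitted());      // []
        System.out.println(everyThree.emitted()); // [[1, 2, 3], [4, 5, 6]]
        System.out.println(everyThree.pending()); // 1
    }
}
```

In Flink itself the equivalent wiring is done on the windowed stream, along the lines of `.window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(3)))`; as far as I can tell, this is also roughly how `countWindow` on a keyed stream is built.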