A look at the intervalJoin operation on Flink's KeyedStream
Preface
This article examines the intervalJoin operation on Flink's KeyedStream.
Example
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            // emit each joined pair as "left,right"
            out.collect(left + "," + right);
        }
    });
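To make the bounds in the example concrete, here is a small plain-Java sketch (no Flink dependency) that lists which pairs between(-2 ms, +1 ms) would join. It assumes both streams share the same key and relies on the matching rule the operator applies further down: left timestamp + lowerBound <= right timestamp <= left timestamp + upperBound. The class name IntervalJoinExample, the method joinedPairs and the sample timestamps are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: lists the (left, right) timestamp pairs
// that satisfy left + lowerBound <= right <= left + upperBound.
class IntervalJoinExample {

    static List<String> joinedPairs(long[] leftTs, long[] rightTs, long lowerBound, long upperBound) {
        List<String> pairs = new ArrayList<>();
        for (long l : leftTs) {
            for (long r : rightTs) {
                // both bounds are inclusive, matching the default of between(...)
                if (r >= l + lowerBound && r <= l + upperBound) {
                    pairs.add(l + "," + r);
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        long[] orange = {2, 5};       // sample left (orange) timestamps in ms
        long[] green = {0, 1, 6, 7};  // sample right (green) timestamps in ms
        System.out.println(joinedPairs(orange, green, -2, 1));
    }
}
```

With these sample values, the orange element at 2 pairs with the green elements at 0 and 1, the orange element at 5 pairs with the green element at 6, and the green element at 7 joins nothing.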
KeyedStream.intervalJoin
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
@Public
public class KeyedStream<T, KEY> extends DataStream<T> {
    //......

    @PublicEvolving
    public <T1> IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> otherStream) {
        return new IntervalJoin<>(this, otherStream);
    }

    //......
}
- KeyedStream.intervalJoin creates and returns an IntervalJoin.
IntervalJoin
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
@PublicEvolving
public static class IntervalJoin<T1, T2, KEY> {

    private final KeyedStream<T1, KEY> streamOne;
    private final KeyedStream<T2, KEY> streamTwo;

    IntervalJoin(
            KeyedStream<T1, KEY> streamOne,
            KeyedStream<T2, KEY> streamTwo
    ) {
        this.streamOne = checkNotNull(streamOne);
        this.streamTwo = checkNotNull(streamTwo);
    }

    @PublicEvolving
    public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
        TimeCharacteristic timeCharacteristic = streamOne.getExecutionEnvironment().getStreamTimeCharacteristic();
        if (timeCharacteristic != TimeCharacteristic.EventTime) {
            throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
        }

        checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
        checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");

        return new IntervalJoined<>(
            streamOne,
            streamTwo,
            lowerBound.toMilliseconds(),
            upperBound.toMilliseconds(),
            true,
            true
        );
    }
}
- IntervalJoin provides the between operation, which sets the lowerBound and upperBound of the interval. Note that between throws an UnsupportedTimeCharacteristicException unless the time characteristic is TimeCharacteristic.EventTime. It creates and returns an IntervalJoined.
IntervalJoined
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
@PublicEvolving
public static class IntervalJoined<IN1, IN2, KEY> {

    private final KeyedStream<IN1, KEY> left;
    private final KeyedStream<IN2, KEY> right;

    private final long lowerBound;
    private final long upperBound;

    private final KeySelector<IN1, KEY> keySelector1;
    private final KeySelector<IN2, KEY> keySelector2;

    private boolean lowerBoundInclusive;
    private boolean upperBoundInclusive;

    public IntervalJoined(
            KeyedStream<IN1, KEY> left,
            KeyedStream<IN2, KEY> right,
            long lowerBound,
            long upperBound,
            boolean lowerBoundInclusive,
            boolean upperBoundInclusive) {
        this.left = checkNotNull(left);
        this.right = checkNotNull(right);

        this.lowerBound = lowerBound;
        this.upperBound = upperBound;

        this.lowerBoundInclusive = lowerBoundInclusive;
        this.upperBoundInclusive = upperBoundInclusive;

        this.keySelector1 = left.getKeySelector();
        this.keySelector2 = right.getKeySelector();
    }

    @PublicEvolving
    public IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() {
        this.upperBoundInclusive = false;
        return this;
    }

    @PublicEvolving
    public IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() {
        this.lowerBoundInclusive = false;
        return this;
    }

    @PublicEvolving
    public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction) {
        Preconditions.checkNotNull(processJoinFunction);

        final TypeInformation<OUT> outputType = TypeExtractor.getBinaryOperatorReturnType(
            processJoinFunction,
            ProcessJoinFunction.class,
            0,
            1,
            2,
            TypeExtractor.NO_INDEX,
            left.getType(),
            right.getType(),
            Utils.getCallLocationName(),
            true
        );

        return process(processJoinFunction, outputType);
    }

    @PublicEvolving
    public <OUT> SingleOutputStreamOperator<OUT> process(
            ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
            TypeInformation<OUT> outputType) {
        Preconditions.checkNotNull(processJoinFunction);
        Preconditions.checkNotNull(outputType);

        final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);

        final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
            new IntervalJoinOperator<>(
                lowerBound,
                upperBound,
                lowerBoundInclusive,
                upperBoundInclusive,
                left.getType().createSerializer(left.getExecutionConfig()),
                right.getType().createSerializer(right.getExecutionConfig()),
                cleanedUdf
            );

        return left
            .connect(right)
            .keyBy(keySelector1, keySelector2)
            .transform("Interval Join", outputType, operator);
    }
}
- IntervalJoined treats both lowerBound and upperBound as inclusive by default; lowerBoundExclusive and upperBoundExclusive switch either bound to exclusive individually. IntervalJoined provides the process operation, which takes a ProcessJoinFunction. process creates an IntervalJoinOperator, then runs left.connect(right).keyBy(keySelector1, keySelector2).transform("Interval Join", outputType, operator) and returns a SingleOutputStreamOperator (in this example, left is orangeStream and right is greenStream).
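The inclusive/exclusive switch can be illustrated with a stand-alone sketch. It follows the approach visible in the IntervalJoinOperator constructor quoted later in this article, where an exclusive bound is shifted by one millisecond so that the later comparison only has to deal with inclusive bounds. NormalizedBounds and matches are hypothetical names used only for this illustration, not Flink classes.

```java
// Hypothetical stand-alone class: folds exclusive bounds into inclusive ones by
// shifting them one millisecond, then checks whether a pair of timestamps matches.
class NormalizedBounds {
    final long lower;
    final long upper;

    NormalizedBounds(long lowerBound, long upperBound, boolean lowerInclusive, boolean upperInclusive) {
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound <= upperBound must be fulfilled");
        }
        // an exclusive lower bound x is the same as an inclusive lower bound x + 1 (ms granularity)
        this.lower = lowerInclusive ? lowerBound : lowerBound + 1L;
        // an exclusive upper bound y is the same as an inclusive upper bound y - 1
        this.upper = upperInclusive ? upperBound : upperBound - 1L;
    }

    // true if the right timestamp lies in [leftTs + lower, leftTs + upper]
    boolean matches(long leftTs, long rightTs) {
        return rightTs >= leftTs + lower && rightTs <= leftTs + upper;
    }

    public static void main(String[] args) {
        NormalizedBounds inclusive = new NormalizedBounds(-2, 1, true, true);
        NormalizedBounds upperExclusive = new NormalizedBounds(-2, 1, true, false);
        System.out.println(inclusive.matches(10, 11));
        System.out.println(upperExclusive.matches(10, 11));
    }
}
```

The shift works because the bounds are converted to whole milliseconds (lowerBound.toMilliseconds()), so "strictly less than y" and "at most y - 1" describe the same set of timestamps.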
ProcessJoinFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java
@PublicEvolving
public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {

    private static final long serialVersionUID = -2444626938039012398L;

    public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;

    public abstract class Context {

        public abstract long getLeftTimestamp();

        public abstract long getRightTimestamp();

        public abstract long getTimestamp();

        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
}
- ProcessJoinFunction extends AbstractRichFunction. It declares the abstract processElement method and its own Context class, which in turn declares four abstract methods: getLeftTimestamp, getRightTimestamp, getTimestamp and output.
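As a rough illustration of what the three timestamp getters return, the sketch below mimics the bookkeeping done in the operator's collect method (quoted in the next section), where the result timestamp is Math.max of the left and right timestamps. JoinContextSketch is a simplified stand-in, not the Flink Context class, and it assumes getTimestamp reports that result timestamp; side outputs are left out.

```java
// Simplified, hypothetical stand-in for the Context handed to processElement.
class JoinContextSketch {
    private long leftTimestamp;
    private long rightTimestamp;
    private long resultTimestamp;

    // called once per joined pair, before the user function is invoked
    void updateTimestamps(long left, long right) {
        this.leftTimestamp = left;
        this.rightTimestamp = right;
        // the joined record carries the larger of the two input timestamps
        this.resultTimestamp = Math.max(left, right);
    }

    long getLeftTimestamp() { return leftTimestamp; }

    long getRightTimestamp() { return rightTimestamp; }

    long getTimestamp() { return resultTimestamp; }

    public static void main(String[] args) {
        JoinContextSketch ctx = new JoinContextSketch();
        ctx.updateTimestamps(3L, 5L);
        System.out.println(ctx.getLeftTimestamp() + " " + ctx.getRightTimestamp() + " " + ctx.getTimestamp());
    }
}
```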
IntervalJoinOperator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@Internal
public class IntervalJoinOperator<K, T1, T2, OUT>
        extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>>
        implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {

    private static final long serialVersionUID = -5380774605111543454L;

    private static final Logger logger = LoggerFactory.getLogger(IntervalJoinOperator.class);

    private static final String LEFT_BUFFER = "LEFT_BUFFER";
    private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
    private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
    private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
    private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";

    private final long lowerBound;
    private final long upperBound;

    private final TypeSerializer<T1> leftTypeSerializer;
    private final TypeSerializer<T2> rightTypeSerializer;

    private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
    private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;

    private transient TimestampedCollector<OUT> collector;
    private transient ContextImpl context;

    private transient InternalTimerService<String> internalTimerService;

    public IntervalJoinOperator(
            long lowerBound,
            long upperBound,
            boolean lowerBoundInclusive,
            boolean upperBoundInclusive,
            TypeSerializer<T1> leftTypeSerializer,
            TypeSerializer<T2> rightTypeSerializer,
            ProcessJoinFunction<T1, T2, OUT> udf) {

        super(Preconditions.checkNotNull(udf));

        Preconditions.checkArgument(lowerBound <= upperBound,
            "lowerBound <= upperBound must be fulfilled");

        // Move buffer by +1 / -1 depending on inclusiveness in order not needing
        // to check for inclusiveness later on
        this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;
        this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;

        this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
        this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
    }

    @Override
    public void open() throws Exception {
        super.open();

        collector = new TimestampedCollector<>(output);
        context = new ContextImpl(userFunction);
        internalTimerService =
            getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
            LEFT_BUFFER,
            LongSerializer.INSTANCE,
            new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
        ));

        this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
            RIGHT_BUFFER,
            LongSerializer.INSTANCE,
            new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
        ));
    }

    @Override
    public void processElement1(StreamRecord<T1> record) throws Exception {
        processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    @Override
    public void processElement2(StreamRecord<T2> record) throws Exception {
        processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    @SuppressWarnings("unchecked")
    private <THIS, OTHER> void processElement(
            final StreamRecord<THIS> record,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
            final long relativeLowerBound,
            final long relativeUpperBound,
            final boolean isLeft) throws Exception {

        final THIS ourValue = record.getValue();
        final long ourTimestamp = record.getTimestamp();

        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                "interval stream joins need to have timestamps meaningful timestamps.");
        }

        if (isLate(ourTimestamp)) {
            return;
        }

        addToBuffer(ourBuffer, ourValue, ourTimestamp);

        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
            final long timestamp = bucket.getKey();

            if (timestamp < ourTimestamp + relativeLowerBound ||
                    timestamp > ourTimestamp + relativeUpperBound) {
                continue;
            }

            for (BufferEntry<OTHER> entry: bucket.getValue()) {
                if (isLeft) {
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                } else {
                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                }
            }
        }

        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
        if (isLeft) {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }
    }

    private boolean isLate(long timestamp) {
        long currentWatermark = internalTimerService.currentWatermark();
        return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
    }

    private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
        final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

        collector.setAbsoluteTimestamp(resultTimestamp);
        context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

        userFunction.processElement(left, right, context, collector);
    }

    @Override
    public void onEventTime(InternalTimer<K, String> timer) throws Exception {

        long timerTimestamp = timer.getTimestamp();
        String namespace = timer.getNamespace();

        logger.trace("onEventTime @ {}", timerTimestamp);

        switch (namespace) {
            case CLEANUP_NAMESPACE_LEFT: {
                long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
                logger.trace("Removing from left buffer @ {}", timestamp);
                leftBuffer.remove(timestamp);
                break;
            }
            case CLEANUP_NAMESPACE_RIGHT: {
                long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
                logger.trace("Removing from right buffer @ {}", timestamp);
                rightBuffer.remove(timestamp);
                break;
            }
            default:
                throw new RuntimeException("Invalid namespace " + namespace);
        }
    }

    @Override
    public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
        // do nothing.
    }

    //......
}
- IntervalJoinOperator extends the abstract class AbstractUdfStreamOperator and implements the TwoInputStreamOperator and Triggerable interfaces.
- IntervalJoinOperator overrides the open and initializeState methods of AbstractUdfStreamOperator (both defined by StreamOperator). In open it creates an InternalTimerService, passing this as the Triggerable argument, i.e. its own Triggerable implementation. In initializeState it creates two MapState instances, leftBuffer and rightBuffer.
- IntervalJoinOperator implements processElement1 and processElement2 from TwoInputStreamOperator (the interface's other methods are implemented in AbstractStreamOperator, the parent class of AbstractUdfStreamOperator). Both delegate to processElement and differ only in the relativeLowerBound, relativeUpperBound and isLeft arguments and in the order in which leftBuffer and rightBuffer are passed.
- processElement implements the time-matching logic of intervalJoin. It reads currentWatermark from internalTimerService to decide whether the element is late; a late element is dropped by returning immediately. Otherwise the element's value is added to ourBuffer (leftBuffer for processElement1, rightBuffer for processElement2). The method then iterates over every entry in otherBuffer and checks whether its timestamp satisfies ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound. Entries that fail the check are skipped; for those that pass, it calls collect, which invokes userFunction.processElement, i.e. the processElement method of the user-defined ProcessJoinFunction. Finally it computes cleanupTime and calls internalTimerService.registerEventTimeTimer to register a timer that cleans up the element.
- IntervalJoinOperator implements onEventTime and onProcessingTime from Triggerable. onProcessingTime does nothing, while onEventTime uses the timer's timestamp to remove the corresponding elements from leftBuffer or rightBuffer.
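The buffering, matching and cleanup steps described above can be sketched in plain Java. This is a simplified single-key simulation and not the Flink operator: HashMaps stand in for the keyed MapState, TreeSets stand in for the timer service, the inclusive bounds are passed in directly, and advanceWatermark is a made-up method that plays the role of the watermark firing event-time timers. All names in the block are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical single-key simulation of the interval join logic (no Flink dependency).
class IntervalJoinSketch {
    private final long lowerBound;
    private final long upperBound;
    private final Map<Long, List<String>> leftBuffer = new HashMap<>();
    private final Map<Long, List<String>> rightBuffer = new HashMap<>();
    private final TreeSet<Long> leftTimers = new TreeSet<>();
    private final TreeSet<Long> rightTimers = new TreeSet<>();
    private long watermark = Long.MIN_VALUE;
    final List<String> output = new ArrayList<>();

    IntervalJoinSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void processLeft(String value, long ts) {
        process(value, ts, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    // for the right side the bounds are mirrored: [-upperBound, -lowerBound]
    void processRight(String value, long ts) {
        process(value, ts, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    private void process(String value, long ts, Map<Long, List<String>> ours,
                         Map<Long, List<String>> others, long relLower, long relUpper, boolean isLeft) {
        if (watermark != Long.MIN_VALUE && ts < watermark) {
            return; // late element: dropped without buffering or joining
        }
        ours.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        for (Map.Entry<Long, List<String>> bucket : others.entrySet()) {
            long otherTs = bucket.getKey();
            if (otherTs < ts + relLower || otherTs > ts + relUpper) {
                continue; // outside the interval
            }
            for (String other : bucket.getValue()) {
                output.add(isLeft ? value + "," + other : other + "," + value);
            }
        }
        // the element must stay buffered until no future match is possible
        long cleanupTime = (relUpper > 0L) ? ts + relUpper : ts;
        (isLeft ? leftTimers : rightTimers).add(cleanupTime);
    }

    // stands in for the watermark advancing and firing due event-time timers
    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        for (Iterator<Long> it = leftTimers.iterator(); it.hasNext(); ) {
            long timer = it.next();
            if (timer > watermark) break;
            leftBuffer.remove(upperBound <= 0L ? timer : timer - upperBound);
            it.remove();
        }
        for (Iterator<Long> it = rightTimers.iterator(); it.hasNext(); ) {
            long timer = it.next();
            if (timer > watermark) break;
            rightBuffer.remove(lowerBound <= 0L ? timer + lowerBound : timer);
            it.remove();
        }
    }

    int leftBufferSize() { return leftBuffer.size(); }

    int rightBufferSize() { return rightBuffer.size(); }

    public static void main(String[] args) {
        IntervalJoinSketch join = new IntervalJoinSketch(-2L, 1L);
        join.processLeft("o2", 2L);
        join.processRight("g1", 1L);
        join.advanceWatermark(3L);
        join.processLeft("late", 2L);
        System.out.println(join.output);
    }
}
```

Note how the cleanup arithmetic in advanceWatermark inverts the cleanupTime computation in process, so the timer always removes the bucket stored under the element's own timestamp.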
Summary
- Flink's intervalJoin requires KeyedStreams and TimeCharacteristic.EventTime. KeyedStream.intervalJoin creates and returns an IntervalJoin. IntervalJoin provides the between operation, which sets the interval's lowerBound and upperBound and creates and returns an IntervalJoined.
- IntervalJoined provides the process operation, which takes a ProcessJoinFunction. process creates an IntervalJoinOperator, then runs left.connect(right).keyBy(keySelector1, keySelector2).transform("Interval Join", outputType, operator) and returns a SingleOutputStreamOperator.
- IntervalJoinOperator extends the abstract class AbstractUdfStreamOperator and implements the TwoInputStreamOperator and Triggerable interfaces. It overrides the open and initializeState methods of AbstractUdfStreamOperator (both defined by StreamOperator): open creates an InternalTimerService, passing this as the Triggerable argument, and initializeState creates the two MapState instances leftBuffer and rightBuffer. It implements processElement1 and processElement2 from TwoInputStreamOperator; both delegate to processElement and differ only in the relativeLowerBound, relativeUpperBound and isLeft arguments and in the order in which leftBuffer and rightBuffer are passed.
- The processElement method of IntervalJoinOperator implements the time-matching logic of intervalJoin. It first checks whether the element is late and returns immediately if so. Otherwise it adds the element to its buffer, then iterates over every entry in otherBuffer and checks whether ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound holds. Entries that fail the check are skipped; for those that pass, it calls collect, which invokes userFunction.processElement, i.e. the processElement method of the user-defined ProcessJoinFunction. Finally it computes cleanupTime and calls internalTimerService.registerEventTimeTimer to register a timer that cleans up the element.
- IntervalJoinOperator implements onEventTime and onProcessingTime from Triggerable. onProcessingTime does nothing, while onEventTime uses the timer's timestamp to remove the corresponding elements from leftBuffer or rightBuffer.