A Look at the connect Operation of Flink's DataStream
Preface
This article takes a look at the connect operation of Flink's DataStream.
DataStream.connect
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
@Public
public class DataStream<T> {

    //......

    public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
        return new ConnectedStreams<>(environment, this, dataStream);
    }

    @PublicEvolving
    public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
        return new BroadcastConnectedStream<>(
                environment,
                this,
                Preconditions.checkNotNull(broadcastStream),
                broadcastStream.getBroadcastStateDescriptor());
    }

    //......
}
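- DataStream's connect operation creates either a ConnectedStreams (when given a DataStream) or a BroadcastConnectedStream (when given a BroadcastStream). Both take two type parameters, so the elements of the two streams do not need to be of the same type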
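The following is a minimal plain-Java sketch of why the two type parameters matter. To stay runnable without the Flink dependency, SimpleStream and SimpleConnected are hypothetical stand-ins, not Flink classes. They only copy the shape of the signature <R> ConnectedStreams<T, R> connect(DataStream<R>) shown above. With that shape, a String stream can be connected to an Integer stream while each side keeps its own element type.

```java
import java.util.List;

// Hypothetical stand-in for DataStream<T>; it only holds a list of elements.
final class SimpleStream<T> {
    final List<T> elements;

    SimpleStream(List<T> elements) {
        this.elements = elements;
    }

    // Same shape as DataStream.connect: the other stream's element type R is independent of T.
    <R> SimpleConnected<T, R> connect(SimpleStream<R> other) {
        return new SimpleConnected<>(this, other);
    }
}

// Hypothetical stand-in for ConnectedStreams<IN1, IN2>: both inputs keep their own element type.
final class SimpleConnected<IN1, IN2> {
    private final SimpleStream<IN1> first;
    private final SimpleStream<IN2> second;

    SimpleConnected(SimpleStream<IN1> first, SimpleStream<IN2> second) {
        this.first = first;
        this.second = second;
    }

    SimpleStream<IN1> getFirstInput() { return first; }

    SimpleStream<IN2> getSecondInput() { return second; }
}

final class ConnectDemo {
    public static void main(String[] args) {
        SimpleStream<String> words = new SimpleStream<>(List.of("a", "b"));
        SimpleStream<Integer> numbers = new SimpleStream<>(List.of(1, 2, 3));
        // String and Integer: no common element type is required.
        SimpleConnected<String, Integer> connected = words.connect(numbers);
        // prints [a, b] / [1, 2, 3]
        System.out.println(connected.getFirstInput().elements + " / " + connected.getSecondInput().elements);
    }
}
```

In real Flink code, calling connect with a DataStream<Integer> on a DataStream<String> yields a ConnectedStreams<String, Integer> in the same way.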
ConnectedStreams
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@Public
public class ConnectedStreams<IN1, IN2> {

    protected final StreamExecutionEnvironment environment;
    protected final DataStream<IN1> inputStream1;
    protected final DataStream<IN2> inputStream2;

    protected ConnectedStreams(StreamExecutionEnvironment env, DataStream<IN1> input1, DataStream<IN2> input2) {
        this.environment = requireNonNull(env);
        this.inputStream1 = requireNonNull(input1);
        this.inputStream2 = requireNonNull(input2);
    }

    public StreamExecutionEnvironment getExecutionEnvironment() {
        return environment;
    }

    public DataStream<IN1> getFirstInput() {
        return inputStream1;
    }

    public DataStream<IN2> getSecondInput() {
        return inputStream2;
    }

    public TypeInformation<IN1> getType1() {
        return inputStream1.getType();
    }

    public TypeInformation<IN2> getType2() {
        return inputStream2.getType();
    }

    public ConnectedStreams<IN1, IN2> keyBy(int keyPosition1, int keyPosition2) {
        return new ConnectedStreams<>(this.environment, inputStream1.keyBy(keyPosition1),
                inputStream2.keyBy(keyPosition2));
    }

    public ConnectedStreams<IN1, IN2> keyBy(int[] keyPositions1, int[] keyPositions2) {
        return new ConnectedStreams<>(environment, inputStream1.keyBy(keyPositions1),
                inputStream2.keyBy(keyPositions2));
    }

    public ConnectedStreams<IN1, IN2> keyBy(String field1, String field2) {
        return new ConnectedStreams<>(environment, inputStream1.keyBy(field1),
                inputStream2.keyBy(field2));
    }

    public ConnectedStreams<IN1, IN2> keyBy(String[] fields1, String[] fields2) {
        return new ConnectedStreams<>(environment, inputStream1.keyBy(fields1),
                inputStream2.keyBy(fields2));
    }

    public ConnectedStreams<IN1, IN2> keyBy(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
        return new ConnectedStreams<>(environment, inputStream1.keyBy(keySelector1),
                inputStream2.keyBy(keySelector2));
    }

    public <KEY> ConnectedStreams<IN1, IN2> keyBy(
            KeySelector<IN1, KEY> keySelector1,
            KeySelector<IN2, KEY> keySelector2,
            TypeInformation<KEY> keyType) {
        return new ConnectedStreams<>(
                environment,
                inputStream1.keyBy(keySelector1, keyType),
                inputStream2.keyBy(keySelector2, keyType));
    }

    public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapper) {
        TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
                coMapper, CoMapFunction.class,
                0, 1, 2, TypeExtractor.NO_INDEX,
                getType1(), getType2(),
                Utils.getCallLocationName(), true);

        return transform("Co-Map", outTypeInfo, new CoStreamMap<>(inputStream1.clean(coMapper)));
    }

    public <R> SingleOutputStreamOperator<R> flatMap(
            CoFlatMapFunction<IN1, IN2, R> coFlatMapper) {
        TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
                coFlatMapper, CoFlatMapFunction.class,
                0, 1, 2, TypeExtractor.NO_INDEX,
                getType1(), getType2(),
                Utils.getCallLocationName(), true);

        return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
    }

    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> process(
            CoProcessFunction<IN1, IN2, R> coProcessFunction) {
        TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
                coProcessFunction, CoProcessFunction.class,
                0, 1, 2, TypeExtractor.NO_INDEX,
                getType1(), getType2(),
                Utils.getCallLocationName(), true);

        return process(coProcessFunction, outTypeInfo);
    }

    @Internal
    public <R> SingleOutputStreamOperator<R> process(
            CoProcessFunction<IN1, IN2, R> coProcessFunction,
            TypeInformation<R> outputType) {
        TwoInputStreamOperator<IN1, IN2, R> operator;

        if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof KeyedStream)) {
            operator = new KeyedCoProcessOperator<>(inputStream1.clean(coProcessFunction));
        } else {
            operator = new CoProcessOperator<>(inputStream1.clean(coProcessFunction));
        }

        return transform("Co-Process", outputType, operator);
    }

    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(String functionName,
            TypeInformation<R> outTypeInfo,
            TwoInputStreamOperator<IN1, IN2, R> operator) {

        // read the output type of the input Transforms to coax out errors about MissingTypeInfo
        inputStream1.getType();
        inputStream2.getType();

        TwoInputTransformation<IN1, IN2, R> transform = new TwoInputTransformation<>(
                inputStream1.getTransformation(),
                inputStream2.getTransformation(),
                functionName,
                operator,
                outTypeInfo,
                environment.getParallelism());

        if (inputStream1 instanceof KeyedStream && inputStream2 instanceof KeyedStream) {
            KeyedStream<IN1, ?> keyedInput1 = (KeyedStream<IN1, ?>) inputStream1;
            KeyedStream<IN2, ?> keyedInput2 = (KeyedStream<IN2, ?>) inputStream2;

            TypeInformation<?> keyType1 = keyedInput1.getKeyType();
            TypeInformation<?> keyType2 = keyedInput2.getKeyType();

            if (!(keyType1.canEqual(keyType2) && keyType1.equals(keyType2))) {
                throw new UnsupportedOperationException("Key types if input KeyedStreams " +
                        "don't match: " + keyType1 + " and " + keyType2 + ".");
            }

            transform.setStateKeySelectors(keyedInput1.getKeySelector(), keyedInput2.getKeySelector());
            transform.setStateKeyType(keyType1);
        }

        @SuppressWarnings({ "unchecked", "rawtypes" })
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, transform);

        getExecutionEnvironment().addOperator(transform);

        return returnStream;
    }
}
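- ConnectedStreams provides keyBy overloads that specify a key for each of the two streams (by position, field expression or KeySelector). It also provides the map, flatMap, process and transform operations, and the first three all end up calling transform
- transform takes a TwoInputStreamOperator, wraps it in a TwoInputTransformation, registers that with the environment and returns a SingleOutputStreamOperator. If both inputs are KeyedStreams, their key types must match; otherwise an UnsupportedOperationException is thrown
- map takes a CoMapFunction, flatMap takes a CoFlatMapFunction, and process takes a CoProcessFunction. process uses a KeyedCoProcessOperator when both inputs are KeyedStreams and a CoProcessOperator otherwise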
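The key-type rule in transform and the operator choice in process can be modelled in plain Java. This is a sketch under stated assumptions. KeyedInput is a hypothetical stand-in for KeyedStream that records its key type as a Class, whereas Flink compares TypeInformation objects with canEqual and equals. The operator names are returned as strings purely for illustration.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical stand-in for a KeyedStream: keeps the key selector and the key's type.
final class KeyedInput<T, K> {
    final Function<T, K> keySelector; // stored, as transform hands both selectors to the transformation
    final Class<K> keyType;

    KeyedInput(Function<T, K> keySelector, Class<K> keyType) {
        this.keySelector = Objects.requireNonNull(keySelector);
        this.keyType = Objects.requireNonNull(keyType);
    }
}

final class ConnectedSketch {
    // Mirrors the check in ConnectedStreams.transform: if both inputs are keyed, their key types
    // must be equal, and that shared type becomes the state key type of the two-input operator.
    static Class<?> checkKeyTypes(KeyedInput<?, ?> in1, KeyedInput<?, ?> in2) {
        if (!in1.keyType.equals(in2.keyType)) {
            throw new UnsupportedOperationException("Key types of input KeyedStreams don't match: "
                    + in1.keyType.getSimpleName() + " and " + in2.keyType.getSimpleName() + ".");
        }
        return in1.keyType;
    }

    // Mirrors ConnectedStreams.process: the keyed operator is chosen only when BOTH inputs are keyed.
    static String processOperatorName(Object in1, Object in2) {
        return (in1 instanceof KeyedInput && in2 instanceof KeyedInput)
                ? "KeyedCoProcessOperator" : "CoProcessOperator";
    }

    public static void main(String[] args) {
        KeyedInput<String, String> orders = new KeyedInput<>(s -> s.split(",")[0], String.class);
        KeyedInput<Long, Long> ids = new KeyedInput<>(x -> x, Long.class);
        System.out.println(checkKeyTypes(orders, orders).getSimpleName()); // String
        System.out.println(processOperatorName(orders, "not keyed"));     // CoProcessOperator
        try {
            checkKeyTypes(orders, ids);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage()); // Key types of input KeyedStreams don't match: String and Long.
        }
    }
}
```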
CoMapFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
@Public
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {

    OUT map1(IN1 value) throws Exception;

    OUT map2(IN2 value) throws Exception;
}
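- CoMapFunction extends Function (and Serializable) and defines map1 and map2. map1 is applied to elements of the first input and map2 to elements of the second, and both return the same output type OUT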
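The following is a plain-Java sketch of CoMapFunction semantics. SimpleCoMap is a hypothetical interface with the same two methods as CoMapFunction; it is not the Flink type. CoMapDriver is a hypothetical driver that sends every element of input 1 through map1 and every element of input 2 through map2, collecting both into one output type. The driver drains input 1 before input 2 only to make the result deterministic, because Flink gives no ordering guarantee between the two inputs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in with the same two methods as Flink's CoMapFunction (not the Flink type).
interface SimpleCoMap<IN1, IN2, OUT> {
    OUT map1(IN1 value) throws Exception; // called for each element of the first input
    OUT map2(IN2 value) throws Exception; // called for each element of the second input
}

final class CoMapDriver {
    // Exactly one output per input element; both branches produce the same OUT type.
    // Input 1 is drained first only for a deterministic result.
    static <IN1, IN2, OUT> List<OUT> run(List<IN1> in1, List<IN2> in2, SimpleCoMap<IN1, IN2, OUT> fn) {
        List<OUT> out = new ArrayList<>();
        try {
            for (IN1 v : in1) out.add(fn.map1(v));
            for (IN2 v : in2) out.add(fn.map2(v));
        } catch (Exception e) {
            throw new IllegalStateException("user function failed", e);
        }
        return out;
    }

    public static void main(String[] args) {
        SimpleCoMap<String, Integer, String> fn = new SimpleCoMap<String, Integer, String>() {
            @Override public String map1(String value) { return "word:" + value; }
            @Override public String map2(Integer value) { return "number:" + value; }
        };
        System.out.println(run(List.of("a", "b"), List.of(1), fn)); // [word:a, word:b, number:1]
    }
}
```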
CoFlatMapFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
@Public
public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {

    void flatMap1(IN1 value, Collector<OUT> out) throws Exception;

    void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
}
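- CoFlatMapFunction also extends Function and defines flatMap1 and flatMap2. Unlike CoMapFunction's map1 and map2, these methods take an extra Collector parameter and return void, so each input element can produce zero or more output elements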
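The practical difference a Collector makes is that one input element may yield zero, one or many outputs. In this hypothetical plain-Java sketch, SimpleCollector and SimpleCoFlatMap mirror the shapes of Flink's Collector and CoFlatMapFunction without being those types. CoFlatMapDriver, also hypothetical, feeds input 1 to flatMap1 and input 2 to flatMap2.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's Collector<T>: the function pushes results into it.
interface SimpleCollector<T> {
    void collect(T record);
}

// Hypothetical stand-in with the same shape as Flink's CoFlatMapFunction.
interface SimpleCoFlatMap<IN1, IN2, OUT> {
    void flatMap1(IN1 value, SimpleCollector<OUT> out) throws Exception;
    void flatMap2(IN2 value, SimpleCollector<OUT> out) throws Exception;
}

final class CoFlatMapDriver {
    // Each call may emit any number of records; input 1 is drained first only for determinism.
    static <IN1, IN2, OUT> List<OUT> run(List<IN1> in1, List<IN2> in2, SimpleCoFlatMap<IN1, IN2, OUT> fn) {
        List<OUT> out = new ArrayList<>();
        SimpleCollector<OUT> collector = out::add;
        try {
            for (IN1 v : in1) fn.flatMap1(v, collector);
            for (IN2 v : in2) fn.flatMap2(v, collector);
        } catch (Exception e) {
            throw new IllegalStateException("user function failed", e);
        }
        return out;
    }

    public static void main(String[] args) {
        // Input 1: sentences split into words. Input 2: a number n emits "#" n times (possibly none).
        SimpleCoFlatMap<String, Integer, String> fn = new SimpleCoFlatMap<String, Integer, String>() {
            @Override public void flatMap1(String sentence, SimpleCollector<String> out) {
                for (String w : sentence.split(" ")) out.collect(w);
            }
            @Override public void flatMap2(Integer n, SimpleCollector<String> out) {
                for (int i = 0; i < n; i++) out.collect("#");
            }
        };
        System.out.println(run(List.of("hello flink"), List.of(0, 2), fn)); // [hello, flink, #, #]
    }
}
```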
CoProcessFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
@PublicEvolving
public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;

    public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}

    public abstract class Context {

        public abstract Long timestamp();

        public abstract TimerService timerService();

        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }

    public abstract class OnTimerContext extends Context {
        /**
         * The {@link TimeDomain} of the firing timer.
         */
        public abstract TimeDomain timeDomain();
    }
}
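- CoProcessFunction extends AbstractRichFunction, so it is a rich function with open/close and access to the runtime context. It defines processElement1 and processElement2, which, unlike the CoFlatMapFunction methods, take an additional Context parameter
- CoProcessFunction defines Context and OnTimerContext. processElement1 and processElement2 receive a Context, which provides the timestamp, timerService and output methods (output emits to a side output identified by an OutputTag)
- Another difference from CoFlatMapFunction is that CoProcessFunction can use the TimerService to register timers and then implement the corresponding logic in the onTimer method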
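The following is a simplified, self-contained model, written in plain Java, of how processElement1/processElement2 cooperate with onTimer. Everything here is hypothetical:

- SimpleContext condenses Context plus TimerService into a timestamp() and a registerTimer(time) method.
- A List stands in for Collector.
- CoProcessDriver fires a timer once an element at or after the timer's time arrives (a crude stand-in for watermark progress), then fires the remaining timers at end of input.

There are no keys in this model. In Flink, registering timers requires both inputs to be keyed, which is the KeyedCoProcessOperator path chosen in process.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for CoProcessFunction.Context: element timestamp plus a condensed timer service.
interface SimpleContext {
    long timestamp();
    void registerTimer(long time); // plays the role of ctx.timerService().registerEventTimeTimer(time)
}

// Hypothetical stand-in shaped like CoProcessFunction; a List replaces Collector, side outputs are omitted.
abstract class SimpleCoProcess<IN1, IN2, OUT> {
    abstract void processElement1(IN1 value, SimpleContext ctx, List<OUT> out);
    abstract void processElement2(IN2 value, SimpleContext ctx, List<OUT> out);
    void onTimer(long timestamp, List<OUT> out) {}
}

// An element with its event timestamp.
final class Timed<T> {
    final long ts;
    final T value;
    Timed(long ts, T value) { this.ts = ts; this.value = value; }
}

final class CoProcessDriver {
    // Merges both inputs by timestamp (input 1 wins ties). Before each element it fires every timer
    // at or before that element's timestamp. A TreeSet deduplicates timers registered for the same time.
    static <IN1, IN2, OUT> List<OUT> run(List<Timed<IN1>> in1, List<Timed<IN2>> in2,
                                         SimpleCoProcess<IN1, IN2, OUT> fn) {
        List<OUT> out = new ArrayList<>();
        TreeSet<Long> timers = new TreeSet<>();
        int i = 0, j = 0;
        while (i < in1.size() || j < in2.size()) {
            boolean takeFirst = j >= in2.size() || (i < in1.size() && in1.get(i).ts <= in2.get(j).ts);
            long ts = takeFirst ? in1.get(i).ts : in2.get(j).ts;
            fireUpTo(ts, timers, fn, out);
            SimpleContext ctx = new SimpleContext() {
                public long timestamp() { return ts; }
                public void registerTimer(long time) { timers.add(time); }
            };
            if (takeFirst) { fn.processElement1(in1.get(i++).value, ctx, out); }
            else { fn.processElement2(in2.get(j++).value, ctx, out); }
        }
        fireUpTo(Long.MAX_VALUE, timers, fn, out); // end of input: fire all remaining timers
        return out;
    }

    static <OUT> void fireUpTo(long time, TreeSet<Long> timers, SimpleCoProcess<?, ?, OUT> fn, List<OUT> out) {
        while (!timers.isEmpty() && timers.first() <= time) {
            fn.onTimer(timers.pollFirst(), out);
        }
    }

    public static void main(String[] args) {
        // Input 1: orders; input 2: payments. Each order gets a deadline timer 10 time units later.
        SimpleCoProcess<String, String, String> fn = new SimpleCoProcess<String, String, String>() {
            final Map<Long, String> pending = new HashMap<>(); // timer time -> order id (one per time, for brevity)
            final Set<String> paid = new HashSet<>();
            @Override void processElement1(String order, SimpleContext ctx, List<String> out) {
                long deadline = ctx.timestamp() + 10;
                pending.put(deadline, order);
                ctx.registerTimer(deadline);
            }
            @Override void processElement2(String payment, SimpleContext ctx, List<String> out) {
                paid.add(payment);
            }
            @Override void onTimer(long timestamp, List<String> out) {
                String order = pending.remove(timestamp);
                out.add((paid.contains(order) ? "paid:" : "unpaid:") + order);
            }
        };
        List<Timed<String>> orders = List.of(new Timed<>(1, "o1"), new Timed<>(2, "o2"));
        List<Timed<String>> payments = List.of(new Timed<>(5, "o1"));
        System.out.println(run(orders, payments, fn)); // [paid:o1, unpaid:o2]
    }
}
```

Order o1 is paid before its deadline timer (time 11) fires and o2 is not. The per-order bookkeeping lives in plain fields here; in Flink it would typically live in keyed state.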
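Summary
- DataStream's connect operation creates either a ConnectedStreams or a BroadcastConnectedStream. Both use two type parameters, so the elements of the two streams do not need to be of the same type
- ConnectedStreams provides keyBy methods for specifying the key of each stream, plus the map, flatMap, process and transform operations, the first three of which all end up calling transform. transform accepts a TwoInputStreamOperator and returns a SingleOutputStreamOperator. map accepts a CoMapFunction, flatMap a CoFlatMapFunction, and process a CoProcessFunction
- Unlike CoMapFunction's map1 and map2, CoFlatMapFunction's flatMap1 and flatMap2 take an extra Collector parameter. CoProcessFunction defines processElement1 and processElement2, which, unlike the CoFlatMapFunction methods, take an extra Context parameter. CoProcessFunction also differs from CoFlatMapFunction in that it can register timers through the TimerService and implement the corresponding logic in onTimer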