A Look at Flink's JDBCAppendTableSink
Preface
This article mainly takes a look at Flink's JDBCAppendTableSink.
Example
```java
JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
    .setDBUrl("jdbc:derby:memory:ebookshop")
    .setQuery("INSERT INTO books (id) VALUES (?)")
    .setParameterTypes(INT_TYPE_INFO)
    .build();

tableEnv.registerTableSink(
    "jdbcOutputTable",
    // specify table schema
    new String[]{"id"},
    new TypeInformation[]{Types.INT},
    sink);

Table table = ...
table.insertInto("jdbcOutputTable");
```
- Here tableEnv.registerTableSink is used to register the JDBCAppendTableSink; table.insertInto is then used to write data to that sink.
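Note that this sink is append-only: rows can only be inserted, never updated or retracted. A toy sketch of that contract in plain Java (the interface and class names here are hypothetical, not Flink's):

```java
import java.util.ArrayList;
import java.util.List;

public class AppendOnlySinkSketch {

    // Hypothetical append-only sink contract: rows can only be added.
    interface AppendSink<T> {
        void emit(T row);
    }

    // A collecting implementation standing in for the JDBC-backed sink.
    static class CollectingSink implements AppendSink<Integer> {
        final List<Integer> written = new ArrayList<>();

        @Override
        public void emit(Integer row) {
            // In the real sink this would bind the value to
            // "INSERT INTO books (id) VALUES (?)" and batch it.
            written.add(row);
        }
    }

    public static void main(String[] args) {
        CollectingSink sink = new CollectingSink();
        for (int id : new int[]{1001, 1002, 1003}) {
            sink.emit(id);
        }
        System.out.println(sink.written); // prints "[1001, 1002, 1003]"
    }
}
```

Because there is no retraction path, duplicates produced upstream land in the database as duplicate rows, which matters for the semantics discussed in the summary below.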
JDBCAppendTableSink
flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
```java
public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {

    private final JDBCOutputFormat outputFormat;

    private String[] fieldNames;
    private TypeInformation[] fieldTypes;

    JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
        this.outputFormat = outputFormat;
    }

    public static JDBCAppendTableSinkBuilder builder() {
        return new JDBCAppendTableSinkBuilder();
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        dataStream
                .addSink(new JDBCSinkFunction(outputFormat))
                .name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
    }

    @Override
    public void emitDataSet(DataSet<Row> dataSet) {
        dataSet.output(outputFormat);
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        int[] types = outputFormat.getTypesArray();

        String sinkSchema =
            String.join(", ", IntStream.of(types).mapToObj(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
        String tableSchema =
            String.join(", ", Stream.of(fieldTypes).map(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
        String msg = String.format("Schema of output table is incompatible with JDBCAppendTableSink schema. " +
            "Table schema: [%s], sink schema: [%s]", tableSchema, sinkSchema);

        Preconditions.checkArgument(fieldTypes.length == types.length, msg);
        for (int i = 0; i < types.length; ++i) {
            Preconditions.checkArgument(
                JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
                msg);
        }

        JDBCAppendTableSink copy;
        try {
            copy = new JDBCAppendTableSink(InstantiationUtil.clone(outputFormat));
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }

        copy.fieldNames = fieldNames;
        copy.fieldTypes = fieldTypes;
        return copy;
    }

    @VisibleForTesting
    JDBCOutputFormat getOutputFormat() {
        return outputFormat;
    }
}
```
- JDBCAppendTableSink implements the emitDataStream method of the AppendStreamTableSink interface and the emitDataSet method of the BatchTableSink interface; both AppendStreamTableSink and BatchTableSink extend the TableSink interface, which defines the getOutputType, getFieldNames, getFieldTypes, and configure methods
- emitDataStream creates a JDBCSinkFunction from the JDBCOutputFormat and adds it as a sink on the dataStream; emitDataSet instead writes out directly through the dataSet's output method using the JDBCOutputFormat
- JDBCAppendTableSink provides the static builder method to create a JDBCAppendTableSinkBuilder, which can be used to construct a JDBCAppendTableSink
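The core of configure above is checking that each table field type maps to the same java.sql.Types code that the sink's output format was built with. A minimal sketch of that compatibility check, using a hypothetical subset of the mapping performed by JDBCTypeUtil.typeInformationToSqlType (the real mapping covers many more types):

```java
import java.sql.Types;
import java.util.HashMap;
import java.util.Map;

public class TypeCompatSketch {

    // Hypothetical subset of the TypeInformation -> java.sql.Types mapping
    // (standing in for JDBCTypeUtil.typeInformationToSqlType).
    static final Map<Class<?>, Integer> SQL_TYPES = new HashMap<>();
    static {
        SQL_TYPES.put(Integer.class, Types.INTEGER);
        SQL_TYPES.put(Long.class, Types.BIGINT);
        SQL_TYPES.put(String.class, Types.VARCHAR);
        SQL_TYPES.put(Double.class, Types.DOUBLE);
    }

    // Mirrors the Preconditions.checkArgument loop in configure():
    // lengths must match, and every field must map to the sink's SQL type.
    static void checkCompatible(Class<?>[] fieldTypes, int[] sinkSqlTypes) {
        if (fieldTypes.length != sinkSqlTypes.length) {
            throw new IllegalArgumentException(
                "Schema of output table is incompatible with sink schema");
        }
        for (int i = 0; i < fieldTypes.length; i++) {
            Integer mapped = SQL_TYPES.get(fieldTypes[i]);
            if (mapped == null || mapped != sinkSqlTypes[i]) {
                throw new IllegalArgumentException("Field " + i + " is incompatible");
            }
        }
    }

    public static void main(String[] args) {
        // Matches the example: an INT field against a Types.INTEGER parameter.
        checkCompatible(new Class<?>[]{Integer.class}, new int[]{Types.INTEGER});
        System.out.println("compatible");
    }
}
```

On success, configure returns a cloned sink carrying the field names and types rather than mutating the original; on mismatch it fails fast with the schema message shown in the source.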
JDBCAppendTableSinkBuilder
flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
```java
public class JDBCAppendTableSinkBuilder {
    private String username;
    private String password;
    private String driverName;
    private String dbURL;
    private String query;
    private int batchSize = DEFAULT_BATCH_INTERVAL;
    private int[] parameterTypes;

    /**
     * Specify the username of the JDBC connection.
     * @param username the username of the JDBC connection.
     */
    public JDBCAppendTableSinkBuilder setUsername(String username) {
        this.username = username;
        return this;
    }

    /**
     * Specify the password of the JDBC connection.
     * @param password the password of the JDBC connection.
     */
    public JDBCAppendTableSinkBuilder setPassword(String password) {
        this.password = password;
        return this;
    }

    /**
     * Specify the name of the JDBC driver that will be used.
     * @param drivername the name of the JDBC driver.
     */
    public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
        this.driverName = drivername;
        return this;
    }

    /**
     * Specify the URL of the JDBC database.
     * @param dbURL the URL of the database, whose format is specified by the
     *              corresponding JDBC driver.
     */
    public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
        this.dbURL = dbURL;
        return this;
    }

    /**
     * Specify the query that the sink will execute. Usually user can specify
     * INSERT, REPLACE or UPDATE to push the data to the database.
     * @param query The query to be executed by the sink.
     * @see org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.JDBCOutputFormatBuilder#setQuery(String)
     */
    public JDBCAppendTableSinkBuilder setQuery(String query) {
        this.query = query;
        return this;
    }

    /**
     * Specify the size of the batch. By default the sink will batch the query
     * to improve the performance
     * @param batchSize the size of batch
     */
    public JDBCAppendTableSinkBuilder setBatchSize(int batchSize) {
        this.batchSize = batchSize;
        return this;
    }

    /**
     * Specify the type of the rows that the sink will be accepting.
     * @param types the type of each field
     */
    public JDBCAppendTableSinkBuilder setParameterTypes(TypeInformation<?>... types) {
        int[] ty = new int[types.length];
        for (int i = 0; i < types.length; ++i) {
            ty[i] = JDBCTypeUtil.typeInformationToSqlType(types[i]);
        }
        this.parameterTypes = ty;
        return this;
    }

    /**
     * Specify the type of the rows that the sink will be accepting.
     * @param types the type of each field defined by {@see java.sql.Types}.
     */
    public JDBCAppendTableSinkBuilder setParameterTypes(int... types) {
        this.parameterTypes = types;
        return this;
    }

    /**
     * Finalizes the configuration and checks validity.
     *
     * @return Configured JDBCOutputFormat
     */
    public JDBCAppendTableSink build() {
        Preconditions.checkNotNull(parameterTypes,
            "Types of the query parameters are not specified." +
            " Please specify types using the setParameterTypes() method.");
        JDBCOutputFormat format = JDBCOutputFormat.buildJDBCOutputFormat()
            .setUsername(username)
            .setPassword(password)
            .setDBUrl(dbURL)
            .setQuery(query)
            .setDrivername(driverName)
            .setBatchInterval(batchSize)
            .setSqlTypes(parameterTypes)
            .finish();
        return new JDBCAppendTableSink(format);
    }
}
```
- JDBCAppendTableSinkBuilder provides the setUsername, setPassword, setDrivername, setDBUrl, setQuery, setBatchSize, and setParameterTypes methods to set the corresponding properties of the JDBCOutputFormat it builds; finally, the build method creates a JDBCAppendTableSink from that JDBCOutputFormat
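The batchSize passed to setBatchSize becomes the JDBCOutputFormat's batch interval: rows are buffered and the JDBC batch is executed once the threshold is reached. A rough sketch of that buffering behavior in plain Java, counting flushes instead of calling a real PreparedStatement.executeBatch (the class is illustrative, not Flink's implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchingSketch {

    private final int batchSize;            // corresponds to setBatchSize / setBatchInterval
    private final List<int[]> buffer = new ArrayList<>();
    int flushCount = 0;                     // how many times executeBatch would have run

    BatchingSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Analogous to JDBCOutputFormat.writeRecord: buffer the row,
    // flush once the batch threshold is reached.
    void writeRecord(int[] row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Analogous to executing and clearing the batch; in the real format
    // the remaining partial batch is also flushed on close().
    void flush() {
        if (!buffer.isEmpty()) {
            flushCount++;
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        BatchingSketch sink = new BatchingSketch(3);
        for (int i = 0; i < 7; i++) {
            sink.writeRecord(new int[]{i});
        }
        sink.flush(); // final flush of the partial batch, as close() would do
        System.out.println(sink.flushCount); // prints "3" (batches of 3 + 3 + 1)
    }
}
```

A larger batch size means fewer round trips to the database, at the cost of more rows sitting in memory between flushes.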
Summary
- With checkpointing enabled, JDBCAppendTableSink provides at-least-once semantics; to achieve exactly-once semantics, an idempotent query such as REPLACE or INSERT OVERWRITE must be used. JDBCAppendTableSink implements the emitDataStream method of the AppendStreamTableSink interface and the emitDataSet method of the BatchTableSink interface
- Both AppendStreamTableSink and BatchTableSink extend the TableSink interface, which defines the getOutputType, getFieldNames, getFieldTypes, and configure methods; emitDataStream creates a JDBCSinkFunction from the JDBCOutputFormat and adds it as a sink on the dataStream, while emitDataSet writes out directly through the dataSet's output method using the JDBCOutputFormat; JDBCAppendTableSink provides the static builder method to create a JDBCAppendTableSinkBuilder, which can be used to construct a JDBCAppendTableSink
- JDBCAppendTableSinkBuilder provides the setUsername, setPassword, setDrivername, setDBUrl, setQuery, setBatchSize, and setParameterTypes methods to set the corresponding properties of the JDBCOutputFormat it builds; finally, the build method creates a JDBCAppendTableSink from that JDBCOutputFormat
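The exactly-once point above rests on idempotence: under at-least-once delivery a record may be replayed after a failure, but a keyed REPLACE applied twice leaves the table in the same state, so the replay is harmless. A toy illustration of that property, with a Map standing in for a table with a primary key:

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotentReplaceSketch {

    // Stand-in for "REPLACE INTO books (id, title) VALUES (?, ?)":
    // the row is overwritten by primary key instead of appended.
    static void replaceInto(Map<Integer, String> table, int id, String title) {
        table.put(id, title);
    }

    public static void main(String[] args) {
        Map<Integer, String> table = new HashMap<>();

        // First delivery of the record.
        replaceInto(table, 1, "Flink Basics");
        Map<Integer, String> afterFirst = new HashMap<>(table);

        // Replay after a failure: at-least-once may deliver the record again.
        replaceInto(table, 1, "Flink Basics");

        // The table state is unchanged, so the duplicate delivery is invisible.
        System.out.println(table.equals(afterFirst)); // prints "true"
    }
}
```

A plain INSERT, by contrast, would add a second row on replay, which is why the append query from the example only gives at-least-once guarantees.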
doc
- JDBCAppendTableSink
- JDBCOutputFormat
- A Look at Flink's JDBCOutputFormat