聊聊flink TableEnvironment的scan操作
序
本文主要研究一下flink TableEnvironment的scan操作
实例
//Scanning a directly registered table val tab: Table = tableEnv.scan("tableName") //Scanning a table from a registered catalog val tab: Table = tableEnv.scan("catalogName", "dbName", "tableName")
- scan操作用于从schema读取指定的table,也可以传入catalogName及dbName从指定的catalog及db读取
TableEnvironment.scan
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/TableEnvironment.scala
abstract class TableEnvironment(val config: TableConfig) { private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(false, false) private val rootSchema: SchemaPlus = internalSchema.plus() //...... @throws[TableException] @varargs def scan(tablePath: String*): Table = { scanInternal(tablePath.toArray) match { case Some(table) => table case None => throw new TableException(s"Table '${tablePath.mkString(".")}' was not found.") } } private[flink] def scanInternal(tablePath: Array[String]): Option[Table] = { require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.") val schemaPaths = tablePath.slice(0, tablePath.length - 1) val schema = getSchema(schemaPaths) if (schema != null) { val tableName = tablePath(tablePath.length - 1) val table = schema.getTable(tableName) if (table != null) { return Some(new Table(this, CatalogNode(tablePath, table.getRowType(typeFactory)))) } } None } private def getSchema(schemaPath: Array[String]): SchemaPlus = { var schema = rootSchema for (schemaName <- schemaPath) { schema = schema.getSubSchema(schemaName) if (schema == null) { return schema } } schema } //...... }
- scan方法内部调用的是scanInternal,scanInternal首先读取catalog及db信息,然后调用getSchema方法来获取schema
- getSchema是使用SchemaPlus的getSubSchema来按层次获取SchemaPlus,如果没有指定catalog及db,那么这里返回的是rootSchema
- 获取到schema之后,就可以从tablePath数组获取tableName(
数组最后一个元素
),调用SchemaPlus的getTable方法查找Table
小结
- TableEnvironment的scan操作就是从Schema中查找Table,可以使用tableName,或者额外指定catalog及db来查找
- getSchema是使用SchemaPlus的getSubSchema来按层次获取SchemaPlus,如果没有指定catalog及db,那么这里返回的是rootSchema
- 获取到schema之后,就可以从tablePath数组获取tableName(
数组最后一个元素
),调用SchemaPlus的getTable方法查找Table
doc
相关推荐
xiaoyutongxue 2020-05-27
xorxos 2020-04-22
raidtest 2020-10-09
匆匆那些年 2020-06-27
oXiaoChong 2020-06-20
yuchuanchen 2020-06-16
Spark高级玩法 2020-06-14
Leonwey 2020-06-11
Spark高级玩法 2020-06-09
文报 2020-06-09
xorxos 2020-06-07
yuchuanchen 2020-05-27
阿尼古 2020-05-26
千慧 2020-05-18
yuchuanchen 2020-05-17
yuchuanchen 2020-05-16
Spark高级玩法 2020-05-11