A look at Flink's CsvTableSource
Preface
This article takes a look at Flink's CsvTableSource.
TableSource
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/TableSource.scala
```scala
trait TableSource[T] {

  /** Returns the [[TypeInformation]] for the return type of the [[TableSource]].
    * The fields of the return type are mapped to the table schema based on their name.
    *
    * @return The type of the returned [[DataSet]] or [[DataStream]].
    */
  def getReturnType: TypeInformation[T]

  /**
    * Returns the schema of the produced table.
    *
    * @return The [[TableSchema]] of the produced table.
    */
  def getTableSchema: TableSchema

  /**
    * Describes the table source.
    *
    * @return A String explaining the [[TableSource]].
    */
  def explainSource(): String =
    TableConnectorUtil.generateRuntimeName(getClass, getTableSchema.getFieldNames)
}
```
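- TableSource defines three methods: getReturnType, getTableSchema and explainSource. Only explainSource has a default implementation, which builds a name from the class and the schema's field names via TableConnectorUtil.generateRuntimeName.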
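The following minimal Scala sketch shows how this contract fits together without depending on Flink. The names SimpleTableSource, SimpleSchema, PeopleSource and the helper generateRuntimeName are hypothetical. TypeInformation and TableSchema are replaced by plain strings. The "ClassName(field1, field2)" format produced by the default explainSource is an assumption about TableConnectorUtil, not a copy of it.

```scala
// Hypothetical, simplified model of the TableSource contract (not Flink code).
// Plain strings stand in for Flink's TypeInformation and TableSchema.
final case class SimpleSchema(fieldNames: Seq[String], fieldTypes: Seq[String])

trait SimpleTableSource[T] {
  // Counterpart of getReturnType: a description of the produced record type.
  def returnType: String

  // Counterpart of getTableSchema: the logical schema of the table.
  def tableSchema: SimpleSchema

  // Counterpart of explainSource: a default derived from the class and field names.
  def explainSource(): String =
    SimpleTableSource.generateRuntimeName(getClass, tableSchema.fieldNames)
}

object SimpleTableSource {
  // Hypothetical helper; the "SimpleClassName(f1, f2)" format is an assumption.
  def generateRuntimeName(clazz: Class[_], fields: Seq[String]): String =
    s"${clazz.getSimpleName}(${fields.mkString(", ")})"
}

// Example source that relies on the default explainSource.
class PeopleSource extends SimpleTableSource[(String, Int)] {
  override def returnType: String = "Tuple2<String, Integer>"
  override def tableSchema: SimpleSchema =
    SimpleSchema(Seq("name", "age"), Seq("STRING", "INT"))
}
```

Calling `new PeopleSource().explainSource()` yields the class name followed by the field names in parentheses. A subclass can still override explainSource, as CsvTableSource does.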
BatchTableSource
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/BatchTableSource.scala
```scala
trait BatchTableSource[T] extends TableSource[T] {

  /**
    * Returns the data of the table as a [[DataSet]].
    *
    * NOTE: This method is for internal use only for defining a [[TableSource]].
    * Do not use it in Table API programs.
    */
  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}
```
- BatchTableSource extends TableSource and adds getDataSet, which returns the table's data as a DataSet for a given ExecutionEnvironment.
StreamTableSource
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/StreamTableSource.scala
```scala
trait StreamTableSource[T] extends TableSource[T] {

  /**
    * Returns the data of the table as a [[DataStream]].
    *
    * NOTE: This method is for internal use only for defining a [[TableSource]].
    * Do not use it in Table API programs.
    */
  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
```
- StreamTableSource extends TableSource and adds getDataStream, which returns the table's data as a DataStream for a given StreamExecutionEnvironment.
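The split between the batch and stream sub-traits, and the way one class can implement both, can be sketched without Flink. This is a hypothetical model:

- A List stands in for DataSet and an Iterator for DataStream.
- The execution-environment parameters are dropped.
- MiniTableSource, MiniBatchTableSource, MiniStreamTableSource and InMemorySource are illustrative names, not Flink classes.

```scala
// Hypothetical stand-ins (not Flink classes): List plays the role of DataSet,
// Iterator plays the role of DataStream, and the environment parameters are omitted.
trait MiniTableSource[T] {
  def fieldNames: Seq[String]
}

trait MiniBatchTableSource[T] extends MiniTableSource[T] {
  // Counterpart of getDataSet: the whole table as a bounded collection.
  def getDataSet(): List[T]
}

trait MiniStreamTableSource[T] extends MiniTableSource[T] {
  // Counterpart of getDataStream: the table as a sequence of records read lazily.
  def getDataStream(): Iterator[T]
}

// Mixes in both sub-traits, the same way CsvTableSource does.
class InMemorySource(rows: Seq[(String, Int)])
    extends MiniBatchTableSource[(String, Int)]
    with MiniStreamTableSource[(String, Int)] {

  override def fieldNames: Seq[String] = Seq("name", "age")

  // One shared factory for the reader, analogous to createCsvInput().
  private def createInput(): Iterator[(String, Int)] = rows.iterator

  override def getDataSet(): List[(String, Int)] = createInput().toList
  override def getDataStream(): Iterator[(String, Int)] = createInput()
}
```

Because both access paths go through one factory method, batch and streaming reads share the same input configuration.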
CsvTableSource
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/CsvTableSource.scala
```scala
class CsvTableSource private (
    private val path: String,
    private val fieldNames: Array[String],
    private val fieldTypes: Array[TypeInformation[_]],
    private val selectedFields: Array[Int],
    private val fieldDelim: String,
    private val rowDelim: String,
    private val quoteCharacter: Character,
    private val ignoreFirstLine: Boolean,
    private val ignoreComments: String,
    private val lenient: Boolean)
  extends BatchTableSource[Row]
  with StreamTableSource[Row]
  with ProjectableTableSource[Row] {

  def this(
      path: String,
      fieldNames: Array[String],
      fieldTypes: Array[TypeInformation[_]],
      fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
      rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
      quoteCharacter: Character = null,
      ignoreFirstLine: Boolean = false,
      ignoreComments: String = null,
      lenient: Boolean = false) = {
    this(
      path,
      fieldNames,
      fieldTypes,
      fieldTypes.indices.toArray, // initially, all fields are returned
      fieldDelim,
      rowDelim,
      quoteCharacter,
      ignoreFirstLine,
      ignoreComments,
      lenient)
  }

  def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) = {
    this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
      CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
  }

  if (fieldNames.length != fieldTypes.length) {
    throw new TableException("Number of field names and field types must be equal.")
  }

  private val selectedFieldTypes = selectedFields.map(fieldTypes(_))
  private val selectedFieldNames = selectedFields.map(fieldNames(_))

  private val returnType: RowTypeInfo = new RowTypeInfo(selectedFieldTypes, selectedFieldNames)

  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
    execEnv.createInput(createCsvInput(), returnType).name(explainSource())
  }

  /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
  override def getReturnType: RowTypeInfo = returnType

  override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = {
    streamExecEnv.createInput(createCsvInput(), returnType).name(explainSource())
  }

  /** Returns the schema of the produced table. */
  override def getTableSchema = new TableSchema(fieldNames, fieldTypes)

  /** Returns a copy of [[TableSource]] with ability to project fields */
  override def projectFields(fields: Array[Int]): CsvTableSource = {

    val selectedFields = if (fields.isEmpty) Array(0) else fields

    new CsvTableSource(
      path,
      fieldNames,
      fieldTypes,
      selectedFields,
      fieldDelim,
      rowDelim,
      quoteCharacter,
      ignoreFirstLine,
      ignoreComments,
      lenient)
  }

  private def createCsvInput(): RowCsvInputFormat = {
    val inputFormat = new RowCsvInputFormat(
      new Path(path),
      selectedFieldTypes,
      rowDelim,
      fieldDelim,
      selectedFields)

    inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
    inputFormat.setLenient(lenient)
    if (quoteCharacter != null) {
      inputFormat.enableQuotedStringParsing(quoteCharacter)
    }
    if (ignoreComments != null) {
      inputFormat.setCommentPrefix(ignoreComments)
    }

    inputFormat
  }

  override def equals(other: Any): Boolean = other match {
    case that: CsvTableSource => returnType == that.returnType &&
        path == that.path &&
        fieldDelim == that.fieldDelim &&
        rowDelim == that.rowDelim &&
        quoteCharacter == that.quoteCharacter &&
        ignoreFirstLine == that.ignoreFirstLine &&
        ignoreComments == that.ignoreComments &&
        lenient == that.lenient
    case _ => false
  }

  override def hashCode(): Int = {
    returnType.hashCode()
  }

  override def explainSource(): String = {
    s"CsvTableSource(" +
      s"read fields: ${getReturnType.getFieldNames.mkString(", ")})"
  }
}
```
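- CsvTableSource implements BatchTableSource, StreamTableSource and ProjectableTableSource. getDataSet creates a DataSet via ExecutionEnvironment.createInput, and getDataStream creates a DataStream via StreamExecutionEnvironment.createInput. Both name the resulting operator with explainSource().
- The InputFormat passed to both createInput calls is a RowCsvInputFormat built by createCsvInput. It is configured with:
  - the selected field types and indices;
  - the row and field delimiters;
  - header skipping (ignoreFirstLine) and lenient parsing;
  - the quote character and comment prefix, when they are set.
- getTableSchema builds its TableSchema from all fieldNames and fieldTypes. getReturnType returns a RowTypeInfo built from selectedFieldTypes and selectedFieldNames, so it covers only the projected fields.
- projectFields returns a new CsvTableSource with the given field indices, falling back to field 0 when the projection is empty.
- explainSource returns a string of the form "CsvTableSource(read fields: ...)" listing the projected field names.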
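The projection bookkeeping described above can be modeled in plain Scala. In this hypothetical sketch, ProjectableCsvModel keeps only the field names, types and selected indices of CsvTableSource. Types are plain strings, no file is read, and require stands in for the TableException check.

```scala
// Hypothetical model of CsvTableSource's projection bookkeeping (not Flink code).
// Field types are plain strings; require(...) replaces the TableException check.
final class ProjectableCsvModel private (
    val fieldNames: Array[String],
    val fieldTypes: Array[String],
    val selectedFields: Array[Int]) {

  require(fieldNames.length == fieldTypes.length,
    "Number of field names and field types must be equal.")

  // Public constructor: initially, all fields are selected.
  def this(fieldNames: Array[String], fieldTypes: Array[String]) =
    this(fieldNames, fieldTypes, fieldTypes.indices.toArray)

  private val selectedFieldNames = selectedFields.map(fieldNames(_))
  private val selectedFieldTypes = selectedFields.map(fieldTypes(_))

  // Like getTableSchema: always the full, unprojected schema.
  def tableSchema: Seq[(String, String)] = fieldNames.toSeq.zip(fieldTypes.toSeq)

  // Like getReturnType: only the selected fields, in selection order.
  def returnType: Seq[(String, String)] =
    selectedFieldNames.toSeq.zip(selectedFieldTypes.toSeq)

  // Like projectFields: returns a copy; an empty projection still reads field 0.
  def projectFields(fields: Array[Int]): ProjectableCsvModel = {
    val selected = if (fields.isEmpty) Array(0) else fields
    new ProjectableCsvModel(fieldNames, fieldTypes, selected)
  }

  // Same string format as CsvTableSource.explainSource.
  def explainSource(): String =
    s"CsvTableSource(read fields: ${selectedFieldNames.mkString(", ")})"
}
```

After `projectFields(Array(2, 0))` on a source with fields id, name and score, the model's tableSchema is still the full three-column schema. Its returnType and explainSource, however, only report score and id. This mirrors how getTableSchema and getReturnType diverge in CsvTableSource.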
Summary
- TableSource defines three methods: getReturnType, getTableSchema and explainSource. BatchTableSource extends TableSource with getDataSet, and StreamTableSource extends TableSource with getDataStream.
- CsvTableSource implements both BatchTableSource and StreamTableSource, as well as ProjectableTableSource. getDataSet creates a DataSet via ExecutionEnvironment.createInput, and getDataStream creates a DataStream via StreamExecutionEnvironment.createInput.
- Both createInput calls receive a RowCsvInputFormat created by createCsvInput. getTableSchema builds its TableSchema from fieldNames and fieldTypes. getReturnType returns a RowTypeInfo built from selectedFieldTypes and selectedFieldNames. explainSource returns a string starting with "CsvTableSource".