A Look at Flink's CsvTableSink
Preface
This article mainly takes a look at Flink's CsvTableSink.
TableSink
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/TableSink.scala
trait TableSink[T] {

  /**
    * Returns the type expected by this [[TableSink]].
    *
    * This type should depend on the types returned by [[getFieldNames]].
    *
    * @return The type expected by this [[TableSink]].
    */
  def getOutputType: TypeInformation[T]

  /** Returns the names of the table fields. */
  def getFieldNames: Array[String]

  /** Returns the types of the table fields. */
  def getFieldTypes: Array[TypeInformation[_]]

  /**
    * Return a copy of this [[TableSink]] configured with the field names and types of the
    * [[Table]] to emit.
    *
    * @param fieldNames The field names of the table to emit.
    * @param fieldTypes The field types of the table to emit.
    * @return A copy of this [[TableSink]] configured with the field names and types of the
    *         [[Table]] to emit.
    */
  def configure(fieldNames: Array[String],
                fieldTypes: Array[TypeInformation[_]]): TableSink[T]
}
- TableSink defines four methods: getOutputType, getFieldNames, getFieldTypes, and configure (a minimal sketch of this contract follows)
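To make this contract concrete, below is a minimal, hypothetical implementation (the class name MyRowSink is made up for illustration): getOutputType is derived from the configured field types, and configure hands back a configured copy rather than mutating the sink in place. On its own such a sink cannot emit anything; a real sink would additionally mix in BatchTableSink or StreamTableSink, covered next.

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.types.Row

// Hypothetical sketch, not part of Flink: a bare TableSink[Row]
class MyRowSink(
    fieldNames: Array[String] = Array.empty,
    fieldTypes: Array[TypeInformation[_]] = Array.empty)
  extends TableSink[Row] {

  // the output type is derived from the configured field types
  override def getOutputType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)

  override def getFieldNames: Array[String] = fieldNames

  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes

  // return a configured copy instead of mutating this instance
  override def configure(
      fieldNames: Array[String],
      fieldTypes: Array[TypeInformation[_]]): TableSink[Row] =
    new MyRowSink(fieldNames, fieldTypes)
}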
BatchTableSink
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/BatchTableSink.scala
trait BatchTableSink[T] extends TableSink[T] {

  /** Emits the DataSet. */
  def emitDataSet(dataSet: DataSet[T]): Unit
}
- BatchTableSink extends TableSink and defines the emitDataSet method
StreamTableSink
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/StreamTableSink.scala
trait StreamTableSink[T] extends TableSink[T] {

  /** Emits the DataStream. */
  def emitDataStream(dataStream: DataStream[T]): Unit
}
- StreamTableSink extends TableSink and defines the emitDataStream method (see the sketch below)
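As a hedged sketch of what a concrete implementation looks like (PrintStreamTableSink and its print behavior are made up for illustration, not a Flink-provided sink), emitDataStream is the hook where the sink attaches its runtime operator to the translated DataStream:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
import org.apache.flink.types.Row

// Hypothetical sketch: a StreamTableSink that just prints each appended row
class PrintStreamTableSink(
    fieldNames: Array[String] = Array.empty,
    fieldTypes: Array[TypeInformation[_]] = Array.empty)
  extends StreamTableSink[Row] {

  override def getOutputType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
  override def getFieldNames: Array[String] = fieldNames
  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes

  override def configure(
      names: Array[String],
      types: Array[TypeInformation[_]]): TableSink[Row] =
    new PrintStreamTableSink(names, types)

  // called by the planner to attach the sink to the translated DataStream
  override def emitDataStream(dataStream: DataStream[Row]): Unit =
    dataStream.print()
}

The batch side is symmetric: a BatchTableSink would attach its output in emitDataSet instead. Note how the schema plumbing (field names, field types, configure) is pure boilerplate; that is exactly what TableSinkBase, covered next, factors out.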
TableSinkBase
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/TableSinkBase.scala
trait TableSinkBase[T] extends TableSink[T] {

  private var fieldNames: Option[Array[String]] = None
  private var fieldTypes: Option[Array[TypeInformation[_]]] = None

  /** Return a deep copy of the [[TableSink]]. */
  protected def copy: TableSinkBase[T]

  /**
    * Return the field names of the [[Table]] to emit.
    */
  def getFieldNames: Array[String] = {
    fieldNames match {
      case Some(n) => n
      case None => throw new IllegalStateException(
        "TableSink must be configured to retrieve field names.")
    }
  }

  /** Return the field types of the [[Table]] to emit. */
  def getFieldTypes: Array[TypeInformation[_]] = {
    fieldTypes match {
      case Some(t) => t
      case None => throw new IllegalStateException(
        "TableSink must be configured to retrieve field types.")
    }
  }

  /**
    * Return a copy of this [[TableSink]] configured with the field names and types of the
    * [[Table]] to emit.
    *
    * @param fieldNames The field names of the table to emit.
    * @param fieldTypes The field types of the table to emit.
    * @return A copy of this [[TableSink]] configured with the field names and types of the
    *         [[Table]] to emit.
    */
  final def configure(fieldNames: Array[String],
                      fieldTypes: Array[TypeInformation[_]]): TableSink[T] = {

    val configuredSink = this.copy
    configuredSink.fieldNames = Some(fieldNames)
    configuredSink.fieldTypes = Some(fieldTypes)
    configuredSink
  }
}
- TableSinkBase extends TableSink and implements the getFieldNames, getFieldTypes, and configure methods (a short demonstration follows)
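The effect of configure can be demonstrated with the CsvTableSink covered in the next section (a hypothetical snippet; the path and field names are made up):

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.table.sinks.CsvTableSink

val sink = new CsvTableSink("/tmp/result.csv", ",")

// calling sink.getFieldNames here would throw IllegalStateException:
// "TableSink must be configured to retrieve field names."

val configured = sink.configure(
  Array("name", "age"),
  Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))

println(configured.getFieldNames.mkString(",")) // name,age
println(configured ne sink)                     // true: configure returned a copy

This copy-on-configure design keeps the original sink unconfigured and reusable: the planner can configure one sink instance against different table schemas without the copies interfering with each other.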
CsvTableSink
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sinks/CsvTableSink.scala
class CsvTableSink(
    path: String,
    fieldDelim: Option[String],
    numFiles: Option[Int],
    writeMode: Option[WriteMode])
  extends TableSinkBase[Row]
  with BatchTableSink[Row]
  with AppendStreamTableSink[Row] {

  /**
    * A simple [[TableSink]] to emit data as CSV files.
    *
    * @param path The output path to write the Table to.
    * @param fieldDelim The field delimiter, ',' by default.
    */
  def this(path: String, fieldDelim: String = ",") {
    this(path, Some(fieldDelim), None, None)
  }

  /**
    * A simple [[TableSink]] to emit data as CSV files.
    *
    * @param path The output path to write the Table to.
    * @param fieldDelim The field delimiter.
    * @param numFiles The number of files to write to.
    * @param writeMode The write mode to specify whether existing files are overwritten or not.
    */
  def this(path: String, fieldDelim: String, numFiles: Int, writeMode: WriteMode) {
    this(path, Some(fieldDelim), Some(numFiles), Some(writeMode))
  }

  override def emitDataSet(dataSet: DataSet[Row]): Unit = {
    val csvRows = dataSet.map(new CsvFormatter(fieldDelim.getOrElse(",")))

    if (numFiles.isDefined) {
      csvRows.setParallelism(numFiles.get)
    }

    val sink = writeMode match {
      case None => csvRows.writeAsText(path)
      case Some(wm) => csvRows.writeAsText(path, wm)
    }

    if (numFiles.isDefined) {
      sink.setParallelism(numFiles.get)
    }

    sink.name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
  }

  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
    val csvRows = dataStream.map(new CsvFormatter(fieldDelim.getOrElse(",")))

    if (numFiles.isDefined) {
      csvRows.setParallelism(numFiles.get)
    }

    val sink = writeMode match {
      case None => csvRows.writeAsText(path)
      case Some(wm) => csvRows.writeAsText(path, wm)
    }

    if (numFiles.isDefined) {
      sink.setParallelism(numFiles.get)
    }

    sink.name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
  }

  override protected def copy: TableSinkBase[Row] = {
    new CsvTableSink(path, fieldDelim, numFiles, writeMode)
  }

  override def getOutputType: TypeInformation[Row] = {
    new RowTypeInfo(getFieldTypes: _*)
  }
}

/**
  * Formats a [[Row]] into a [[String]] with fields separated by the field delimiter.
  *
  * @param fieldDelim The field delimiter.
  */
class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
  override def map(row: Row): String = {
    val builder = new StringBuilder

    // write first value
    val v = row.getField(0)
    if (v != null) {
      builder.append(v.toString)
    }

    // write following values
    for (i <- 1 until row.getArity) {
      builder.append(fieldDelim)
      val v = row.getField(i)
      if (v != null) {
        builder.append(v.toString)
      }
    }
    builder.mkString
  }
}
- CsvTableSink extends TableSinkBase and implements both the BatchTableSink and AppendStreamTableSink interfaces; AppendStreamTableSink in turn extends StreamTableSink
- emitDataSet and emitDataStream both map the data through CsvFormatter, a MapFunction that converts each Row into a String (a small demo follows this list)
- CsvTableSink takes an optional writeMode parameter; WriteMode is an enum with the two values NO_OVERWRITE and OVERWRITE, used to specify whether an existing file with the same name is overwritten when the CSV file is written (see the usage sketch below)
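Putting it together, here is a hypothetical end-to-end sketch against the Flink 1.7 Scala API (the path /tmp/people.csv, the table name people, and the field names are all made up); numFiles = 1 yields a single output file, and WriteMode.OVERWRITE replaces an existing file at that path:

import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register a small in-memory stream as table "people"
val stream = env.fromElements(("alice", 30), ("bob", 25))
tableEnv.registerDataStream("people", stream, 'name, 'age)

// one output file; OVERWRITE replaces an existing file at the path
val sink = new CsvTableSink("/tmp/people.csv", "|", 1, WriteMode.OVERWRITE)
tableEnv.sqlQuery("SELECT name, age FROM people").writeToSink(sink)

env.execute("csv-sink-demo")

Since CsvFormatter is a plain MapFunction, its null handling can also be checked in isolation (a small demo; note that values are neither quoted nor escaped, so a delimiter character inside a field value would corrupt the output):

import org.apache.flink.table.sinks.CsvFormatter
import org.apache.flink.types.Row

val formatter = new CsvFormatter("|")
val row = Row.of("alice", Integer.valueOf(30), null)
// null fields are written as empty strings
println(formatter.map(row)) // prints: alice|30|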
Summary
- TableSink defines four methods: getOutputType, getFieldNames, getFieldTypes, and configure; BatchTableSink extends TableSink and defines the emitDataSet method; StreamTableSink extends TableSink and defines the emitDataStream method; TableSinkBase extends TableSink and implements the getFieldNames, getFieldTypes, and configure methods
- CsvTableSink extends TableSinkBase and implements both the BatchTableSink and AppendStreamTableSink interfaces (AppendStreamTableSink in turn extends StreamTableSink); emitDataSet and emitDataStream both use CsvFormatter, a MapFunction that converts each Row into a String
- CsvTableSink takes an optional writeMode parameter; WriteMode is an enum with the two values NO_OVERWRITE and OVERWRITE, used to specify whether an existing file with the same name is overwritten when the CSV file is written