A Look at Flink's TableFunction
Preface
This article examines Flink's TableFunction.
Example
    // The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
    public class Split extends TableFunction<Tuple2<String, Integer>> {
        private String separator = " ";

        public Split(String separator) {
            this.separator = separator;
        }

        public void eval(String str) {
            for (String s : str.split(separator)) {
                // use collect(...) to emit a row
                collect(new Tuple2<String, Integer>(s, s.length()));
            }
        }
    }

    BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    Table myTable = ...         // table schema: [a: String]

    // Register the function.
    tableEnv.registerFunction("split", new Split("#"));

    // Use the table function in the Java Table API. "as" specifies the field names of the table.
    myTable.join(new Table(tableEnv, "split(a) as (word, length)"))
        .select("a, word, length");
    myTable.leftOuterJoin(new Table(tableEnv, "split(a) as (word, length)"))
        .select("a, word, length");

    // Use the table function in SQL with LATERAL and TABLE keywords.
    // CROSS JOIN a table function (equivalent to "join" in Table API).
    tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
    // LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
    tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
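- This example defines a custom Split function, registers it with TableEnvironment.registerFunction, and then uses it either from the Table API or from TableEnvironment.sqlQuery. Split declares a public eval method, which emits rows by calling collect.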
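The difference between join and leftOuterJoin in the example only shows up when the table function emits no rows for some input. The following is a plain-Java sketch of that semantics, not Flink code: LateralJoinSketch and its methods are hypothetical names, and the stand-in split returns nothing for a null input (an assumption made here so that there is an empty case to show; the original Split does not guard against null).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of the two join flavours; no Flink classes are involved.
class LateralJoinSketch {

    // Stand-in for Split.eval: returns (word, length) pairs instead of calling collect(...).
    // Assumption for this sketch: a null input yields no rows.
    static List<Object[]> split(String str, String separator) {
        List<Object[]> rows = new ArrayList<>();
        if (str == null) {
            return rows;
        }
        for (String s : str.split(separator)) {
            rows.add(new Object[] {s, s.length()});
        }
        return rows;
    }

    // Like join / CROSS JOIN LATERAL: an input row with no function output disappears.
    static List<String> join(List<String> table, String separator) {
        List<String> result = new ArrayList<>();
        for (String a : table) {
            for (Object[] r : split(a, separator)) {
                result.add(a + ", " + r[0] + ", " + r[1]);
            }
        }
        return result;
    }

    // Like leftOuterJoin / LEFT JOIN LATERAL ... ON TRUE: such a row is kept, padded with nulls.
    static List<String> leftOuterJoin(List<String> table, String separator) {
        List<String> result = new ArrayList<>();
        for (String a : table) {
            List<Object[]> rows = split(a, separator);
            if (rows.isEmpty()) {
                result.add(a + ", null, null");
            }
            for (Object[] r : rows) {
                result.add(a + ", " + r[0] + ", " + r[1]);
            }
        }
        return result;
    }
}
```

With the two-row input ("a#bc", null), join yields two result rows and leftOuterJoin yields three, the extra one being the null-padded row for the input that produced no output.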
UserDefinedFunction
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/UserDefinedFunction.scala
    abstract class UserDefinedFunction extends Serializable {
      /**
        * Setup method for user-defined function. It can be used for initialization work.
        *
        * By default, this method does nothing.
        */
      @throws(classOf[Exception])
      def open(context: FunctionContext): Unit = {}

      /**
        * Tear-down method for user-defined function. It can be used for clean up work.
        *
        * By default, this method does nothing.
        */
      @throws(classOf[Exception])
      def close(): Unit = {}

      /**
        * @return true if and only if a call to this function is guaranteed to always return
        *         the same result given the same parameters; true is assumed by default
        *         if user's function is not pure functional, like random(), date(), now()...
        *         isDeterministic must return false
        */
      def isDeterministic: Boolean = true

      final def functionIdentifier: String = {
        val md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)))
        getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
      }

      /**
        * Returns the name of the UDF that is used for plan explain and logging.
        */
      override def toString: String = getClass.getSimpleName
    }
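- UserDefinedFunction defines the open and close lifecycle hooks (both no-ops by default), isDeterministic (true by default), and the final functionIdentifier method, which builds an identifier from the class's canonical name and an MD5 hash of the serialized instance. It also overrides toString to return the simple class name.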
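Because functionIdentifier hashes the serialized instance, two instances of the same function class with different constructor arguments (say, different separators) should end up with different identifiers. The sketch below imitates that shape with the JDK only. IdentifierSketch and SplitLike are hypothetical names, and the exact encoding used by Flink's EncodingUtils is an assumption here (Java serialization, Base64, MD5, hex), so the resulting strings are not expected to match Flink's.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

class IdentifierSketch {

    // Hypothetical stand-in for a serializable function object with constructor state.
    static class SplitLike implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String separator;

        SplitLike(String separator) {
            this.separator = separator;
        }
    }

    // Canonical class name with '.' replaced by '$', then '$', then a hex MD5 of the instance.
    static String identifier(Serializable fn) {
        try {
            // Serialize the instance so that its field values take part in the hash.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(fn);
            }
            String encoded = Base64.getUrlEncoder().encodeToString(bytes.toByteArray());
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(encoded.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return fn.getClass().getCanonicalName().replace('.', '$') + "$" + hex;
        } catch (IOException | NoSuchAlgorithmException e) {
            throw new IllegalStateException("could not compute identifier", e);
        }
    }
}
```

Calling identifier on two SplitLike instances built with the same separator gives the same string, while a different separator changes the hash suffix.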
TableFunction
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/TableFunction.scala
    abstract class TableFunction[T] extends UserDefinedFunction {

      // ----------------------------------------------------------------------------------------------

      /**
        * Emit an output row.
        *
        * @param row the output row
        */
      protected def collect(row: T): Unit = {
        collector.collect(row)
      }

      // ----------------------------------------------------------------------------------------------

      /**
        * The code generated collector used to emit row.
        */
      private var collector: Collector[T] = _

      /**
        * Internal use. Sets the current collector.
        */
      private[flink] final def setCollector(collector: Collector[T]): Unit = {
        this.collector = collector
      }

      // ----------------------------------------------------------------------------------------------

      /**
        * Returns the result type of the evaluation method with a given signature.
        *
        * This method needs to be overridden in case Flink's type extraction facilities are not
        * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
        * method. Flink's type extraction facilities can handle basic types or
        * simple POJOs but might be wrong for more complex, custom, or composite types.
        *
        * @return [[TypeInformation]] of result type or null if Flink should determine the type
        */
      def getResultType: TypeInformation[T] = null

      /**
        * Returns [[TypeInformation]] about the operands of the evaluation method with a given
        * signature.
        *
        * In order to perform operand type inference in SQL (especially when NULL is used) it might be
        * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
        * By default Flink's type extraction facilities are used for this but might be wrong for
        * more complex, custom, or composite types.
        *
        * @param signature signature of the method the operand types need to be determined
        * @return [[TypeInformation]] of operand types
        */
      def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
        signature.map { c =>
          try {
            TypeExtractor.getForClass(c)
          } catch {
            case ite: InvalidTypesException =>
              throw new ValidationException(
                s"Parameter types of table function '${this.getClass.getCanonicalName}' cannot be " +
                s"automatically determined. Please provide type information manually.")
          }
        }
      }
    }
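- TableFunction extends UserDefinedFunction and defines collect, setCollector, getResultType, and getParameterTypes. Note that it declares no eval method; collect simply forwards each row to whatever collector the runtime has injected through setCollector.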
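The collect/setCollector pair is a small injection pattern: the runtime hands the function a collector, and user code emits rows through it without knowing where they go. A minimal sketch of that pattern in plain Java follows. MiniTableFunction and CollectorSketch are hypothetical names, java.util.function.Consumer stands in for Flink's Collector, and the rows are plain strings rather than tuples.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CollectorSketch {

    // Hypothetical, stripped-down analogue of TableFunction[T].
    abstract static class MiniTableFunction<T> {
        private Consumer<T> collector;

        // Called by the "runtime" before evaluation, in the role of setCollector.
        final void setCollector(Consumer<T> collector) {
            this.collector = collector;
        }

        // Called by user code inside eval, in the role of collect.
        protected void collect(T row) {
            collector.accept(row);
        }
    }

    // User-defined function: eval returns void and emits zero or more rows.
    static class Split extends MiniTableFunction<String> {
        private final String separator;

        Split(String separator) {
            this.separator = separator;
        }

        public void eval(String str) {
            for (String s : str.split(separator)) {
                collect(s + ":" + s.length());
            }
        }
    }

    // Plays the runtime: inject a collector, call eval, return what was emitted.
    static List<String> run(Split fn, String input) {
        List<String> sink = new ArrayList<>();
        fn.setCollector(sink::add);
        fn.eval(input);
        return sink;
    }
}
```

Since eval returns void and every row goes through collect, one input can produce any number of output rows, which is what distinguishes a table function from a scalar one.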
ProcessOperator
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/operators/ProcessOperator.java
    @Internal
    public class ProcessOperator<IN, OUT>
            extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
            implements OneInputStreamOperator<IN, OUT> {

        private static final long serialVersionUID = 1L;

        private transient TimestampedCollector<OUT> collector;

        private transient ContextImpl context;

        /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
        private long currentWatermark = Long.MIN_VALUE;

        public ProcessOperator(ProcessFunction<IN, OUT> function) {
            super(function);

            chainingStrategy = ChainingStrategy.ALWAYS;
        }

        @Override
        public void open() throws Exception {
            super.open();
            collector = new TimestampedCollector<>(output);

            context = new ContextImpl(userFunction, getProcessingTimeService());
        }

        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            collector.setTimestamp(element);
            context.element = element;
            userFunction.processElement(element.getValue(), context, collector);
            context.element = null;
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            super.processWatermark(mark);
            this.currentWatermark = mark.getTimestamp();
        }

        //......
    }
- ProcessOperator.processElement calls userFunction.processElement; for a query that uses a table function, userFunction is a CRowCorrelateProcessRunner.
CRowCorrelateProcessRunner
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
    class CRowCorrelateProcessRunner(
        processName: String,
        processCode: String,
        collectorName: String,
        collectorCode: String,
        @transient var returnType: TypeInformation[CRow])
      extends ProcessFunction[CRow, CRow]
      with ResultTypeQueryable[CRow]
      with Compiler[Any]
      with Logging {

      private var function: ProcessFunction[Row, Row] = _
      private var collector: TableFunctionCollector[_] = _
      private var cRowWrapper: CRowWrappingCollector = _

      override def open(parameters: Configuration): Unit = {
        LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
        val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode)
        LOG.debug("Instantiating TableFunctionCollector.")
        collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
        this.cRowWrapper = new CRowWrappingCollector()

        LOG.debug(s"Compiling ProcessFunction: $processName \n\n Code:\n$processCode")
        val processClazz = compile(getRuntimeContext.getUserCodeClassLoader, processName, processCode)
        val constructor = processClazz.getConstructor(classOf[TableFunctionCollector[_]])
        LOG.debug("Instantiating ProcessFunction.")
        function = constructor.newInstance(collector).asInstanceOf[ProcessFunction[Row, Row]]
        FunctionUtils.setFunctionRuntimeContext(collector, getRuntimeContext)
        FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
        FunctionUtils.openFunction(collector, parameters)
        FunctionUtils.openFunction(function, parameters)
      }

      override def processElement(
          in: CRow,
          ctx: ProcessFunction[CRow, CRow]#Context,
          out: Collector[CRow])
        : Unit = {

        cRowWrapper.out = out
        cRowWrapper.setChange(in.change)

        collector.setCollector(cRowWrapper)
        collector.setInput(in.row)
        collector.reset()

        function.processElement(
          in.row,
          ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
          cRowWrapper)
      }

      override def getProducedType: TypeInformation[CRow] = returnType

      override def close(): Unit = {
        FunctionUtils.closeFunction(collector)
        FunctionUtils.closeFunction(function)
      }
    }
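- CRowCorrelateProcessRunner.processElement calls function.processElement. Here function is the ProcessFunction compiled from processCode in open, and it is this generated function that invokes Split's eval method. Before the call, the runner copies the input's change flag into cRowWrapper, so every row the table function emits carries the same flag as the input row.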
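The role of cRowWrapper.setChange(in.change) can be illustrated without Flink. The sketch below uses hypothetical stand-ins: FlaggedRow plays the part of CRow (a payload plus a change flag, read here as true for an added row and false for a retracted one, which is my understanding of CRow rather than something the listing above states), and WrappingCollector plays the part of CRowWrappingCollector. The splitting loop stands in for the generated function calling eval.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ChangeFlagSketch {

    // Hypothetical stand-in for CRow: a payload plus a change flag.
    static final class FlaggedRow {
        final String row;
        final boolean change;

        FlaggedRow(String row, boolean change) {
            this.row = row;
            this.change = change;
        }

        @Override
        public String toString() {
            return (change ? "+" : "-") + row;
        }
    }

    // Hypothetical stand-in for CRowWrappingCollector: stamps each emitted row with the flag.
    static final class WrappingCollector implements Consumer<String> {
        Consumer<FlaggedRow> out;
        private boolean change;

        void setChange(boolean change) {
            this.change = change;
        }

        @Override
        public void accept(String row) {
            out.accept(new FlaggedRow(row, change));
        }
    }

    // Mirrors the shape of processElement: configure the wrapper, then let the function emit.
    static List<String> process(FlaggedRow in, String separator) {
        List<String> emitted = new ArrayList<>();
        WrappingCollector wrapper = new WrappingCollector();
        wrapper.out = r -> emitted.add(r.toString());
        wrapper.setChange(in.change);
        // Stand-in for the generated function invoking eval and collecting rows.
        for (String word : in.row.split(separator)) {
            wrapper.accept(word);
        }
        return emitted;
    }
}
```

The table function itself never sees the flag; the wrapper adds it on the way out, so user code can stay unaware of retractions.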
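Summary
- TableFunction extends UserDefinedFunction and defines collect, setCollector, getResultType, and getParameterTypes; UserDefinedFunction defines open, close, isDeterministic, and functionIdentifier.
- To write a custom TableFunction, extending the TableFunction class is not enough: you also need to define a public eval method. Its parameter types depend on how the function is called. In this example, split is called with the table's field a, which is a String, so eval takes a String parameter.
- ProcessOperator.processElement calls userFunction.processElement, where userFunction is a CRowCorrelateProcessRunner; CRowCorrelateProcessRunner.processElement in turn calls function.processElement, and that generated function invokes Split's eval method.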
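Since eval is not declared on TableFunction, the matching method has to be located by name and argument types. The following sketch shows one simplified way to do such a lookup with Java reflection. EvalLookupSketch, Repeat, and findEval are hypothetical names, and Flink's own resolution handles more cases than this (primitive types and varargs, for instance), so treat it as an illustration of the idea only.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class EvalLookupSketch {

    // Hypothetical function class with two public eval overloads.
    public static class Repeat {
        final List<String> out = new ArrayList<>();

        public void eval(String s) {
            out.add(s);
        }

        public void eval(String s, Integer times) {
            for (int i = 0; i < times; i++) {
                out.add(s);
            }
        }
    }

    // Finds a public method named "eval" whose parameters accept the given argument types.
    static Method findEval(Class<?> clazz, Class<?>... argTypes) {
        for (Method m : clazz.getMethods()) {
            if (!m.getName().equals("eval")) {
                continue;
            }
            Class<?>[] params = m.getParameterTypes();
            if (params.length != argTypes.length) {
                continue;
            }
            boolean matches = true;
            for (int i = 0; i < params.length; i++) {
                if (!params[i].isAssignableFrom(argTypes[i])) {
                    matches = false;
                }
            }
            if (matches) {
                return m;
            }
        }
        throw new IllegalArgumentException("no eval method matches the given argument types");
    }
}
```

Looking up eval with a single String argument type selects the one-parameter overload, and a call with an unsupported type such as Long fails, which mirrors why the eval signature must agree with the column types passed in the query.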