Talking about orderBy and Limit in Flink Table
Preface
This article takes a look at orderBy and limit (offset/fetch) in Flink's Table API.
Example
```java
// assumes tableEnv is a BatchTableEnvironment and ds is a DataSet with three fields
Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.orderBy("a.asc");

// returns the first 5 records from the sorted result
Table result1 = in.orderBy("a.asc").fetch(5);

// skips the first 3 records and returns all following records from the sorted result
Table result2 = in.orderBy("a.asc").offset(3);

// skips the first 10 records and returns the next 5 records from the sorted result
Table result3 = in.orderBy("a.asc").offset(10).fetch(5);
```
- The orderBy method is analogous to SQL's ORDER BY; limiting is expressed through two methods, offset and fetch, analogous to SQL's OFFSET and FETCH.
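To make the semantics of result1, result2 and result3 concrete, here is a minimal in-memory sketch over a single integer column. It is not Flink code: the class OffsetFetchDemo and method orderOffsetFetch are hypothetical names, and a negative fetch stands for "no FETCH", mirroring the -1 sentinel Flink's Limit uses.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical in-memory model of ORDER BY a ASC ... OFFSET ... FETCH on one int column;
// not Flink code, only an illustration of what result1..result3 would contain.
final class OffsetFetchDemo {

    // A negative fetch means "no FETCH", mirroring the -1 sentinel in Flink's Limit.
    static List<Integer> orderOffsetFetch(List<Integer> rows, int offset, int fetch) {
        List<Integer> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.naturalOrder());                // like orderBy("a.asc")
        Stream<Integer> stream = sorted.stream().skip(offset); // like offset(n)
        if (fetch >= 0) {
            stream = stream.limit(fetch);                      // like fetch(n)
        }
        return stream.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> a = List.of(14, 3, 9, 0, 11, 6, 1, 13, 4, 8, 12, 2, 10, 7, 5);
        System.out.println(orderOffsetFetch(a, 0, 5));  // [0, 1, 2, 3, 4]
        System.out.println(orderOffsetFetch(a, 3, -1)); // 3 through 14
        System.out.println(orderOffsetFetch(a, 10, 5)); // [10, 11, 12, 13, 14]
    }
}
```

Note that the offset is applied before the fetch regardless of how the calls are written, which matches SQL's OFFSET/FETCH rather than a "take, then skip" reading.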
Table
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
```scala
class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val logicalPlan: LogicalNode) {

  //......

  def orderBy(fields: String): Table = {
    val parsedFields = ExpressionParser.parseExpressionList(fields)
    orderBy(parsedFields: _*)
  }

  def orderBy(fields: Expression*): Table = {
    val order: Seq[Ordering] = fields.map {
      case o: Ordering => o
      case e => Asc(e)
    }
    new Table(tableEnv, Sort(order, logicalPlan).validate(tableEnv))
  }

  def offset(offset: Int): Table = {
    new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
  }

  def fetch(fetch: Int): Table = {
    if (fetch < 0) {
      throw new ValidationException("FETCH count must be equal or larger than 0.")
    }
    this.logicalPlan match {
      case Limit(o, -1, c) =>
        // replace LIMIT without FETCH by LIMIT with FETCH
        new Table(tableEnv, Limit(o, fetch, c).validate(tableEnv))
      case Limit(_, _, _) =>
        throw new ValidationException("FETCH is already defined.")
      case _ =>
        new Table(tableEnv, Limit(0, fetch, logicalPlan).validate(tableEnv))
    }
  }

  //......
}
```
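- Table's orderBy method accepts either a String or Expression arguments; the String form is parsed by ExpressionParser into Expressions, and any expression that is not already an Ordering is wrapped in Asc. orderBy then returns a new Table built on a Sort node. The offset and fetch methods return a new Table built on a Limit node: the Limit created by offset has its fetch set to -1. If the current plan is a Limit without a fetch (i.e. offset was called first), fetch replaces it with a Limit that keeps the existing offset; otherwise it creates a Limit with offset 0. Calling fetch with a negative count, or when a fetch is already defined, throws a ValidationException.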
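The offset/fetch branching above can be mirrored in plain Java to show how the two calls collapse into a single Limit node. This is a sketch under stated assumptions: Plan, SortNode, LimitNode and LimitBuilder are hypothetical stand-ins rather than Flink classes, standard Java exceptions replace Flink's ValidationException, and the validate step on the resulting node is omitted.

```java
// Hypothetical Java mirror of the offset()/fetch() branching in Flink 1.7's table.scala.
// Plan, SortNode, LimitNode and LimitBuilder are illustrative, not Flink classes;
// node validation is omitted.
interface Plan {}

final class SortNode implements Plan {}

final class LimitNode implements Plan {
    final int offset;
    final int fetch;   // -1 means "no FETCH has been set yet"
    final Plan child;

    LimitNode(int offset, int fetch, Plan child) {
        this.offset = offset;
        this.fetch = fetch;
        this.child = child;
    }
}

final class LimitBuilder {
    // offset(n): wrap the current plan in a Limit without FETCH
    static Plan offset(Plan plan, int offset) {
        return new LimitNode(offset, -1, plan);
    }

    // fetch(n): fill in FETCH on an existing Limit, or start a new Limit at offset 0
    static Plan fetch(Plan plan, int fetch) {
        if (fetch < 0) {
            throw new IllegalArgumentException("FETCH count must be equal or larger than 0.");
        }
        if (plan instanceof LimitNode) {
            LimitNode limit = (LimitNode) plan;
            if (limit.fetch == -1) {
                // replace LIMIT without FETCH by LIMIT with FETCH, keeping offset and child
                return new LimitNode(limit.offset, fetch, limit.child);
            }
            throw new IllegalStateException("FETCH is already defined.");
        }
        return new LimitNode(0, fetch, plan);
    }
}
```

With this model, offset(10) followed by fetch(5) yields one node equivalent to Limit(10, 5, sort), which is why result3 in the example ends up with a single Limit directly above the Sort.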
Sort
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala
```scala
case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {
  override def output: Seq[Attribute] = child.output

  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    child.construct(relBuilder)
    relBuilder.sort(order.map(_.toRexNode(relBuilder)).asJava)
  }

  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
      failValidation(s"Sort on stream tables is currently not supported.")
    }
    super.validate(tableEnv)
  }
}
```
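- Sort extends UnaryNode; its constructor takes a Seq of Ordering, and its construct method builds the sort keys via relBuilder.sort. Its validate method rejects stream tables ("Sort on stream tables is currently not supported."), so in this version orderBy is only usable on batch tables.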
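Because Sort takes a sequence of orderings, the keys are applied in order: the second key only breaks ties left by the first, and so on. The following is a minimal sketch of that lexicographic combination with plain Java comparators; Row and MultiKeySort are hypothetical names, and this is not how Flink or Calcite execute the sort.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative only: how a sequence of orderings (like Sort's Seq[Ordering]) combines
// into one lexicographic comparator. Row is a hypothetical two-column record.
final class Row {
    final int a;
    final String b;

    Row(int a, String b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public String toString() {
        return "(" + a + "," + b + ")";
    }
}

final class MultiKeySort {
    // Earlier comparators take precedence; later ones only break ties.
    static <T> List<T> sort(List<T> rows, List<Comparator<T>> orderings) {
        Comparator<T> combined = (x, y) -> 0; // all rows tie until some key decides
        for (Comparator<T> c : orderings) {
            combined = combined.thenComparing(c);
        }
        List<T> out = new ArrayList<>(rows);
        out.sort(combined); // List.sort is stable, so full ties keep input order
        return out;
    }
}
```

For example, sorting by a ascending and then by b descending (roughly what orderBy("a.asc, b.desc") expresses) places all rows with the smallest a first, and orders rows that share an a value by b from largest to smallest.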
Ordering
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/expressions/ordering.scala
```scala
abstract class Ordering extends UnaryExpression {
  override private[flink] def validateInput(): ValidationResult = {
    if (!child.isInstanceOf[NamedExpression]) {
      ValidationFailure(s"Sort should only based on field reference")
    } else {
      ValidationSuccess
    }
  }
}

case class Asc(child: Expression) extends Ordering {
  override def toString: String = s"($child).asc"

  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    child.toRexNode
  }

  override private[flink] def resultType: TypeInformation[_] = child.resultType
}

case class Desc(child: Expression) extends Ordering {
  override def toString: String = s"($child).desc"

  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    relBuilder.desc(child.toRexNode)
  }

  override private[flink] def resultType: TypeInformation[_] = child.resultType
}
```
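- Ordering is an abstract class extending UnaryExpression, with two subclasses, Asc and Desc. Its validateInput only accepts a NamedExpression (a field reference) as the child. Asc's toRexNode returns the child's RexNode unchanged, since ascending is the default sort direction, while Desc wraps it with relBuilder.desc.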
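The Ordering hierarchy, together with orderBy's rule that bare expressions default to Asc, can be sketched in Java as follows. All names here (Expr, FieldRef, OrderingExpr, Asc, Desc, OrderByHelper) are hypothetical stand-ins, and the strings returned by toSortKey are only illustrative markers for "passed through unchanged" versus "wrapped as descending"; they are not Calcite output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical Java analogue of Flink 1.7's Ordering/Asc/Desc case classes.
// None of these types are Flink APIs.
interface Expr {}

final class FieldRef implements Expr {
    final String name;
    FieldRef(String name) { this.name = name; }
    @Override public String toString() { return name; }
}

abstract class OrderingExpr implements Expr {
    final Expr child;
    OrderingExpr(Expr child) { this.child = child; }

    // mirrors Ordering.validateInput: only field references may be sorted on
    boolean isValid() { return child instanceof FieldRef; }

    // illustrative stand-in for toRexNode
    abstract String toSortKey();
}

final class Asc extends OrderingExpr {
    Asc(Expr child) { super(child); }
    @Override public String toString() { return "(" + child + ").asc"; }
    // ascending is the default direction, so the child is passed through unchanged
    @Override String toSortKey() { return child.toString(); }
}

final class Desc extends OrderingExpr {
    Desc(Expr child) { super(child); }
    @Override public String toString() { return "(" + child + ").desc"; }
    // descending is marked explicitly, like relBuilder.desc(...)
    @Override String toSortKey() { return "desc(" + child + ")"; }
}

final class OrderByHelper {
    // mirrors Table.orderBy(fields: Expression*): non-Ordering expressions default to Asc
    static List<OrderingExpr> toOrderings(Expr... fields) {
        List<OrderingExpr> out = new ArrayList<>();
        for (Expr e : fields) {
            out.add(e instanceof OrderingExpr ? (OrderingExpr) e : new Asc(e));
        }
        return out;
    }
}
```

This is why orderBy("a") and orderBy("a.asc") produce the same Sort: the plain field reference is wrapped in Asc before the Sort node is built.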
Limit
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala
```scala
case class Limit(offset: Int, fetch: Int = -1, child: LogicalNode) extends UnaryNode {
  override def output: Seq[Attribute] = child.output

  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    child.construct(relBuilder)
    relBuilder.limit(offset, fetch)
  }

  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
      failValidation(s"Limit on stream tables is currently not supported.")
    }
    if (!child.isInstanceOf[Sort]) {
      failValidation(s"Limit operator must be preceded by an OrderBy operator.")
    }
    if (offset < 0) {
      failValidation(s"Offset should be greater than or equal to zero.")
    }
    super.validate(tableEnv)
  }
}
```
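- Limit extends UnaryNode; its constructor takes offset and fetch (fetch defaults to -1), and its construct method passes both to relBuilder.limit. Its validate method rejects stream tables, requires the child node to be a Sort (so offset/fetch must directly follow orderBy), and requires the offset to be greater than or equal to zero.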
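The three checks in Limit.validate can be restated as a small Java sketch to show which call chains pass. LimitValidation and ChildKind are hypothetical names; IllegalStateException stands in for the ValidationException that failValidation raises, and the checks run in the same order as in the Scala code, so the first failing one determines the message.

```java
// Hypothetical restatement of the checks in Flink 1.7's Limit.validate, in the same order.
// IllegalStateException stands in for Flink's ValidationException so the sketch has no
// Flink dependency.
final class LimitValidation {
    enum ChildKind { SORT, OTHER }

    static void validate(boolean streamEnv, ChildKind child, int offset) {
        if (streamEnv) {
            throw new IllegalStateException("Limit on stream tables is currently not supported.");
        }
        if (child != ChildKind.SORT) {
            throw new IllegalStateException("Limit operator must be preceded by an OrderBy operator.");
        }
        if (offset < 0) {
            throw new IllegalStateException("Offset should be greater than or equal to zero.");
        }
    }

    // Returns the first failure message, or null if the Limit would validate.
    static String check(boolean streamEnv, ChildKind child, int offset) {
        try {
            validate(streamEnv, child, offset);
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }
}
```

One consequence of the child check: in.orderBy("a.asc").fetch(5).offset(3) fails, because offset wraps the existing Limit in a new Limit whose child is not a Sort. The offset has to be specified before fetch, as in result3.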
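Summary
- Table's orderBy method is analogous to SQL's ORDER BY; limiting is expressed through two methods, offset and fetch, analogous to SQL's OFFSET and FETCH.
- Table's orderBy method accepts String or Expression arguments, with Strings ultimately converted to Expressions; orderBy returns a new Table built on a Sort node, and offset and fetch return a new Table built on a Limit node (the Limit created by offset has fetch set to -1; if fetch is called without a preceding offset, the Limit it creates has offset 0).
- Sort extends UnaryNode; its constructor takes a Seq of Ordering, and its construct method builds the sort keys via relBuilder.sort. Ordering is an abstract class with two subclasses, Asc and Desc.
- Limit extends UnaryNode; its constructor takes offset and fetch, and its construct method sets them via relBuilder.limit.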