最近工作中需要开发一些新的Flink sql 的connector,所以先开始研究研究Flink sql的执行流程。
Planner接口 负责sql解析、转换成Transformation Executor接口 负责将planner转换的Transformation生成streamGraph并执行
public interface Planner { /** * Retrieves a {@link Parser} that provides methods for parsing a SQL string. * * @return initialized {@link Parser} */ Parser getParser(); /** * Converts a relational tree of {@link ModifyOperation}s into a set of runnable * {@link Transformation}s. * * <p>This method accepts a list of {@link ModifyOperation}s to allow reusing common * subtrees of multiple relational queries. Each query's top node should be a {@link ModifyOperation} * in order to pass the expected properties of the output {@link Transformation} such as * output mode (append, retract, upsert) or the expected output type. * * @param modifyOperations list of relational operations to plan, optimize and convert in a * single run. * @return list of corresponding {@link Transformation}s. */ List<Transformation<?>> translate(List<ModifyOperation> modifyOperations); /** * Returns the AST of the specified Table API and SQL queries and the execution plan * to compute the result of the given collection of {@link QueryOperation}s. * * @param operations The collection of relational queries for which the AST * and execution plan will be returned. * @param extended if the plan should contain additional properties such as * e.g. estimated cost, traits */ String explain(List<Operation> operations, boolean extended); /** * Returns completion hints for the given statement at the given cursor position. * The completion happens case insensitively. * * @param statement Partial or slightly incorrect SQL statement * @param position cursor position * @return completion hints that fit at the current cursor position */ String[] getCompletionHints(String statement, int position); }Parser接口 负责sql解析 有两个实现一个是old planner,另一个是blink planner flink对sql的解析依赖于calcite
具体实现
@Override public List<Operation> parse(String statement) { CalciteParser parser = calciteParserSupplier.get(); FlinkPlannerImpl planner = validatorSupplier.get(); // parse the sql query //依赖calcite将sql语句解析为sqlNode SqlNode parsed = parser.parse(statement); //将sqlnode转换为Operation Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed) .orElseThrow(() -> new TableException("Unsupported query: " + statement)); return Collections.singletonList(operation); } /** * This is the main entrance for executing all kinds of DDL/DML {@code SqlNode}s, different * SqlNode will have it's implementation in the #convert(type) method whose 'type' argument * is subclass of {@code SqlNode}. * * @param flinkPlanner FlinkPlannerImpl to convertCreateTable sql node to rel node * @param catalogManager CatalogManager to resolve full path for operations * @param sqlNode SqlNode to execute on */ public static Optional<Operation> convert( FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode sqlNode) { // validate the query // 校验sql的合法性 final SqlNode validated = flinkPlanner.validate(sqlNode); SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner, catalogManager); //对不同的ddl/dml进行转换 if (validated instanceof SqlCreateTable) { return Optional.of(converter.convertCreateTable((SqlCreateTable) validated)); } else if (validated instanceof SqlDropTable) { return Optional.of(converter.convertDropTable((SqlDropTable) validated)); } else if (validated instanceof SqlAlterTable) { return Optional.of(converter.convertAlterTable((SqlAlterTable) validated)); } else if (validated instanceof SqlCreateFunction) { return Optional.of(converter.convertCreateFunction((SqlCreateFunction) validated)); } else if (validated instanceof SqlAlterFunction) { return Optional.of(converter.convertAlterFunction((SqlAlterFunction) validated)); } else if (validated instanceof SqlDropFunction) { return Optional.of(converter.convertDropFunction((SqlDropFunction) validated)); } else if (validated instanceof RichSqlInsert) { return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated)); } else if (validated instanceof SqlUseCatalog) { return Optional.of(converter.convertUseCatalog((SqlUseCatalog) validated)); } else if (validated instanceof SqlUseDatabase) { return Optional.of(converter.convertUseDatabase((SqlUseDatabase) validated)); } else if (validated instanceof SqlCreateDatabase) { return Optional.of(converter.convertCreateDatabase((SqlCreateDatabase) validated)); } else if (validated instanceof SqlDropDatabase) { return Optional.of(converter.convertDropDatabase((SqlDropDatabase) validated)); } else if (validated instanceof SqlAlterDatabase) { return Optional.of(converter.convertAlterDatabase((SqlAlterDatabase) validated)); } else if (validated instanceof SqlCreateCatalog) { return Optional.of(converter.convertCreateCatalog((SqlCreateCatalog) validated)); } else if (validated.getKind().belongsTo(SqlKind.QUERY)) { return Optional.of(converter.convertSqlQuery(validated)); } else { return Optional.empty(); } }举个栗子:
CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3), proctime as PROCTIME(), -- 通过计算列产生一个处理时间列 WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定义watermark,ts成为事件时间列 ) WITH ( 'connector.type' = 'kafka', -- 使用 kafka connector 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本 'connector.topic' = 'user_behavior', -- kafka topic 'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取 --'connector.properties.group.id' = '', 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址 'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址 'format.type' = 'json' -- 数据源格式为 json )上面的sql经过SqlNode parsed = parser.parse(statement);解析之后如下图:
对应的表名、列名、with属性参数、主键、唯一键、分区键、水印、表注释、表操作(create table、alter table、drop table。。。)都放到SqlNode对象的对应属性中,SqlNode是一个树形结构也就是AST。 不同的sql经过convert处理后返回不同的Operation,最后会根据不同的Operation有不同的处理行为。 Operation主要分为Create/Alter/Drop/Modify/Query/Use 几大类,每个下面又细分如CreateOperation
看下createTable的处理,返回的就是CreateTableOperation
/** * Convert the {@link SqlCreateTable} node. */ private Operation convertCreateTable(SqlCreateTable sqlCreateTable) { // primary key and unique keys are not supported if ((sqlCreateTable.getPrimaryKeyList().size() > 0) || (sqlCreateTable.getUniqueKeysList().size() > 0)) { throw new SqlConversionException("Primary key and unique key are not supported yet."); } // set with properties //将with参数放到一个map中 Map<String, String> properties = new HashMap<>(); sqlCreateTable.getPropertyList().getList().forEach(p -> properties.put(((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString())); //schema TableSchema tableSchema = createTableSchema(sqlCreateTable); String tableComment = sqlCreateTable.getComment().map(comment -> comment.getNlsString().getValue()).orElse(null); // set partition key List<String> partitionKeys = sqlCreateTable.getPartitionKeyList() .getList() .stream() .map(p -> ((SqlIdentifier) p).getSimple()) .collect(Collectors.toList()); CatalogTable catalogTable = new CatalogTableImpl(tableSchema, partitionKeys, properties, tableComment); UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlCreateTable.fullTableName()); ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); return new CreateTableOperation( identifier, catalogTable, sqlCreateTable.isIfNotExists()); } else if (operation instanceof CreateTableOperation) { CreateTableOperation createTableOperation = (CreateTableOperation) operation; catalogManager.createTable( createTableOperation.getCatalogTable(), createTableOperation.getTableIdentifier(), createTableOperation.isIgnoreIfExists()); }对表的DDL操作(比如对table、function、database的操作)入口为tableEnv.sqlUpdate(sql),会调用到catalogManager对表的DDL进行维护,如果是query语句入口tableEnv.sqlQuery(sql),则会走
else if (validated.getKind().belongsTo(SqlKind.QUERY)) { return Optional.of(converter.convertSqlQuery(validated)); } private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) { // transform to a relational tree RelRoot relational = planner.rel(validated); return new PlannerQueryOperation(relational.project()); }将SqlNode转换为RelNode,
主要包含3个步骤:
推断table类型
推断计算列
推断watermark分配
override def toRel(context: RelOptTable.ToRelContext): RelNode = { val cluster = context.getCluster val flinkContext = cluster .getPlanner .getContext .unwrap(classOf[FlinkContext]) val tableSource = findAndCreateTableSource(flinkContext.getTableConfig.getConfiguration)//查找并创建tableSource val tableSourceTable = new TableSourceTable[T]( relOptSchema, schemaTable.getTableIdentifier, erasedRowType, statistic, tableSource, schemaTable.isStreamingMode, catalogTable) // 1. push table scan // Get row type of physical fields. val physicalFields = getRowType .getFieldList .filter(f => !columnExprs.contains(f.getName)) .map(f => f.getIndex) .toArray // Copy this table with physical scan row type. val newRelTable = tableSourceTable.copy(tableSource, physicalFields) val scan = LogicalTableScan.create(cluster, newRelTable) val relBuilder = FlinkRelBuilder.of(cluster, getRelOptSchema) relBuilder.push(scan) val toRexFactory = cluster .getPlanner .getContext .unwrap(classOf[FlinkContext]) .getSqlExprToRexConverterFactory // 2. push computed column project val fieldNames = erasedRowType.getFieldNames.asScala if (columnExprs.nonEmpty) { val fieldExprs = fieldNames .map { name => if (columnExprs.contains(name)) { columnExprs(name) } else { s"`$name`" } }.toArray val rexNodes = toRexFactory.create(newRelTable.getRowType).convertToRexNodes(fieldExprs) relBuilder.projectNamed(rexNodes.toList, fieldNames, true) } // 3. push watermark assigner val watermarkSpec = catalogTable .getSchema // we only support single watermark currently .getWatermarkSpecs.asScala.headOption if (schemaTable.isStreamingMode && watermarkSpec.nonEmpty) { if (TableSourceValidation.hasRowtimeAttribute(tableSource)) { throw new TableException( "If watermark is specified in DDL, the underlying TableSource of connector" + " shouldn't return an non-empty list of RowtimeAttributeDescriptor" + " via DefinedRowtimeAttributes interface.") } val rowtime = watermarkSpec.get.getRowtimeAttribute if (rowtime.contains(".")) { throw new TableException( s"Nested field '$rowtime' as rowtime attribute is not supported right now.") } val rowtimeIndex = fieldNames.indexOf(rowtime) val watermarkRexNode = toRexFactory .create(erasedRowType) .convertToRexNode(watermarkSpec.get.getWatermarkExpr) relBuilder.watermark(rowtimeIndex, watermarkRexNode) } // 4. returns the final RelNode relBuilder.build() }对Source table进行推断是在Query语句SqlNode变成RelNode阶段
见CatalogSourceTable.scala
lazy val tableSource: TableSource[T] = findAndCreateTableSource().asInstanceOf[TableSource[T]]TableFactoryUtil会根据DDL中的with参数进行推断
private static <T> TableSource<T> findAndCreateTableSource(Map<String, String> properties) { try { return TableFactoryService .find(TableSourceFactory.class, properties) .createTableSource(properties); } catch (Throwable t) { throw new TableException("findAndCreateTableSource failed.", t); } }主要逻辑在TableFactoryService#filter方法中
private static <T extends TableFactory> List<T> filter( List<TableFactory> foundFactories, Class<T> factoryClass, Map<String, String> properties) { Preconditions.checkNotNull(factoryClass); Preconditions.checkNotNull(properties); //先根据TableSourceFactory.class 过滤出source的factory List<T> classFactories = filterByFactoryClass( factoryClass, properties, foundFactories); //再根据with参数中的connect.type过滤出满足条件的SourceFactory List<T> contextFactories = filterByContext( factoryClass, properties, classFactories); //最终判断一下所有properties是否都支持 return filterBySupportedProperties( factoryClass, properties, classFactories, contextFactories); }大致调用栈如下 后续的优化部分其实都是直接基于 RelNode来完成的。
转换的流程主要分为四个部分,即 1)将 Operation 转换为 RelNode,2)优化 RelNode,3)转换成 ExecNode,4)转换为底层的 Transformation 算子。
如果是DML操作进过SQL转化后会变为ModifyOperation,就会调用Planner的translate方法。
对Sink Table的推断是在Insert into语句SqlNode变成RelNode阶段 调用栈如下:
具体实现在PlannerBase.scala
override def translate( modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = { if (modifyOperations.isEmpty) { return List.empty[Transformation[_]] } // prepare the execEnv before translating getExecEnv.configure( getTableConfig.getConfiguration, Thread.currentThread().getContextClassLoader) overrideEnvParallelism() //转化为relNode val relNodes = modifyOperations.map(translateToRel) //SQL优化 val optimizedRelNodes = optimize(relNodes) //转化为execNode val execNodes = translateToExecNodePlan(optimizedRelNodes) //转化为底层的transfomation translateToPlan(execNodes) }优化比较复杂,Blink主要是基于Calcite自己的优化,并自定义了一些优化逻辑,有兴趣的读者可以自行研究。
在得到Transformation后的转化逻辑就和streaming模式一致了,可以参考我之前的博客Flink源码阅读之基于Flink1.10的任务提交流程和[Flink源码阅读之基于Flink1.10的任务执行流程](