目录
回顾
初始化流程
Sql Parse 阶段
References
spark.version=2.4.4
在学习SparkSQL运行流程原理前可以先了解下SparkSQL中涉及到的一些基础概念,SparkSQL架构
通常SQL语句执行都会完成以下流程: 1、词法和语法解析Parse:生成逻辑计划 2、绑定Bind:生成可执行计划 3、优化Optimize:生成最优执行计划 4、执行Execute:返回实际数据
SparkSQL对SQL语句的处理和关系型数据库采用了类似的方法,SparkSQL会先将SQL语句进行解析Parse形成一个Tree,然后使用Rule对Tree进行绑定、优化等处理过程,通过模式匹配对不同类型的节点采用不同的操作。而SparkSQL的查询优化器是Catalyst,它负责处理查询语句的解析、绑定、优化和生成物理计划等过程,Catalyst是SparkSQL最核心的部分,其性能优劣将决定整体的性能。
这里在回顾下Catalyst大致流程
1、将SQL语句通过词法和语法解析生成未绑定的逻辑执行计划(Unresolved LogicalPlan),包含Unresolved Relation、Unresolved Function和Unresolved Attribute,然后在后续步骤中使用不同的Rule应用到该逻辑计划上
2、Analyzer使用Analysis Rules,配合元数据(如SessionCatalog 或是 Hive Metastore等)完善未绑定的逻辑计划的属性而转换成绑定的逻辑计划。具体流程是县实例化一个Simple Analyzer,然后遍历预定义好的Batch,通过父类Rule Executor的执行方法运行Batch里的Rules,每个Rule会对未绑定的逻辑计划进行处理,有些可以通过一次解析处理,有些需要多次迭代,迭代直到达到FixedPoint次数或前后两次的树结构没变化才停止操作。 3、Optimizer使用Optimization Rules,将绑定的逻辑计划进行合并、列裁剪和过滤器下推等优化工作后生成优化的逻辑计划。
4、Planner使用Planning Strategies,对优化的逻辑计划进行转换(Transform)生成可以执行的物理计划。根据过去的性能统计数据,选择最佳的物理执行计划CostModel,最后生成可以执行的物理执行计划树,得到SparkPlan。
5、在最终真正执行物理执行计划之前,还要进行preparations规则处理,最后调用SparkPlan的execute执行计算RDD。
在解析SQL语句前先需要在SparkSession中完成对SQLContext的初始化,它定义了Spark SQL执行的上下文,并把元数据保存在SessionCatalog中,这些元数据包括表名称、表字段名称和字段类型等。
可以参照前文:SparkSession初始化
SessionCatalog中保存的是表名和逻辑执行计划对应的哈希列表,这些数据将在解析未绑定的逻辑计划上使用。
(SessionCatalog中的表名对应的逻辑执行计划是什么?是这个Dataset对应的逻辑执行计划)。
该阶段简单点理解就是使用Antlr4,将一条Sql语句解析成语法树。也就是上文所述的将SQL语句通过词法和语法解析生成未绑定的逻辑执行计划(Unresolved LogicalPlan)。
val sqlText = "select name from t1 where uid = 2020" val df:DataFrame = spark.sql(sqlText) // #1 spark.sqlContext.sql(sqlText) // #2 def sql(sqlText: String): DataFrame = sparkSession.sql(sqlText) /** * Executes a SQL query using Spark, returning the result as a `DataFrame`. * The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'. */ def sql(sqlText: String): DataFrame = { Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText)) }无论是#1或#2最终都会执行到Dataset.ofRows(....), 接下来学习下SQL语句执行的具体流程:
如上代码中所示在执行SparkSession.sql()时会执行sessionState.sqlParser.parsePlan(sqlText),而sqlParser是SessionState基于一个特定SparkSession维护所有单个session作用域的所有状态时的核心Sql解析器类(更多可以参照前文:SparkSession初始化)。
然后继续调用trait(特质) ParserInterface中定义的parsePlan(),这里简单了解一下ParserInterface的定义,如下:
package org.apache.spark.sql.catalyst.parser /** * Interface for a parser. */ @DeveloperApi trait ParserInterface { /** * Parse a string to a [[LogicalPlan]]. */ @throws[ParseException]("Text cannot be parsed to a LogicalPlan") def parsePlan(sqlText: String): LogicalPlan ...... // 略这里又会辗转去org.apache.spark.sql.catalyst.parser.AbstractSqlParser调用parse方法。
先了解下AbstractSqlParser的定义,如下可以看到该抽象类继承自ParserInterface,如下:
/** * Base SQL parsing infrastructure. */ abstract class AbstractSqlParser extends ParserInterface with Logging { ...... // 略 /** Creates LogicalPlan for a given SQL string. */ override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser => astBuilder.visitSingleStatement(parser.singleStatement()) match { case plan: LogicalPlan => plan case _ => val position = Origin(None, None) throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position) } } /** Get the builder (visitor) which converts a ParseTree into an AST. */ protected def astBuilder: AstBuilder protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = { logDebug(s"Parsing command: $command") // 实例化词法解析器SqlBaseLexer val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command))) lexer.removeErrorListeners() lexer.addErrorListener(ParseErrorListener) lexer.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced val tokenStream = new CommonTokenStream(lexer) // 实例化语法解析器SqlBaseParser val parser = new SqlBaseParser(tokenStream) parser.addParseListener(PostProcessor) parser.removeErrorListeners() parser.addErrorListener(ParseErrorListener) parser.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced try { try { // first, try parsing with potentially faster SLL mode parser.getInterpreter.setPredictionMode(PredictionMode.SLL) toResult(parser) } catch { case e: ParseCancellationException => // if we fail, parse with LL mode tokenStream.seek(0) // rewind input stream parser.reset() // Try Again. parser.getInterpreter.setPredictionMode(PredictionMode.LL) toResult(parser) } } catch { case e: ParseException if e.command.isDefined => throw e case e: ParseException => throw e.withCommand(command) case e: AnalysisException => val position = Origin(e.line, e.startPosition) throw new ParseException(Option(command), e.message, position, position) } } }Spark 2.0版本起使用Antlr进行词法和语法解析。使用Antlr生成未绑定的逻辑计划分为两个阶段:
第一阶段的过程为词法分析(Lexical Analysis),负责将符号(Token)分组成符号类(Token class or Token type);
第二阶段就是真正的Parser,默认Antlr会构建出一颗分析树(Parser Tree)或者叫语法树(Syntax Tree)。
可以看到具体的SQL解析在AbastrctSqlParser抽象类中的parse方法进行,解析完毕后生成语法树,语法树会根据系统初始化的AstBuilder解析生成表达式、逻辑计划或表标识对象。
在AbstractSqlParse的parse方法中,先实例化词法解析器SqlBaseLexer和语法解析器SqlBaseParser,然后尝试用Antlr较快的解析模式SLL,如果解析失败,则会再尝试使用普通解析模型LL,解析完毕后返回解析结果。
Spark提供了一个.g4文件(\sql\catalyst\src\main\antlr4\org\apache\spark\sql\catalyst\parser\SqlBase.g4),编译的时候会使用Antlr根据这个.g4生成对应的词法分析类和语法分析类,同时还使用了访问者模式,用以构建Logical Plan(语法树)。另外这里无论是SqlBaseLexer还是SqlBaseParser都是Antlr4的东西,包括最后的toResult(parser)也是调用访问者模式的类去遍历语法树来生成Logical Plan。
Spark使用Antlr4的访问者模式,生成Logical Plan。这里顺便说下怎么实现访问者模式吧,在使用antlr4命令的时候,加上-visit参数就会生成SqlBaseBaseVisitor,里面提供了默认的访问各个节点的触发方法。我们可以通过继承这个类,重写对应节点的visit方法,实现自己的访问逻辑,而这个继承的类就是org.apache.spark.sql.catalyst.parser.AstBuilder。
接下来再来看看AstBuilder做了什么?
protected def astBuilder: AstBuilder class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging { import ParserUtils._ /** * Create a logical plan using a query specification. */ override def visitQuerySpecification( ctx: QuerySpecificationContext): LogicalPlan = withOrigin(ctx) { val from = OneRowRelation().optional(ctx.fromClause) { visitFromClause(ctx.fromClause) } /** * Add a query specification to a logical plan. The query specification is the core of the logical * plan, this is where sourcing (FROM clause), transforming (SELECT TRANSFORM/MAP/REDUCE), * projection (SELECT), aggregation (GROUP BY ... HAVING ...) and filtering (WHERE) takes place. * * Note that query hints are ignored (both by the parser and the builder). */ withQuerySpecification(ctx, from) } ...... // 略该段代码中会先判断是否有FROM子语句,有的话会去生成对应的Logical Plan,再调用withQuerySpecification()方法,而withQuerySpecification()方法是比较核心的一个方法。它会处理包括SELECT,FILTER,GROUP BY,HAVING等子语句的逻辑。
AstBuilder大意就是使用Scala的模式匹配,匹配不同的子语句生成不同的Logical Plan。
然后再来说说最终生成的LogicalPlan,LogicalPlan其实是继承自TreeNode,所以本质上LogicalPlan就是一棵树。(更多可以参照:SparkSQL 架构基础概念)
最后再来看一下示例:
import spark.implicits._ //生成DataFrame val df = Seq((1, 1)).toDF("key", "value") df.createOrReplaceTempView("src") //调用spark.sql val query = spark.sql("select key from src ") logger.info(query.queryExecution.logical)logger.info打印如下:
'Project ['key] +- 'UnresolvedRelation `src`这个Project是UnaryNode的一个子类(SELECT自然是一元节点),表明我们要查询的字段是key。
abstract class OrderPreservingUnaryNode extends UnaryNode { override final def outputOrdering: Seq[SortOrder] = child.outputOrdering } case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends OrderPreservingUnaryNode { ...... // 略 }UnresolvedRelation是一个新的概念,这里顺便说下,我们通过SQL parse生成的这棵树,其实叫Unresolved LogicalPlan,这里的Unresolved的意思说,还不知道src是否存在,或它的元数据是什么样,只有通过Analysis阶段后,才会把Unresolved变成Resolved LogicalPlan。这里的意思可以理解为,读取名为src的表,但这张表的情况未知,有待验证。
https://www.cnblogs.com/listenfwind/p/12735833.html