当前位置: 代码迷 >> SQL >> Spark修炼之道(进阶篇)——Spark入门到精通:第九节 Spark SQL运作流程解析
  详细解决方案

Spark修炼之道(进阶篇)——Spark入门到精通:第九节 Spark SQL运作流程解析

热度:543   发布时间:2016-05-05 09:44:37.0
Spark修炼之道(进阶篇)——Spark入门到精通:第九节 Spark SQL运行流程解析

1.整体运行流程

使用下列代码对SparkSQL流程进行分析,让大家明白LogicalPlan的几种状态,理解SparkSQL整体执行流程

// sc is an existing SparkContext.val sqlContext = new org.apache.spark.sql.SQLContext(sc)// this is used to implicitly convert an RDD to a DataFrame.import sqlContext.implicits._// Define the schema using a case class.// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,// you can use custom classes that implement the Product interface.case class Person(name: String, age: Int)// Create an RDD of Person objects and register it as a table.val people = sc.textFile("/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()people.registerTempTable("people")// SQL statements can be run by using the sql methods provided by sqlContext.val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

(1)查看teenagers的Schema信息

scala> teenagers.printSchemaroot |-- name: string (nullable = true) |-- age: integer (nullable = false)

(2)查看运行流程

scala> teenagers.queryExecutionres3: org.apache.spark.sql.SQLContext#QueryExecution === Parsed Logical Plan =='Project [unresolvedalias('name),unresolvedalias('age)] 'Filter (('age >= 13) && ('age <= 19))  'UnresolvedRelation [people], None== Analyzed Logical Plan ==name: string, age: intProject [name#0,age#1] Filter ((age#1 >= 13) && (age#1 <= 19))  Subquery people   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22== Optimized Logical Plan ==Filter ((age#1 >= 13) && (age#1 <= 19)) LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22== Physical Plan ==Filter ((age#1 >= 13) && (age#1 <= 19)) Scan PhysicalRDD[name#0,age#1]Code Generation: true

QueryExecution中表示的是整体Spark SQL运行流程,从上面的输出结果可以看到,一个SQL语句要执行需要经过下列步骤:

== (1)Parsed Logical Plan =='Project [unresolvedalias('name),unresolvedalias('age)] 'Filter (('age >= 13) && ('age <= 19))  'UnresolvedRelation [people], None== (2)Analyzed Logical Plan ==name: string, age: intProject [name#0,age#1] Filter ((age#1 >= 13) && (age#1 <= 19))  Subquery people   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22== (3)Optimized Logical Plan ==Filter ((age#1 >= 13) && (age#1 <= 19)) LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22== (4)Physical Plan ==Filter ((age#1 >= 13) && (age#1 <= 19)) Scan PhysicalRDD[name#0,age#1]//启动动态字节码生成技术(bytecode generation,CG),提升查询效率Code Generation: true

2.全表查询运行流程

执行语句:

val all= sqlContext.sql("SELECT * FROM people")

运行流程:

scala> all.queryExecutionres9: org.apache.spark.sql.SQLContext#QueryExecution =//注意*号被解析为unresolvedalias(*)== Parsed Logical Plan =='Project [unresolvedalias(*)] 'UnresolvedRelation [people], None== Analyzed Logical Plan ==//unresolvedalias(*)被analyzed为Schema中所有的字段//UnresolvedRelation [people]被analyzed为Subquery peoplename: string, age: intProject [name#0,age#1] Subquery people  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22== Optimized Logical Plan ==LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22== Physical Plan ==Scan PhysicalRDD[name#0,age#1]Code Generation: true

3. filter查询运行流程

执行语句:

scala> val filterQuery= sqlContext.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")filterQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]

执行流程:

scala> filterQuery.queryExecutionres0: org.apache.spark.sql.SQLContext#QueryExecution === Parsed Logical Plan =='Project [unresolvedalias(*)] 'Filter (('age >= 13) && ('age <= 19))  'UnresolvedRelation [people], None== Analyzed Logical Plan ==name: string, age: intProject [name#0,age#1] //多出了Filter,后同 Filter ((age#1 >= 13) && (age#1 <= 19))  Subquery people   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20== Optimized Logical Plan ==Filter ((age#1 >= 13) && (age#1 <= 19)) LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20== Physical Plan ==Filter ((age#1 >= 13) && (age#1 <= 19)) Scan PhysicalRDD[name#0,age#1]Code Generation: true

4. join查询运行流程

执行语句:

val joinQuery= sqlContext.sql("SELECT * FROM people a, people b where a.age=b.age")

查看整体执行流程

scala> joinQuery.queryExecutionres0: org.apache.spark.sql.SQLContext#QueryExecution =//注意Filter//Join Inner== Parsed Logical Plan =='Project [unresolvedalias(*)] 'Filter ('a.age = 'b.age)  'Join Inner, None   'UnresolvedRelation [people], Some(a)   'UnresolvedRelation [people], Some(b)== Analyzed Logical Plan ==name: string, age: int, name: string, age: intProject [name#0,age#1,name#2,age#3] Filter (age#1 = age#3)  Join Inner, None   Subquery a    Subquery people     LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22   Subquery b    Subquery people     LogicalRDD [name#2,age#3], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22== Optimized Logical Plan ==Project [name#0,age#1,name#2,age#3] Join Inner, Some((age#1 = age#3))  LogicalRDD [name#0,age#1], MapPartitionsRDD[4]...//查看其Physical Planscala> joinQuery.queryExecution.sparkPlanres16: org.apache.spark.sql.execution.SparkPlan =TungstenProject [name#0,age#1,name#2,age#3] SortMergeJoin [age#1], [age#3]  Scan PhysicalRDD[name#0,age#1]  Scan PhysicalRDD[name#2,age#3]

前面的例子与下面的例子等同,只不过其运行方式略有不同,执行语句:

scala> val innerQuery= sqlContext.sql("SELECT * FROM people a inner join people b on a.age=b.age")innerQuery: org.apache.spark.sql.DataFrame = [name: string, age: int, name: string, age: int]

查看整体执行流程:

scala> innerQuery.queryExecutionres2: org.apache.spark.sql.SQLContext#QueryExecution =//注意Join Inner//另外这里面没有Filter== Parsed Logical Plan =='Project [unresolvedalias(*)] 'Join Inner, Some(('a.age = 'b.age))  'UnresolvedRelation [people], Some(a)  'UnresolvedRelation [people], Some(b)== Analyzed Logical Plan ==name: string, age: int, name: string, age: intProject [name#0,age#1,name#4,age#5] Join Inner, Some((age#1 = age#5))  Subquery a   Subquery people    LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22  Subquery b   Subquery people    LogicalRDD [name#4,age#5], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22//注意Optimized Logical Plan与Analyzed Logical Plan//并没有进行特别的优化,突出这一点是为了比较后面的子查询//其Analyzed和Optimized间的区别== Optimized Logical Plan ==Project [name#0,age#1,name#4,age#5] Join Inner, Some((age#1 = age#5))  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder ...//查看其Physical Planscala> innerQuery.queryExecution.sparkPlanres14: org.apache.spark.sql.execution.SparkPlan =TungstenProject [name#0,age#1,name#6,age#7] SortMergeJoin [age#1], [age#7]  Scan PhysicalRDD[name#0,age#1]  Scan PhysicalRDD[name#6,age#7]

5. 子查询运行流程

执行语句:

scala> val subQuery=sqlContext.sql("SELECT * FROM (SELECT * FROM people WHERE age >= 13)a where a.age <= 19")subQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]

查看整体执行流程:

scala> subQuery.queryExecutionres4: org.apache.spark.sql.SQLContext#QueryExecution === Parsed Logical Plan =='Project [unresolvedalias(*)] 'Filter ('a.age <= 19)  'Subquery a   'Project [unresolvedalias(*)]    'Filter ('age >= 13)     'UnresolvedRelation [people], None== Analyzed Logical Plan ==name: string, age: intProject [name#0,age#1] Filter (age#1 <= 19)  Subquery a   Project [name#0,age#1]    Filter (age#1 >= 13)     Subquery people      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22//这里需要注意Optimized与Analyzed间的区别//Filter被进行了优化== Optimized Logical Plan ==Filter ((age#1 >= 13) && (age#1 <= 19)) LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22== Physical Plan ==Filter ((age#1 >= 13) && (age#1 <= 19)) Scan PhysicalRDD[name#0,age#1]Code Generation: true

6. 聚合SQL运行流程

执行语句:

scala> val aggregateQuery=sqlContext.sql("SELECT a.name,sum(a.age) FROM (SELECT * FROM people WHERE age >= 13)a where a.age <= 19 group by a.name")aggregateQuery: org.apache.spark.sql.DataFrame = [name: string, _c1: bigint]

运行流程查看:

scala> aggregateQuery.queryExecutionres6: org.apache.spark.sql.SQLContext#QueryExecution =//注意'Aggregate ['a.name], [unresolvedalias('a.name),unresolvedalias('sum('a.age))]//即group by a.name被 parsed为unresolvedalias('a.name)== Parsed Logical Plan =='Aggregate ['a.name], [unresolvedalias('a.name),unresolvedalias('sum('a.age))] 'Filter ('a.age <= 19)  'Subquery a   'Project [unresolvedalias(*)]    'Filter ('age >= 13)     'UnresolvedRelation [people], None== Analyzed Logical Plan ==name: string, _c1: bigintAggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L] Filter (age#1 <= 19)  Subquery a   Project [name#0,age#1]    Filter (age#1 >= 13)     Subquery people      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22== Optimized Logical Plan ==Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L] Filter ((age#1 >= 13) && (age#1 <= 19))  LogicalRDD [name#0,age#1], MapPartitions...//查看其Physical Planscala> aggregateQuery.queryExecution.sparkPlanres10: org.apache.spark.sql.execution.SparkPlan =TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Final,isDistinct=false)], output=[name#0,_c1#14L]) TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Partial,isDistinct=false)], output=[name#0,currentSum#17L])  Filter ((age#1 >= 13) && (age#1 <= 19))   Scan PhysicalRDD[name#0,age#1]

其它SQL语句,大家可以使用同样的方法查看其执行流程,以掌握Spark SQL背后实现的基本思想。

  相关解决方案