The Way of Spark (Advanced) — Spark Source Code Reading, Section 13: SQLContext in Spark SQL (Part 1)

Author: Zhou Zhihu

1. Creating an SQLContext

SQLContext is the entry point for structured data processing in Spark SQL. It is used to create DataFrames and to execute SQL, and it is created as follows:

// sc is an existing SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

The corresponding source code is:

def this(sparkContext: SparkContext) = {
  this(sparkContext, new CacheManager, SQLContext.createListenerAndUI(sparkContext), true)
}

It invokes the private primary constructor:

// 1. The CacheManager parameter of the primary constructor caches query results;
//    subsequent queries automatically read from this cache.
// 2. SQLListener listens for Spark scheduler events; it extends SparkListener.
// 3. isRootContext indicates whether this is the root SQLContext.
class SQLContext private[sql](
    @transient val sparkContext: SparkContext,
    @transient protected[sql] val cacheManager: CacheManager,
    @transient private[sql] val listener: SQLListener,
    val isRootContext: Boolean)
  extends org.apache.spark.Logging with Serializable {

When spark.sql.allowMultipleContexts is set to true, multiple SQLContexts/HiveContexts may be created. The method for creating an additional one is newSession:

def newSession(): SQLContext = {
  new SQLContext(
    sparkContext = sparkContext,
    cacheManager = cacheManager,
    listener = listener,
    isRootContext = false)
}

Here isRootContext is set to false; otherwise an exception is thrown, because only one root SQLContext may exist, and every other SQLContext shares the SparkContext, CacheManager, and SQLListener with the root SQLContext. If spark.sql.allowMultipleContexts is false, only a single SQLContext is allowed to exist.
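As a quick illustration, the following minimal sketch (assuming spark.sql.allowMultipleContexts keeps its default value of true) obtains an additional session that shares the SparkContext, CacheManager, and SQLListener with the root context:

// a sketch; sc is an existing SparkContext
val rootSqlContext = new org.apache.spark.sql.SQLContext(sc)  // root context, isRootContext = true
val childSqlContext = rootSqlContext.newSession()             // shares sc, CacheManager, SQLListener; isRootContext = false
// Each session keeps its own catalog of temporary tables, so a table registered
// in one session is not visible in the other.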

2. Core Member Variable — catalog

 protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)

The catalog is used to register tables, unregister tables, check whether a table exists, and so on. For example, when a DataFrame's registerTempTable method is called:

val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()
people.registerTempTable("people")

the registerDataFrameAsTable method of the sqlContext is invoked:

def registerTempTable(tableName: String): Unit = {
  sqlContext.registerDataFrameAsTable(this, tableName)
}

sqlContext.registerDataFrameAsTable in turn simply calls the catalog's registerTable method:

private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
  catalog.registerTable(TableIdentifier(tableName), df.logicalPlan)
}

The full source of SimpleCatalog is as follows:

class SimpleCatalog(val conf: CatalystConf) extends Catalog {
  private[this] val tables = new ConcurrentHashMap[String, LogicalPlan]

  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
    tables.put(getTableName(tableIdent), plan)
  }

  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
    tables.remove(getTableName(tableIdent))
  }

  override def unregisterAllTables(): Unit = {
    tables.clear()
  }

  override def tableExists(tableIdent: TableIdentifier): Boolean = {
    tables.containsKey(getTableName(tableIdent))
  }

  override def lookupRelation(
      tableIdent: TableIdentifier,
      alias: Option[String] = None): LogicalPlan = {
    val tableName = getTableName(tableIdent)
    val table = tables.get(tableName)
    if (table == null) {
      throw new NoSuchTableException
    }
    val tableWithQualifiers = Subquery(tableName, table)

    // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
    // properly qualified with this alias.
    alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
  }

  override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
    tables.keySet().asScala.map(_ -> true).toSeq
  }

  override def refreshTable(tableIdent: TableIdentifier): Unit = {
    throw new UnsupportedOperationException
  }
}
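From the user's point of view the catalog is exercised indirectly through the public SQLContext API. The following minimal sketch (Spark 1.x API, reusing the people DataFrame built above) shows which calls end up in SimpleCatalog:

people.registerTempTable("people")        // catalog.registerTable
sqlContext.tableNames().foreach(println)  // backed by catalog.getTables
// The analyzer resolves the name "people" via catalog.lookupRelation
sqlContext.sql("SELECT name FROM people WHERE age >= 13").show()
sqlContext.dropTempTable("people")        // catalog.unregisterTable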

3. Core Member Variable — sqlParser

The definition of sqlParser in SQLContext:

protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))

SparkSQLParser is the top-level Spark SQL parser; it parses the SQL syntax supported by Spark SQL. Its definition is:

private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser

The fallback function is used to parse statements that are not covered by the Spark SQL dialect's own keywords.
The keywords supported by the Spark SQL dialect include:

protected val AS = Keyword("AS")
protected val CACHE = Keyword("CACHE")
protected val CLEAR = Keyword("CLEAR")
protected val DESCRIBE = Keyword("DESCRIBE")
protected val EXTENDED = Keyword("EXTENDED")
protected val FUNCTION = Keyword("FUNCTION")
protected val FUNCTIONS = Keyword("FUNCTIONS")
protected val IN = Keyword("IN")
protected val LAZY = Keyword("LAZY")
protected val SET = Keyword("SET")
protected val SHOW = Keyword("SHOW")
protected val TABLE = Keyword("TABLE")
protected val TABLES = Keyword("TABLES")
protected val UNCACHE = Keyword("UNCACHE")
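As a rough sketch (assuming the people temporary table registered earlier), the statements below are matched directly by SparkSQLParser through these keywords, while an ordinary query falls through to the fallback parser:

sqlContext.sql("CACHE TABLE people")                   // CACHE / TABLE
sqlContext.sql("SHOW TABLES")                          // SHOW / TABLES
sqlContext.sql("SET spark.sql.shuffle.partitions=10")  // SET key=value
sqlContext.sql("UNCACHE TABLE people")                 // UNCACHE / TABLE
sqlContext.sql("SELECT count(*) FROM people")          // not matched here; handled by the fallback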

4. Core Member Variable — ddlParser

It is used to parse DDL (Data Definition Language):

 protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))

The keywords it supports are:

protected val CREATE = Keyword("CREATE")
protected val TEMPORARY = Keyword("TEMPORARY")
protected val TABLE = Keyword("TABLE")
protected val IF = Keyword("IF")
protected val NOT = Keyword("NOT")
protected val EXISTS = Keyword("EXISTS")
protected val USING = Keyword("USING")
protected val OPTIONS = Keyword("OPTIONS")
protected val DESCRIBE = Keyword("DESCRIBE")
protected val EXTENDED = Keyword("EXTENDED")
protected val AS = Keyword("AS")
protected val COMMENT = Keyword("COMMENT")
protected val REFRESH = Keyword("REFRESH")

It mainly does three things: creating tables, describing tables, and refreshing tables.

protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable

The createTable parser is defined as follows (see the comments for the supported forms):

/**
 * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
 * USING org.apache.spark.sql.avro
 * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
 * or
 * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS]
 * USING org.apache.spark.sql.avro
 * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
 * or
 * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
 * USING org.apache.spark.sql.avro
 * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
 * AS SELECT ...
 */
protected lazy val createTable: Parser[LogicalPlan] = {
  // TODO: Support database.table.
  (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ tableIdentifier ~
    tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
    case temp ~ allowExisting ~ tableIdent ~ columns ~ provider ~ opts ~ query =>
      if (temp.isDefined && allowExisting.isDefined) {
        throw new DDLException(
          "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
      }

      val options = opts.getOrElse(Map.empty[String, String])
      if (query.isDefined) {
        if (columns.isDefined) {
          throw new DDLException(
            "a CREATE TABLE AS SELECT statement does not allow column definitions.")
        }
        // When IF NOT EXISTS clause appears in the query, the save mode will be ignore.
        val mode = if (allowExisting.isDefined) {
          SaveMode.Ignore
        } else if (temp.isDefined) {
          SaveMode.Overwrite
        } else {
          SaveMode.ErrorIfExists
        }

        val queryPlan = parseQuery(query.get)
        CreateTableUsingAsSelect(tableIdent,
          provider,
          temp.isDefined,
          Array.empty[String],
          mode,
          options,
          queryPlan)
      } else {
        val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
        CreateTableUsing(
          tableIdent,
          userSpecifiedSchema,
          provider,
          temp.isDefined,
          options,
          allowExisting.isDefined,
          managedIfNoPath = false)
      }
  }
}
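The DDL path can be exercised directly through sqlContext.sql. The statement below is a sketch of input matched by createTable; the JSON data source and the path are illustrative and assume the standard Spark examples layout:

// a sketch; registers a temporary table backed by the JSON data source
sqlContext.sql(
  """CREATE TEMPORARY TABLE jsonPeople
    |USING org.apache.spark.sql.json
    |OPTIONS (path "examples/src/main/resources/people.json")
  """.stripMargin)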

The code for describeTable and refreshTable is as follows:

/*
 * describe [extended] table avroTable
 * This will display all columns of table `avroTable` includes column_name,column_type,comment
 */
protected lazy val describeTable: Parser[LogicalPlan] =
  (DESCRIBE ~> opt(EXTENDED)) ~ tableIdentifier ^^ {
    case e ~ tableIdent =>
      DescribeCommand(UnresolvedRelation(tableIdent, None), e.isDefined)
  }

protected lazy val refreshTable: Parser[LogicalPlan] =
  REFRESH ~> TABLE ~> tableIdentifier ^^ {
    case tableIndet =>
      RefreshTable(tableIndet)
  }
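For completeness, the statements below sketch input matched by describeTable and refreshTable, reusing the illustrative jsonPeople table from the previous example:

sqlContext.sql("DESCRIBE jsonPeople").show()           // matched by describeTable
sqlContext.sql("DESCRIBE EXTENDED jsonPeople").show()  // describeTable with EXTENDED
// REFRESH TABLE parses into RefreshTable, but note that SimpleCatalog.refreshTable
// (shown in section 2) throws UnsupportedOperationException, so with a plain
// SQLContext it is only meaningful for catalogs that implement it (e.g. HiveContext).
// sqlContext.sql("REFRESH TABLE jsonPeople")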