[Spark] Spark 10: Spark SQL, Part 1

Spark's "One Stack to rule them all" approach is evident in Spark SQL. In a traditional Hadoop-based stack, you would have to install Pig or Hive separately to handle SQL-like ad-hoc queries.

This post uses the interactive Spark Shell to get a first taste of the SQL-like query capability that Spark provides.

Uploading the Data to HDFS

First, upload the test data to HDFS. The test data used in this post is the people.txt file that ships with Spark, located at spark-1.2.0-bin-hadoop2.4\examples\src\main\resources\people.txt. Its contents are:

Michael, 29
Andy, 30
Justin, 19

Upload people.txt to HDFS with the following command (people.txt has already been copied to the current directory):

hdfs dfs -put people.txt /user/hadoop
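To confirm the upload, a standard HDFS command such as the following can print the file back (the path is the one used in the put above):

hdfs dfs -cat /user/hadoop/people.txt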


Working in the Spark Shell

1. Create an SQLContext object

scala> val cxt = new org.apache.spark.sql.SQLContext(sc)
cxt: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@...

2. Import the implicit conversions that turn an RDD into a SchemaRDD

scala> import cxt._
import cxt._

3. Define a Person case class

scala> case class Person(name: String, age: Int)
defined class Person

4. Read the data from HDFS and map it into a collection of Person objects

scala> val people = sc.textFile("people.txt").map(_.split(",")).map(p => Person(p(0),p(1).trim.toInt))
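Since textFile is lazy, nothing has actually been read yet. Note also the trim, which strips the space that follows each comma in people.txt. As an optional sanity check (not part of the original session), take forces a read and should print the parsed records:

scala> people.take(3).foreach(println)
Person(Michael,29)
Person(Andy,30)
Person(Justin,19)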

5. Inspect the lineage of the people RDD

scala> people.toDebugString
15/01/03 06:25:17 INFO mapred.FileInputFormat: Total input paths to process : 1
res0: String =
(1) MappedRDD[3] at map at <console>:19 []
 |  MappedRDD[2] at map at <console>:19 []
 |  people.txt MappedRDD[1] at textFile at <console>:19 []
 |  people.txt HadoopRDD[0] at textFile at <console>:19 []

6. Register the people RDD as a virtual table named People

scala> people.registerAsTable("People")

Inspecting the lineage of people again at this point gives the same result as in step 5:

scala> people.toDebugString
res2: String =
(1) MappedRDD[3] at map at <console>:19 []
 |  MappedRDD[2] at map at <console>:19 []
 |  people.txt MappedRDD[1] at textFile at <console>:19 []
 |  people.txt HadoopRDD[0] at textFile at <console>:19 []

7. Query the People table and inspect the query plan and physical plan

scala> val teenagers = cxt.sql("select name from People where age < 20 and age > 10")
teenagers: org.apache.spark.sql.SchemaRDD =
SchemaRDD[6] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
Project [name#0]
 Filter ((age#1 < 20) && (age#1 > 10))
  PhysicalRDD [name#0,age#1], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36

scala> teenagers.toDebugString
res3: String =
(1) SchemaRDD[6] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
Project [name#0]
 Filter ((age#1 < 20) && (age#1 > 10))
  PhysicalRDD [name#0,age#1], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36 []
 |  MapPartitionsRDD[8] at mapPartitions at basicOperators.scala:43 []
 |  MapPartitionsRDD[7] at mapPartitions at basicOperators.scala:58 []
 |  MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36 []
 |  MappedRDD[3] at map at <console>:19 []
 |  MappedRDD[2] at map at <console>:19 []
 |  people.txt MappedRDD[1] at textFile at <console>:19 []
 |  people.txt HadoopRDD[0] at textFile at <console>:19 []
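Only the physical plan is echoed above. A SchemaRDD in the 1.x line also exposes a developer-facing queryExecution field that additionally shows the parsed, analyzed, and optimized logical plans; a minimal sketch (internal API, so the exact output varies by release):

scala> println(teenagers.queryExecution)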

8. Submit the query job and print the result

scala> teenagers.map(t => "Name:" + t(0)).collect().foreach(println)
Name:Justin
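As an aside, the Spark 1.2 programming guide also documents an experimental language-integrated query DSL, enabled here by the import cxt._ from step 2, which expresses the same query without an SQL string. A sketch, assuming that experimental API:

scala> val teenagers2 = people.where('age > 10).where('age < 20).select('name)
scala> teenagers2.map(t => "Name:" + t(0)).collect().foreach(println)

This should again print Name:Justin.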


Reference: http://spark.apache.org/docs/latest/sql-programming-guide.html#getting-started
