当前位置: 代码迷 >> SQL >> SparkSQL读取HBase数据,经过自定义外部数据源
  详细解决方案

SparkSQL读取HBase数据,经过自定义外部数据源

热度:37   发布时间:2016-05-05 10:14:34.0
SparkSQL读取HBase数据,通过自定义外部数据源

关键字:SparkSQL读取HBase、SparkSQL自定义外部数据源

?

?

前面文章介绍了SparSQL通过Hive操作HBase表。

?

SparkSQL从1.2开始支持自定义外部数据源(External DataSource),这样就可以通过API接口来实现自己的外部数据源。这里基于Spark1.4.0,简单介绍SparkSQL自定义外部数据源,访问HBase表。

?

?

HBase中表如下:

?

create 'lxw1234',{NAME => 'f1',VERSIONS => 1},{NAME => 'f2',VERSIONS => 1},{NAME => 'f3',VERSIONS => 1} put 'lxw1234','lxw1234.com','f1:c1','name1'put 'lxw1234','lxw1234.com','f1:c2','name2'put 'lxw1234','lxw1234.com','f2:c1','age1'put 'lxw1234','lxw1234.com','f2:c2','age2'put 'lxw1234','lxw1234.com','f3:c1','job1'put 'lxw1234','lxw1234.com','f3:c2','job2'put 'lxw1234','lxw1234.com','f3:c3','job3' hbase(main):025:0* scan 'lxw1234'ROW COLUMN+CELLlxw1234.com column=f1:c1, timestamp=1435624625198, value=name1lxw1234.com column=f1:c2, timestamp=1435624591717, value=name2lxw1234.com column=f2:c1, timestamp=1435624608759, value=age1lxw1234.com column=f2:c2, timestamp=1435624635261, value=age2lxw1234.com column=f3:c1, timestamp=1435624662282, value=job1lxw1234.com column=f3:c2, timestamp=1435624697028, value=job2lxw1234.com column=f3:c3, timestamp=1435624697065, value=job3

?

?

进入spark-shell

?

?

sh /usr/local/spark-1.4.0-bin-hadoop2.3/bin/spark-shell --jars /tmp/sparksql-hbase.jar --total-executor-cores 30 --executor-memory 4G --master spark://lxw1234.com:7077

?

?

运行以下代码:

?

?

import sqlContext._  var hbasetable = sqlContext.read.format("com.lxw1234.sparksql.hbase").options(Map("sparksql_table_schema" -> "(row_key string, c1 string, c2 string, c3 string)","hbase_table_name" -> "lxw1234","hbase_table_schema" -> "(:key , f1:c2 , f2:c2 , f3:c3 )")).load() //sparksql_table_schema参数为sparksql中表的定义//hbase_table_name参数为HBase中表名//hbase_table_schema参数为HBase表中需要映射到SparkSQL表中的列族和列,这里映射过//去的字段要和sparksql_table_schema中定义的一致,包括顺序。  scala> hbasetable.printSchema()root|-- row_key: string (nullable = false)|-- c1: string (nullable = false)|-- c2: string (nullable = false)|-- c3: string (nullable = false) hbasetable.registerTempTable("lxw1234")  sqlContext.sql("SELECT * from lxw1234").collectres3: Array[org.apache.spark.sql.Row] = Array([lxw1234.com,name2,age2,job3]) sqlContext.sql("SELECT row_key,concat(c1,'|',c2,'|',c3) from lxw1234").collectres3: Array[org.apache.spark.sql.Row] = Array([lxw1234.com,name2|age2|job3])

?

?

可以用SQL正常访问。

?

源码和相关配置

  • 本来在SparkSQL中通过外部数据源建表的语法是:

?

CREATE TEMPORARY TABLE hbasetable

?

USING com.lxw1234.sparksql.hbase

?

OPTIONS (

?

sparksql_table_schema?? ‘(row_key string, c1 string, c2 string, c3 string)’,

?

hbase_table_name?? ‘lxw1234′,

?

hbase_table_schema ‘(:key , f1:c2 , f2:c2 , f3:c3)’

?

)

?

在我的Spark1.4中报错,会使用Hive的语法解析器解析这个DDL语句,因为Hive0.13中没有这种语法,因此报错。

?

是否是因为Spark1.4包的编译了Hive的原因?

?

  • 基于Spark1.4的程序源码com.lxw1234.sparksql.hbase见:http://lxw1234.com/archives/2015/07/332.htm
  • 源码的编译依赖HBase的相关jar包:

hbase-client-0.96.1.1-cdh5.0.0.jar

hbase-common-0.96.1.1-cdh5.0.0.jar

hbase-protocol-0.96.1.1-cdh5.0.0.jar

hbase-server-0.96.1.1-cdh5.0.0.jar

还有HBase的集群信息:

hbase.zookeeper.quorum

hbase.client.scanner.caching

我之前在配置时候已经将这几个jar包和参数加到Spark集群的CLASSPATH中了,可参考 http://lxw1234.com/archives/2015/07/330.htm

??????

  • 此程序是OopsOutOfMemory基于Spark1.2开发的,我只做了很小的修改。

https://github.com/OopsOutOfMemory/spark-sql-hbase

  • 此程序只做学习和测试使用,并未测试性能。

?

?????