当前位置: 代码迷 >> 综合 >> 10-Spark Streaming
  详细解决方案

10-Spark Streaming

热度:30   发布时间:2023-12-04 15:07:37.0

前言

因为《Spark-快速大数据分析》使用Spark 版本较老,并且1.1 下的Streaming 并未支持Python,所以书上并没有相应的Python 版本的代码。
因此,博主这节参考官网手册学习:
http://spark.apache.org/docs/latest/streaming-programming-guide.html
如果,你的浏览器不支持翻译,这里提供了翻译的版本:
链接:https://pan.baidu.com/s/113LVfghzF7UZen260u1ZPg
提取码:wg1f
这里就不详写Spark Streaming的各种功能和特点了。看官网即可。
博主在这里仅写出一些运行示例,和一些认为重要的点吧。所以,文章的顺序比较乱,但顺序是按照官网的顺序排列
博主使用Spark 版本:2.4.0 (前面的文章博主使用的是2.0.2)

示例代码见GitHub:
https://github.com/GYT0313/Spark-Learning

1. 示例

博主这里仍然使用的是Spark 集群运行。
假设从侦听TCP套接字的数据服务器接收的文本数据的运行字数。
NetworkWordCount.py

from pyspark import SparkContext
from pyspark.streaming import StreamingContext# 创建一个sc(使用本地2个core)   和时间间隔为4s 的ssc
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 4)# 创建DStream,并读取套接字
lines = ssc.socketTextStream("master", 9999)# 单词计数
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)# 将每个RDD的前10个元素打印
wordCounts.pprint()ssc.start() # 启动计算
ssc.awaitTermination() # 等待终止

先运行发送套节字:
在这里插入图片描述
提交应用:master 和 9999可以不用写,这里写了相当于命令行参数,后面知道
在这里插入图片描述
输入套节字内容:
在这里插入图片描述
在这里插入图片描述
Ctrl + c 结束应用。

剖析运行流程:

  1. 创建StreamingContext,设置batch interval(批次间隔)
    ssc = StreamingContext(sc, 4)
  2. 读取流数据,如套接字、文件、Kafka等
    lines = ssc.socketTextStream(“master”, 9999)
  3. 输出操作,如输出到控制台、文件、数据库等
    wordCounts.pprint()

可以说,Spark Streaming的大多数应用都将遵循上述的三个步骤。

官网解释(机器翻译可能存在语句问题):
在这里插入图片描述

2. 输入DStreams和Receivers

在这里插入图片描述
红框的内容非常重要:
在这里插入图片描述

基本来源

  1. 套接字
    上述示例展示了从套接字读取。
  2. 文件
    代码:
    ReadFromFile.py
from pyspark import SparkContext
from pyspark.streaming import StreamingContext# 创建一个sc(使用集群)   和时间间隔为5 的ssc
sc = SparkContext("spark://master:7077", "ReadFromFile")
ssc = StreamingContext(sc, 5)lines = ssc.textFileStream("hdfs://master:9000/user/hadoop/spark_data/")
lines.pprint()ssc.start() # 启动计算
ssc.awaitTermination() # 等待终止

运行:
在这里插入图片描述
上传文件:
在这里插入图片描述
结果:
在这里插入图片描述
3.自定义来源,需要用户自定义接收器
4. RDD作为流的队列
5. 等等,有兴趣可以查看官网。

3. reduceByKeyAndWindow 和 checkpoint 示例

窗口操作有很多,这里以reduceByKeyAndWindow 为例。值得一提的是:不能单独使用reduceByKeyAndWindow,必须同时设置checkpoint
官网也有相应的代码示例(reduceByKey,checkpoint、累加器、广播变量),不过博主看过源代码之后,觉得可以写一个简单的示例(毕竟自己构思过,理解和印象会更深刻)。所以,自己尝试写出一个示例来验证reduceByKeyAndWindow 和 checkpoint 的运行结果是如何,以此来理解运行原理。
官网
https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/recoverable_network_wordcount.py
Spark安装包:
Spark-2.4.0/examples/src/main/python/streaming/recoverable_network_wordcount.py

编写脚本:
和第一个示例一样,同样使用套接字作为数据源。
NetworkWordCountAndWindow.py
由于博主使用的是 HDFS 为存储系统,所以先再HDFS 上创建了目录 /tmp/spark/checkpoint,如果是使用本地我文件系统,进行相应改变即可(博主没尝试过)。

from pyspark import SparkContext
from pyspark.streaming import StreamingContextdef functionToCreateContext():# 创建一个sc   和批处理时间间隔为2s 的sscsc = SparkContext("spark://master:7077", "NetworkWordCountAndWindow")ssc = StreamingContext(sc, 2)# 设置检查的目录ssc.checkpoint("hdfs://master:9000/tmp/spark/checkpoint")# 创建DStream,并读取套接字lines = ssc.socketTextStream("master", 9999)# 单词计数words = lines.flatMap(lambda line: line.split(" "))pairs = words.map(lambda word: (word, 1))# 设置逆函数(如 + 对应 -。作用是提升应用处理效率等等。逆函数博主也很迷糊,如果想要深入了解的# 可搜索其他详解讲解这部分的博客),窗口时间=12和滑动时间=4(必须为批处理间隔的倍数)wordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 12, 4)# 设置检查点的间隔(比如待会出现的报错就可能是因为某些# 数据未来得及被检查点保存,将会导致数据的丢失)# 官网建议:DStream的5-10个滑动间隔的检查点间隔是一个很好的设置。wordCounts.checkpoint(16)# 将每个RDD的前10个元素打印wordCounts.pprint()return ssc# 如果存在检查的目录则读取,没有则新建
ssc = StreamingContext.getOrCreate("hdfs://master:9000/tmp/spark/checkpoint", functionToCreateContext)ssc.start() # 启动计算
ssc.awaitTermination() # 等待终止

先启动nc ,再启动应用:
在这里插入图片描述
master 和9999 可以不写:
在这里插入图片描述

在套接字中输入连续很多的数字(时间应该够长保证检查点能够存储 并且数据应该尽量多样化,能够在下次启动时明显看到变化):
在这里插入图片描述
窗口和滑动的理解:
比如,下图中’1’的次数在第一个窗口中偏多,'2’的次数偏少。
但是,在第二个窗口中’1’的偏少,'2’偏多。
输出的数据示例:
在这里插入图片描述
修改官网的图片,以此解释:
从下图中就可以理解,如果套接字的输入顺序为:1、1、1、1、1、2、2、2、2。。。那么,第二个窗口的的’2’比第一个窗口多,'1’比第一个窗口少。
在这里插入图片描述
这时使用Ctrl + C 终止应用:
在这里插入图片描述
可以在 /tmp/spark/checkpoint 下看到存在很多文件,这些文件即为元数据检查点或数据检查点文件(元数据检查点用于恢复应用程序的配置如 sc、ssc等。数据检查点用于恢复如RDD等数据):

hdfs dfs -ls /tmp/spark/checkpoint

再次启动应用,并在未终止的nc 程序中输入 50 50 50 …:
在这里插入图片描述
在控制台中可以看到如下输出:
3、36、6、9等数据是检查点保存的数据,因为特殊原因被终止,并且在代码中设置了如果存在检查点目录将读取已有的配置,而不是创建新的sc、ssc等。
所以,在上次应用程序中保存的数据能够在这一次的应用中出现。
在这里插入图片描述
下面报错为上图的报错信息(对应代码中的注释):

Some blocks could not be recovered as they were not found in memory. To prevent such data loss, enable Write Ahead Log (see programming guide for more details.
有些块无法恢复,因为它们在内存中找不到。要防止此类数据丢失,请启用“预写日志”(有关详细信息,请参阅编程指南)。

当然,这里也说了可以启用“预写日志”来防止数据丢失。

4. foreachRDD

博主认为官网的foreeachRDD 比较重要,而且讲解也让人深刻。推荐阅读。
小节的标题如下:
在这里插入图片描述
下图的解释即RDD 的惰性操作,没有行为操作的话是不会进行求值计算的:
在这里插入图片描述

5. 累加器,广播变量和检查点

代码:
AccumulatorsBroadcastCheckpoint.py

from pyspark import SparkContext
from pyspark.streaming import StreamingContext# 创建一个sc   和时间间隔为4s 的ssc
sc = SparkContext("spark://master:7077", "AccumulatorsBroadcastCheckpoint")
ssc = StreamingContext(sc, 4)# 广播变量,用与筛选列表
def getWordBlacklist(sparkContext):if ("wordBlacklist" not in globals()): # 全局变量是否包含globals()["wordBlacklist"] = sparkContext.broadcast(["a", "b", "c"])return globals()["wordBlacklist"]# 累加器用于计数筛选掉的数量
def getDroppedWordsCounter(sparkContext):if ("droppedWordCounter" not in globals()):globals()["droppedWordCounter"] = sparkContext.accumulator(0)return globals()["droppedWordCounter"]def echo(time, rdd):# 创建或获取广播变量blacklist = getWordBlacklist(rdd.context)# 创建或获取累加器droppedWordCounter = getDroppedWordsCounter(rdd.context)def filterFunc(wordCount):# 如果该单词在筛选列表中,则累加器求和if wordCount[0] in blacklist.value:droppedWordCounter.add(wordCount[1])Falseelse:True# 过滤,必须调用collect() 行为操作rdd.filter(filterFunc).collect()# 输出累加器print('droppedWordCounter = %s' % droppedWordCounter)# 创建DStream,并读取套接字
lines = ssc.socketTextStream("master", 9999)# 单词计数
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)# 将每个RDD的前10个元素打印
wordCounts.foreachRDD(echo)ssc.start() # 启动计算
ssc.awaitTermination() # 等待终止

启动nc :
在这里插入图片描述
启动应用:
在这里插入图片描述
nc 中输入:
在这里插入图片描述
结果:
在这里插入图片描述
可以看到, a b c 被拦截,并且累加器做了计数。
所以,如果访问将访问日志作为数据源,不断发送。将 ERROR、WARNING等作为广播变量,就可以从日志中提取出自己想要的信息。

所有的示例就展示完了,官网的内容很多。博主还没有能力总结官网的知识,所以强烈建议阅读官方文档教程

完!

  相关解决方案