当前位置: 代码迷 >> python >> 如何输入标准输入数据以将Spark脚本作为输入
  详细解决方案

如何输入标准输入数据以将Spark脚本作为输入

热度:43   发布时间:2023-06-13 15:34:09.0

我必须处理的文件夹中有很多子目录。 我决定找到linux实用程序来查找所有文件,例如

find . -name *.gz -exec zcat {} \;

现在我必须将stdout数据提供给spark脚本作为输入,如下所示

find . -name *.gz -exec zcat {} \; | ./run_pyspark.py

Spark是否可以读取stdin数据(如简单的python)。 请至少给出一些例子或方法?

也许试试这个:

import subprocess
file_data=subprocess.call(['find', '.', '-name','*.gz',.....])

您可以使用的技巧是通过管道过程传递参数。 参考文档

pipe(command,env = None,checkCode = False)将由管道元素创建的RDD返回到派生的外部进程。

>>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()`
[u'1', u'2', u'', u'3']`

因此,您可以在脚本中添加以下内容:

test = sc.parallelize([1])
data = test.pipe("find . -name *.gz -exec zcat {}")

但是,这不是很有效。 如果您只想读取目录中的所有gzip文件,只需将路径传递给文件即可,例如

data = sc.textFile("/path/to/files/*gz")
data.take(3) ## take a sample of records

就这么简单。 也适用于bz2文件。

  相关解决方案