问题描述
我必须处理的文件夹中有很多子目录。 我决定找到linux实用程序来查找所有文件,例如
find . -name *.gz -exec zcat {} \;
现在我必须将stdout数据提供给spark脚本作为输入,如下所示
find . -name *.gz -exec zcat {} \; | ./run_pyspark.py
Spark是否可以读取stdin数据(如简单的python)。 请至少给出一些例子或方法?
1楼
也许试试这个:
import subprocess
file_data=subprocess.call(['find', '.', '-name','*.gz',.....])
2楼
您可以使用的技巧是通过管道过程传递参数。 参考文档
pipe(command,env = None,checkCode = False)将由管道元素创建的RDD返回到派生的外部进程。
>>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()`
[u'1', u'2', u'', u'3']`
因此,您可以在脚本中添加以下内容:
test = sc.parallelize([1])
data = test.pipe("find . -name *.gz -exec zcat {}")
但是,这不是很有效。 如果您只想读取目录中的所有gzip文件,只需将路径传递给文件即可,例如
data = sc.textFile("/path/to/files/*gz")
data.take(3) ## take a sample of records
就这么简单。 也适用于bz2文件。