问题描述
我正在尝试将大型 JSON 文件目录转换为 CSV 文件。
但是,在将这些 JSON 文件转换为 CSV 或 Dataframe 友好格式之前,我需要对它们进行转换和剪辑。
这是由transform_json
函数完成的。
下面是一个有效的解决方案,但由于json.loads
/ json.dumps
的后面和第四个,感觉很愚蠢和缓慢。
rdd = (spark_context.textFile('*.json')
.map(json.loads)
.flatMap(transform_json)
.map(json.dumps))
(spark_session.read.json(rdd)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("output_dir"))
我需要将它们放入 PySpark Dataframe 中,因为我事先并不知道所有的列,Spark 会为我处理。
如何改进此代码?
1楼
您似乎想合并架构? 您可以使用 Parquet 并阅读此内容: :