
PySpark coding


Notes on some simple PySpark programs.

WordCount

Read a file from HDFS and run a word count:

from pyspark import SparkContext, SparkConf

"""
Pyspark Word Count Demo
"""


def sorted_all_result(wc_rdd):
    """Collect the full WordCount result and sort it on the driver.
    :param wc_rdd: RDD of (word, count) pairs
    :return: None
    """
    word_list = wc_rdd.collect()
    sorted_list = sorted(word_list, key=lambda tuple2: tuple2[1], reverse=True)
    print(sorted_list)


def sorted_top_n_result(wc_rdd, n):
    """Take the top N words by count with takeOrdered(), descending.
    :param wc_rdd: RDD of (word, count) pairs
    :param n: number of results to take
    :return: None
    """
    # takeOrdered() returns the N smallest elements by key, so negate
    # the count to get the N largest (descending order)
    top_n_list = wc_rdd.takeOrdered(n, key=lambda tuple2: -tuple2[1])
    print(top_n_list)


def main():
    conf = SparkConf().setAppName("PysparkDemo01").setMaster("spark://192.168.61.128:7077")
    sc = SparkContext(conf=conf)
    rdd = sc.textFile("hdfs://192.168.61.128:9000/data/wc.txt")
    # strip punctuation, lowercase, split on spaces, then count per word
    wc_rdd = (rdd.flatMap(lambda line: str(line).replace(".", "").replace(",", "").lower().split(" "))
              .map(lambda word: (word, 1))
              .reduceByKey(lambda x, y: x + y))
    # sorted_all_result(wc_rdd)
    # print(type(wc_rdd))
    sorted_top_n_result(wc_rdd, 3)


if __name__ == "__main__":
    main()

Reading a file into a DataFrame, plus some simple operations

File: C:/data/students.csv
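The original post does not show the file's contents. Since the code below reads it with header=True and derives columns from a birthday field, a hypothetical students.csv (illustrative values only, not from the original) might look like:

id,name,birthday
1,Tom,19980315
2,Jerry,20011122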

df.withColumn() creates a new column; its first argument is the column name. The functions in pyspark.sql.functions are then used to assign values and transform the data.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F


def gen_df():
    # spark = SparkSession.builder.appName("SparkSqlDemo01").master("spark://192.168.61.128:7077").getOrCreate()
    spark = SparkSession.builder.appName("SparkSqlDemo01").master("local").getOrCreate()
    df = spark.read.csv("C:/data/students.csv", sep=",", header=True)
    # substring() is 1-based in Spark SQL: take the first 4 characters as the year,
    # append "1231", and parse the result as the last day of the birth year
    result = (df.withColumn("year", F.substring(F.col("birthday"), 1, 4))
              .withColumn("last_month_day", F.lit("1231"))
              .withColumn("birth_year_last_day",
                          F.to_date(F.concat(F.col("year"), F.col("last_month_day")), "yyyyMMdd"))
              .select("*"))
    result.show()


if __name__ == "__main__":
    gen_df()
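The same derived columns can also be written as a plain SQL query over a temporary view instead of a withColumn() chain. A minimal sketch, assuming the spark session and df from gen_df() above (the function name and the view name students_view are illustrative):

def gen_df_sql(spark, df):
    """Sketch: the same derivation expressed as Spark SQL over a temp view."""
    df.createOrReplaceTempView("students_view")
    result = spark.sql("""
        SELECT *,
               substring(birthday, 1, 4) AS year,
               to_date(concat(substring(birthday, 1, 4), '1231'), 'yyyyMMdd') AS birth_year_last_day
        FROM students_view
    """)
    result.show()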
