当前位置: 代码迷 >> 综合 >> 大数据之spark_spark中的特殊算子cache、persist、checkpoint
  详细解决方案

大数据之spark_spark中的特殊算子cache、persist、checkpoint

热度:97   发布时间:2024-02-22 18:04:01.0

cache、persist

将数据缓存到内存,第一次触发Action,才会将数据放入内存,以后在触发Action,可以复用前面内存中缓存的数据,可以提升技术效率
cache和persist的使用场景:一个application多次触发Action,为了复用前面RDD的数据,避免反复读取HDFS(数据源)中的数据和重复计算,persist可以将数据缓存到内存或磁盘【executor所在的内存或磁盘】,第一次触发action才放入到内存或磁盘,以后会缓存的RDD进行操作可以复用缓存的数据。
一个RDD多次触发Action缓存才有意义,如果将数据缓存到内存,内存不够,以分区位单位,只缓存部分分区的数据,cache底层调用persist,persist可以指定更加丰富的存储基本,支持多种StageLevel,并可以将数据按指定的序列化格式存入,不指定默认放入内存使用的是java对象存储,但是占用空间大,优点速度快,通常指定的序列化格式为kryo格式,如果是自定义的Bean,想要用Kryo格式进行序列化,需要手动注册
cache和persist方法,严格来说,不是Transformation,也不是action, 因为没有生成新的RDD,只是标记当前rdd要cache或persist
最佳实践:1.原始的数据,经过整理过滤后或轻度聚合后再进行cache或persist效果更佳;
2.推荐使用的存储级别为MEMORY_AND_DISK_SER(先存在内存中,内存存不下再存在磁盘中,并序列化)

 val conf: SparkConf = new SparkConf().setAppName("TeacherTopN")val sc = new SparkContext(conf)val rdd1 = sc.parallelize(List(("a", 10), ("b", 15), ("c", 6), ("d", 4),("a", 5),("a", 3),("g", 7),("b", 8)),4)//设置序列化格式为Kryosc.getConf.set("spark.serializer","org.apache.spark.serializer.KryoRegistrator")//对rdd1做标记,下次触发action时,将rdd1算子计算后的结果缓存到内存中//注意上面设置的序列化格式,cache()算子不会对生效//对数据整理筛选后再缓存起来,节省内存的空间rdd1.filter(_.equals("a")).cache()//调用persist(StorageLevel.MEMORY_AND_DISK_SER)算子//括号的参数表示,先存入内存中,内存中不够再存入磁盘中,并按指定的序列化格式序列化rdd1.filter(_.equals("a")).persist(StorageLevel.MEMORY_AND_DISK_SER)//第一次触发action,正常收集数据的同时,将RDD1的计算结果也储存到Executor的内存中rdd1.count()//第二次触发action,就会直接从内存中读取rdd1计算的数据,运算速度会提升几十倍rdd1.count()

所有的StorageLevel 标准如下:

object StorageLevel {
    
//第一个参数为是否存内存
//第二个参数为是否存磁盘
//第三个参数为是否存堆外内存
//第四个参数为是否按指定序列化格式序列化,注意这里的false表示要序列化
//第五个参数表示存两份val NONE = new StorageLevel(false, false, false, false)val DISK_ONLY = new StorageLevel(true, false, false, false)val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)val MEMORY_ONLY = new StorageLevel(false, true, false, true)val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

checkpoint

checkpoint使用场景:适合复杂的计算【机器学习、迭代计算】,为了避免中间结果数据丢失重复计算,可以将宝贵的中间结果保存到hdfs中,保证中间结果安全。
在调用rdd的checkpint方法之前,一定要指定checkpoint的目录sc.setCheckPointDir,指的HDFS存储目录,为保证中间结果安全,将数据保存到HDFS中
第一次触发Action,才做checkpoint,会额外触发一个job,这个job的目的就是将结果保存到HDFS中
如果RDD做了checkpoint,这个RDD以前的依赖关系就不在使用了,没有了依赖关系话,假如hdfs中的数据丢失了,再调用Action就会报错,因为它已经和前面的RDD断了联系,只能将前面的RDD从头再运算一遍
触发多次Action,checkpoint才有意义,多用于迭代计算
checkpoint严格的说,不是Transformation,也不是action,只是标记当前RDD要做checkpoint
如果checkpoint前,对rdd进行了cache,可以避免数据重复计算,如果有cache的数据优先使用cache,cache的数据没有再使用checkpoint

	//设置触发checkpoint后计算的中间结果要去往的hdfs路径//也可以不使用hdfs存储,但hdfs中更安全sc.setCheckpointDir("hdfs文件夹路径名")//给rdd1打上checkpoint标记rdd1.checkpoint()//触发action,会生成两个Job,一个负责生成checkpoint文件,一个往Driver中写入rdd1.collect()//再次触发action,就会去hdfs中读取上次存储中间结果的checkpoint文件rdd1.collect()
  相关解决方案