JavaPairRDD的flatMap方法讲解
官方文档
/*** Return a new RDD by first applying a function to all elements of this* RDD, and then flattening the results.*/
说明
首先将一个函数应用于该RDD的所有元素,从而返回一个新的RDD,
然后将结果展平。
函数原型
// java
public static <U> JavaRDD<U> flatMap(FlatMapFunction<T,U> f)
// scala
def flatMap[U](f: FlatMapFunction[(K, V), U]): JavaRDD[U]
示例
public class FlatMap {
public static void main(String[] args) {
System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.1");SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Spark_DEMO");JavaSparkContext sc = new JavaSparkContext(sparkConf);JavaPairRDD<String, String> javaPairRDD1 = sc.parallelizePairs(Lists.newArrayList(new Tuple2<String, String>("cat", "11"), new Tuple2<String, String>("dog", "22"),new Tuple2<String, String>("cat", "13"), new Tuple2<String, String>("pig", "44")), 2);// 数据扁平化JavaRDD<String> javaRDD = javaPairRDD1.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
public Iterator<String> call(Tuple2<String, String> stringStringTuple2) throws Exception {
List<String> list = Lists.newArrayList();list.add(stringStringTuple2._1);list.add(stringStringTuple2._2);return list.iterator();}});// 输出数据javaRDD.foreach(new VoidFunction<String>() {
public void call(String s) throws Exception {
System.out.println(s);}});}
}
结果
19/03/21 20:49:54 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
cat
11
dog
22
19/03/21 20:49:54 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 751 bytes result sent to driver
19/03/21 20:49:54 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 4900 bytes)
19/03/21 20:49:54 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
19/03/21 20:49:54 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 158 ms on localhost (executor driver) (1/2)
cat
13
pig
44
19/03/21 20:49:54 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 665 bytes result sent to driver
扁平化的效果就是:对每个元素返回一个迭代器,再把所有迭代器里的元素依次展开成新RDD的元素
就比如
1,3
5,4
6,9
啪,一巴掌摁下去 就成了
1
3
5
4
6
9
JavaPairRDD的flatMapToDouble方法讲解
官方文档
/*** Return a new RDD by first applying a function to all elements of this* RDD, and then flattening the results.*/
说明
首先将一个函数应用于该RDD的所有元素,从而返回一个新的RDD,然后将结果展平。
函数原型
// java
public static JavaDoubleRDD flatMapToDouble(DoubleFlatMapFunction<T> f)
// scala
def flatMapToDouble(f: DoubleFlatMapFunction[(K, V)]): JavaDoubleRDD
示例
public class FlatMapToDouble {
public static void main(String[] args) {
System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.1");SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Spark_DEMO");JavaSparkContext sc = new JavaSparkContext(sparkConf);JavaPairRDD<String, String> javaPairRDD1 = sc.parallelizePairs(Lists.newArrayList(new Tuple2<String, String>("cat", "11"), new Tuple2<String, String>("dog", "22"),new Tuple2<String, String>("cat", "13"), new Tuple2<String, String>("pig", "44")), 2);JavaDoubleRDD javaDoubleRDD = javaPairRDD1.flatMapToDouble(new DoubleFlatMapFunction<Tuple2<String, String>>() {
public Iterator<Double> call(Tuple2<String, String> stringStringTuple2) throws Exception {
return Arrays.asList(Double.parseDouble(stringStringTuple2._2)).iterator();}});// 输出javaDoubleRDD.foreach(new VoidFunction<Double>() {
public void call(Double aDouble) throws Exception {
System.out.println(aDouble);}});}
}
结果
19/03/22 11:17:01 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
11.0
22.0
19/03/22 11:17:01 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 708 bytes result sent to driver
19/03/22 11:17:01 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 4900 bytes)
19/03/22 11:17:01 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
19/03/22 11:17:01 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 265 ms on localhost (executor driver) (1/2)
13.0
44.0
19/03/22 11:17:01 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 708 bytes result sent to driver
JavaPairRDD的flatMapToPair方法讲解
官方文档
/*** Return a new RDD by first applying a function to all elements of this* RDD, and then flattening the results.*/
说明
首先将一个函数应用于该RDD的所有元素,从而返回一个新的RDD,然后将结果展平。
函数原型
// java
public static <K2,V2> JavaPairRDD<K2,V2> flatMapToPair(PairFlatMapFunction<T,K2,V2> f)
// scala
def flatMapToPair[K2, V2](f: PairFlatMapFunction[(K, V), K2, V2]): JavaPairRDD[K2, V2]
示例
public class flatMapToPair {
public static void main(String[] args) {
System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.1");SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Spark_DEMO");JavaSparkContext sc = new JavaSparkContext(sparkConf);JavaRDD<String> javaRDD = sc.parallelize(Arrays.asList("xiaoming,12345,1","wangxi,546546,1","liming,789789897,2"),2);// javaRDD-》javaPairRDDSystem.out.println("flatMapToPair过程:");JavaPairRDD<String,String> javaPairRDD = javaRDD.flatMapToPair(new PairFlatMapFunction<String, String, String>() {
public Iterator<Tuple2<String, String>> call(String s) throws Exception {
String[] strings = s.split(",",-1);List<Tuple2<String,String>> list = Lists.newArrayList();for (int i = 0; i < strings.length; i++) {
list.add(new Tuple2<String, String>(strings[i],"1"));}return list.iterator();}});System.out.println("输出结果:");javaPairRDD.foreach(new VoidFunction<Tuple2<String, String>>() {
public void call(Tuple2<String, String> stringStringTuple2) throws Exception {
System.out.println(stringStringTuple2);}});JavaPairRDD<String,String> javaPairRDD1 = javaRDD.mapToPair(new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String s) throws Exception {
String[] strings = s.split(",",-1);return new Tuple2<String, String>(strings[0], s);}});// 一个计算字符串中相邻字符出现的次数// 填充数据String ss = "A;B;C;D;B;D;C;B;D;A;E;D;C;A;B";JavaRDD<String> javaRDDs = sc.parallelize(Arrays.asList(ss));// 遍历RDDJavaRDD<String[]> javaRDD1 = javaRDDs.map(new Function<String, String[]>() {
public String[] call(String s) throws Exception {
return s.split(";",-1);}});/*** A; B; C; D; B; D; C; B; D; A; E; D; C; A; B*/// flatMapToPair 过程System.out.println("flatMapToPair 过程:");JavaPairRDD<String,Integer> javaPairRDD2 = javaRDD1.flatMapToPair(new PairFlatMapFunction<String[], String, Integer>() {
public Iterator<Tuple2<String, Integer>> call(String[] strings) throws Exception {
List<Tuple2<String,Integer>> list = Lists.newArrayList();for (int i = 0; i < strings.length-1; i++) {
String ss = strings[i]+strings[i+1];list.add(new Tuple2<String, Integer>(ss, 1));}return list.iterator();}});// 输出中间结果System.out.println("输出中间结果:");javaPairRDD2.foreach(new VoidFunction<Tuple2<String, Integer>>() {
public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
System.out.println(stringIntegerTuple2);}});// 合并key 计算次数JavaPairRDD<String,Integer> javaPairRDD3 = javaPairRDD2.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer+integer2;}});// 输出结果System.out.println("输出结果:");javaPairRDD3.foreach(new VoidFunction<Tuple2<String, Integer>>() {
public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
System.out.println(stringIntegerTuple2);}});}
}
结果
19/03/22 15:40:45 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.124.209.6, 65099, None)
flatMapToPair过程:
输出结果:
19/03/22 15:40:46 INFO SparkContext: Starting job: foreach at flatMapToPair.java:36
19/03/22 15:40:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 4846 bytes)
19/03/22 15:40:47 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
(xiaoming,1)
(12345,1)
(1,1)
(wangxi,1)
(546546,1)
(1,1)
(liming,1)
(789789897,1)
(2,1)
19/03/22 15:40:47 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 665 bytes result sent to driver
19/03/22 15:40:47 INFO DAGScheduler: Job 0 finished: foreach at flatMapToPair.java:36, took 0.886961 s// 小示例开始
flatMapToPair 过程:
输出中间结果:
19/03/22 15:40:47 INFO SparkContext: Starting job: foreach at flatMapToPair.java:76
19/03/22 15:40:47 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
(AB,1)
(BC,1)
(CD,1)
(DB,1)
(BD,1)
(DC,1)
(CB,1)
(BD,1)
(DA,1)
(AE,1)
(ED,1)
(DC,1)
(CA,1)
(AB,1)
19/03/22 15:40:47 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 665 bytes result sent to driver
19/03/22 15:40:47 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 18 ms on localhost (executor driver) (1/1)
19/03/22 15:40:47 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
19/03/22 15:40:47 INFO DAGScheduler: ResultStage 1 (foreach at flatMapToPair.java:76) finished in 0.019 s
19/03/22 15:40:47 INFO DAGScheduler: Job 1 finished: foreach at flatMapToPair.java:76, took 0.065599 s
输出结果:
19/03/22 15:40:47 INFO SparkContext: Starting job: foreach at flatMapToPair.java:89
19/03/22 15:40:47 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 5 ms
(BC,1)
(CA,1)
(CB,1)
(DA,1)
(ED,1)
(AB,2)
(AE,1)
(CD,1)
(DB,1)
(DC,2)
(BD,2)
19/03/22 15:40:47 INFO Executor: Finished task 0.0 in stage 3.0 (TID 4). 1095 bytes result sent to driver
flatMapToPair也可以看做是flatMap和mapToPair两个过程的合并。
JavaPairRDD的flatMapValues方法讲解
官方文档
/*** Pass each value in the key-value pair RDD through a flatMap function without changing the* keys; this also retains the original RDD's partitioning.*/
说明
通过flatmap函数传递键值对rdd中的每个值,而不更改键;
这还保留了原始RDD的分区。
这个方法扁平化的是value:如果value里有多个值并且以逗号连接,那么每个value返回的就是由这些值组成的一个list
和mapValues不同的是,mapValues只是遍历value值,输入是什么类型,输出也是什么类型
函数原型
// java
public <U> JavaPairRDD<K,U> flatMapValues(Function<V,Iterable<U>> f)
// scala
def flatMapValues[U](f: Function[V, Iterable[U]]): JavaPairRDD[K, U]
示例
public class FlatMapValues {
public static void main(String[] args) {
System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.1");SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Spark_DEMO");JavaSparkContext sc = new JavaSparkContext(sparkConf);JavaPairRDD<String, String> javaPairRDD1 = sc.parallelizePairs(Lists.newArrayList(new Tuple2<String, String>("cat", "11,23,44"), new Tuple2<String, String>("dog", "22,11"),new Tuple2<String, String>("cat", "13,33,66,77"), new Tuple2<String, String>("pig", "44")), 2);javaPairRDD1.foreach(new VoidFunction<Tuple2<String, String>>() {
public void call(Tuple2<String, String> stringStringTuple2) throws Exception {
System.out.println(stringStringTuple2);}});// 输入的是value值 输出的是listJavaPairRDD<String,String> javaPairRDD = javaPairRDD1.flatMapValues(new Function<String, Iterable<String>>() {
public Iterable<String> call(String s) throws Exception {
String[] strings =s.split(",", -1);List<String> list = Lists.newArrayList();for (String string : strings) {
list.add(string);}return list;}});// 输出javaPairRDD.foreach(new VoidFunction<Tuple2<String, String>>() {
public void call(Tuple2<String, String> stringStringTuple2) throws Exception {
System.out.println(stringStringTuple2);}});}
}
结果
// 源数据
(cat,11,23,44)
(dog,22,11)
(cat,13,33,66,77)
(pig,44)
19/03/22 15:57:54 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
// 结果数据
(cat,11)
(cat,23)
(cat,44)
(dog,22)
(dog,11)
19/03/22 15:57:54 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 751 bytes result sent to driver
19/03/22 15:57:54 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 4909 bytes)
19/03/22 15:57:54 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
19/03/22 15:57:54 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 142 ms on localhost (executor driver) (1/2)
(cat,13)
(cat,33)
(cat,66)
(cat,77)
(pig,44)
19/03/22 15:57:54 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 665 bytes result sent to driver