Scala for the Impatient: Word Count Program and Parallel Computation

Published: 2023-11-07 04:43:55

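Earlier posts showed how to count words with Hadoop and Storm. Here we write a simple word count program in Scala. Before that, a few commonly used collection methods.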

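1. Sorting
Sorting was covered in an earlier post; it is touched on here only because word count uses it. Note that sorted returns a new list and leaves the original unchanged.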

scala> val lst = List(2,3,1,5,7,6,4,9,8)
lst: List[Int] = List(2, 3, 1, 5, 7, 6, 4, 9, 8)

scala> val lst2 = lst.sorted
lst2: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> lst
res0: List[Int] = List(2, 3, 1, 5, 7, 6, 4, 9, 8)

scala> lst2
res1: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
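
Besides sorted, the word count later relies on sortBy. The sketch below puts the common sorting methods side by side; the object name SortingSketch and the val names are only illustrative.

```scala
object SortingSketch {
  val lst = List(2, 3, 1, 5, 7, 6, 4, 9, 8)

  // sorted uses the implicit Ordering[Int] and returns a new list
  val ascending: List[Int] = lst.sorted

  // sortBy sorts by a derived key; negating the key gives descending order
  val descendingByKey: List[Int] = lst.sortBy(x => -x)

  // sortWith takes an explicit "comes before" comparison
  val descendingByCompare: List[Int] = lst.sortWith(_ > _)
}
```

All three return new lists; lst itself is immutable and keeps its original order.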

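2. Grouping
grouped(n) splits the collection into groups of n elements. If fewer than n elements are left at the end, they are not dropped; they form a shorter final group.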

scala> val lst3 = lst2.grouped(5)
lst3: Iterator[List[Int]] = non-empty iterator

scala> lst3
res2: Iterator[List[Int]] = non-empty iterator

// Convert the Iterator to a List
scala> val lst4 = lst3.toList
lst4: List[List[Int]] = List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9))

scala> lst4
res3: List[List[Int]] = List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9))
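
Two details of grouped are easy to miss: the last group keeps the leftover elements, and the result is an Iterator, which can be traversed only once. A minimal sketch, with GroupedSketch and its val names being illustrative:

```scala
object GroupedSketch {
  val sorted = List(1, 2, 3, 4, 5, 6, 7, 8, 9)

  // 9 elements in groups of 5: the trailing group keeps the 4 leftovers
  val groups: List[List[Int]] = sorted.grouped(5).toList

  // grouped returns an Iterator; converting it to a List consumes it
  val it: Iterator[List[Int]] = sorted.grouped(5)
  val drained: List[List[Int]] = it.toList
  val exhausted: Boolean = !it.hasNext // nothing is left after toList
}
```

This is why the transcript above stores the converted list in lst4 instead of reusing lst3.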

3. Flattening
flatten merges a list of lists into a single list.

scala> val lst5 = lst4.flatten
lst5: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> lst5
res4: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
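
A map followed by flatten can usually be written as a single flatMap, which is handy for splitting lines into words. A small sketch; FlattenSketch and the sample strings are made up for illustration, and .toList is added so that the example does not depend on implicit Array conversions:

```scala
object FlattenSketch {
  val nested = List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9))

  // flatten removes one level of nesting
  val flat: List[Int] = nested.flatten

  val lines = List("hadoop hdfs", "hive")

  // Two steps: split every line, then flatten the pieces
  val viaMapFlatten: List[String] = lines.map(_.split(" ").toList).flatten

  // One step: flatMap does the map and the flatten together
  val viaFlatMap: List[String] = lines.flatMap(_.split(" ").toList)
}
```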

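Writing the word count program
Note: the approach follows the way MapReduce counts words (map each word to a pair, group, then aggregate), not the way a conventional Java program would be written.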

// Count how often each word occurs in three strings
scala> val lines = List("hadoop hdfs mr hive","hdfs hive hbase storm kafka","hive hbase storm kafka spark")
lines: List[String] = List(hadoop hdfs mr hive, hdfs hive hbase storm kafka, hive hbase storm kafka spark)

// Split each of the 3 strings on spaces
scala> lines.map(_.split(" "))
res18: List[Array[String]] = List(Array(hadoop, hdfs, mr, hive), Array(hdfs, hive, hbase, storm, kafka), Array(hive, hbase, storm, kafka, spark))

// Flatten
scala> lines.map(_.split(" ")).flatten
res19: List[String] = List(hadoop, hdfs, mr, hive, hdfs, hive, hbase, storm, kafka, hive, hbase, storm, kafka, spark)

scala> val words = lines.map(_.split(" ")).flatten
words: List[String] = List(hadoop, hdfs, mr, hive, hdfs, hive, hbase, storm, kafka, hive, hbase, storm, kafka, spark)

// Turn every element into a (word, 1) pair
scala> words.map((_,1))
res20: List[(String, Int)] = List((hadoop,1), (hdfs,1), (mr,1), (hive,1), (hdfs,1), (hive,1), (hbase,1), (storm,1), (kafka,1), (hive,1), (hbase,1), (storm,1), (kafka,1), (spark,1))

// Group by word
scala> words.map((_,1)).groupBy(_._1)
res21: scala.collection.immutable.Map[String,List[(String, Int)]] = Map(storm -> List((storm,1), (storm,1)), kafka -> List((kafka,1), (kafka,1)), hadoop -> List((hadoop,1)), spark -> List((spark,1)), hive -> List((hive,1), (hive,1), (hive,1)), mr -> List((mr,1)), hbase -> List((hbase,1), (hbase,1)), hdfs -> List((hdfs,1), (hdfs,1)))

scala> val grouped = words.map((_,1)).groupBy(_._1)
grouped: scala.collection.immutable.Map[String,List[(String, Int)]] = Map(storm -> List((storm,1), (storm,1)), kafka -> List((kafka,1), (kafka,1)), hadoop -> List((hadoop,1)), spark -> List((spark,1)), hive -> List((hive,1), (hive,1), (hive,1)), mr -> List((mr,1)), hbase -> List((hbase,1), (hbase,1)), hdfs -> List((hdfs,1), (hdfs,1)))

// Count: the size of each group is the number of occurrences of the word
scala> grouped.map(t => (t._1,t._2.size))
res22: scala.collection.immutable.Map[String,Int] = Map(storm -> 2, kafka -> 2, hadoop -> 1, spark -> 1, hive -> 3, mr -> 1, hbase -> 2, hdfs -> 2)

// Sort by count. The method is sortBy (the call below misspells it as sortedBy),
// and a Map has no sortBy either, so the Map must be converted to a List first.
scala> grouped.map(t => (t._1,t._2.size)).sortedBy(_._2)
<console>:14: error: value sortedBy is not a member of scala.collection.immutable.Map[String,Int]
       grouped.map(t => (t._1,t._2.size)).sortedBy(_._2)
                                          ^

scala> grouped.map(t => (t._1,t._2.size)).toList
res25: List[(String, Int)] = List((storm,2), (kafka,2), (hadoop,1), (spark,1), (hive,3), (mr,1), (hbase,2), (hdfs,2))

scala> grouped.map(t => (t._1,t._2.size)).toList.sortBy(_._2)
res26: List[(String, Int)] = List((hadoop,1), (spark,1), (mr,1), (storm,2), (kafka,2), (hbase,2), (hdfs,2), (hive,3))

scala> val result = grouped.map(t => (t._1,t._2.size)).toList.sortBy(_._2)
result: List[(String, Int)] = List((hadoop,1), (spark,1), (mr,1), (storm,2), (kafka,2), (hbase,2), (hdfs,2), (hive,3))
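
The REPL steps above can be collected into one function. The following is only a sketch: the names WordCountSketch and wordCount are made up here, and it differs from the transcript in a few deliberate ways. It uses flatMap instead of map plus flatten, groups the words directly instead of building (word, 1) pairs, splits on any run of whitespace, and sorts by descending count with the word as a tie-breaker so that the output order is deterministic.

```scala
object WordCountSketch {
  def wordCount(lines: List[String]): List[(String, Int)] =
    lines
      .flatMap(_.split("\\s+").toList)   // split each line and flatten
      .filter(_.nonEmpty)                // drop empty tokens from leading whitespace
      .groupBy(identity)                 // Map(word -> List(word, word, ...))
      .map { case (word, occurrences) => (word, occurrences.size) }
      .toList                            // a Map cannot be sorted, a List can
      .sortBy { case (word, count) => (-count, word) }

  val lines = List(
    "hadoop hdfs mr hive",
    "hdfs hive hbase storm kafka",
    "hive hbase storm kafka spark"
  )

  val result: List[(String, Int)] = wordCount(lines)
}
```

For the three sample lines, result starts with (hive,3), followed by the four words that occur twice and then the three that occur once, each group in alphabetical order.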

Some methods for parallel computation

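scala> val a = Array(1,2,3,4,5,6)
a: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> a.sum
res41: Int = 21

scala> a.reduce(_+_)  // on a sequential collection reduce calls reduceLeft and works from left to right
res42: Int = 21

scala> a.reduce(_-_)
res43: Int = -19

scala> a.par // convert to a parallel collection
res44: scala.collection.parallel.mutable.ParArray[Int] = ParArray(1, 2, 3, 4, 5, 6)

scala> a.par.reduce(_+_) // splits the collection into several chunks, computes them in parallel, then combines the partial results
res45: Int = 21

scala> a.fold(10)(_+_)  // starts from the initial value 10 and adds every element of a. The first _ is the initial value or the running total. On a plain Array, fold runs sequentially like foldLeft; it only runs in parallel on a parallel collection. foldLeft and foldRight are always sequential
res46: Int = 31

scala> a.par.fold(10)(_+_) // on a parallel collection the initial value is added once per chunk, so the result depends on how the data was split (here 21 + 3 * 10)
res47: Int = 51

scala> a.par.fold(0)(_+_) // with a neutral initial value such as 0 the result no longer depends on the split
res49: Int = 21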
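
To see why a.par.fold(10)(_+_) can return 51, the chunking can be imitated without any parallel collection. This is a sketch of the idea, not of how the parallel library actually schedules work: chunkedFold is a hypothetical helper that folds each chunk from the same initial value and then combines the partial results. The real chunk sizes are chosen by the runtime and may differ between runs. On Scala 2.13 and later, .par itself also requires the separate scala-parallel-collections module, which is another reason the sketch sticks to plain collections.

```scala
object FoldChunksSketch {
  val a = Array(1, 2, 3, 4, 5, 6)

  // Fold every chunk starting from `zero`, then combine the partial results.
  // Assumes xs is non-empty and chunkSize > 0.
  def chunkedFold(xs: Array[Int], chunkSize: Int, zero: Int): Int =
    xs.grouped(chunkSize).map(_.fold(zero)(_ + _)).reduce(_ + _)

  val oneChunk: Int = chunkedFold(a, 6, 10)     // 21 + 10
  val threeChunks: Int = chunkedFold(a, 2, 10)  // 21 + 3 * 10
  val neutralZero: Int = chunkedFold(a, 2, 0)   // 21, whatever the chunking

  // Subtraction is not associative, so the grouping changes the result.
  // This is why a.par.reduce(_ - _) is not reliable.
  val leftToRight: Int = a.reduceLeft(_ - _)    // ((((1-2)-3)-4)-5)-6
  val rightToLeft: Int = a.reduceRight(_ - _)   // 1-(2-(3-(4-(5-6))))
}
```

The takeaway is that on parallel collections the initial value passed to fold should be neutral for the operation (0 for addition), and the operation passed to fold or reduce should be associative.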