When we use Spark for data statistics and need to sort data, the operators provided by RDD often do not meet our development needs out of the box, so we have to define our own sort rules. This post covers three ways to customize sorting:
1. Use the default sortBy sort rule
Wrap the sort conditions directly in a tuple:
```scala
package org.yongggan

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark sorting: sortBy with a tuple as the sort key
 */
object SortedDemo01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("app").setMaster("local")
    val sc = new SparkContext(conf)
    val data = Array("iphone 5788.12 199", "ak-47 3588 100", "awt 2788.12 19", "m16 2788.12 100")
    val rdd = sc.makeRDD(data)
    // Parse each line into (name, price, stock)
    val spiltGoods = rdd.map(f => {
      val spilt = f.split(" ")
      (spilt(0), spilt(1).toDouble, spilt(2).toInt)
    })
    // Sort rule: price descending (negated), stock ascending
    val sorted: RDD[(String, Double, Int)] = spiltGoods.sortBy(f => (-f._2, f._3))
    sorted.foreach(println)
  }
}
```
Here we sort with sortBy. The rule: sort first by the first element of the key tuple (price, negated so the order is descending), then by the second element (stock) in ascending order.
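For reference, a quick sketch of checking the result: collecting to the driver keeps the print order deterministic (with `foreach` on the RDD the order depends on partitioning).

```scala
// Collect to the driver so the print order matches the sort order.
// Expected output for the sample data above:
//   (iphone,5788.12,199)
//   (ak-47,3588.0,100)
//   (awt,2788.12,19)    // same price as m16, so lower stock comes first
//   (m16,2788.12,100)
sorted.collect().foreach(println)
```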
2. Define a custom class (or case class) for sorting that implements Ordered, or supply an Ordering
```scala
package org.yongggan

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark sorting: a custom class that implements Ordered
 */
object SortedDemo02 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("app").setMaster("local")
    val sc = new SparkContext(conf)
    val data = Array("iphone 5788.12 199", "ak-47 3588 100", "awt 2788.12 19", "m16 2788.12 100")
    val rdd = sc.makeRDD(data)
    val spiltGoods = rdd.map(f => {
      val spilt = f.split(" ")
      // (spilt(0), spilt(1).toDouble, spilt(2).toInt)
      new Goods(spilt(0), spilt(1).toDouble, spilt(2).toInt)
    })
    // Sort rule: price descending, stock ascending; the rule lives in Goods.compare
    val sorted: RDD[Goods] = spiltGoods.sortBy(f => f)
    sorted.foreach(println)
  }
}

// Serializable is required because Goods instances are shuffled between Spark tasks
class Goods(val name: String, val price: Double, val stock: Int) extends Ordered[Goods] with Serializable {
  override def compare(that: Goods): Int = {
    val r1 = this.price.compareTo(that.price)
    // Equal prices: fall back to stock ascending; otherwise negate for price descending
    if (r1 == 0) this.stock.compareTo(that.stock) else -r1
  }

  override def toString = s"Goods(name=$name, price=$price, stock=$stock)"
}
```
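The heading also mentions Ordering. A minimal sketch of that variant (`goodsOrdering` is a name chosen here for illustration): keep `Goods` as a plain serializable class and put an implicit `Ordering[Goods]` in scope inside `main`; `sortBy` will resolve it automatically.

```scala
// Sketch of the Ordering variant: no need for Goods to extend Ordered.
// Ordering.by derives the ordering from a tuple key:
// price descending (negated), stock ascending.
implicit val goodsOrdering: Ordering[Goods] = Ordering.by(g => (-g.price, g.stock))

// With this implicit in scope, the same call works unchanged:
// val sorted: RDD[Goods] = spiltGoods.sortBy(f => f)
```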
3. Use an implicit conversion (either a function or a method works)
```scala
package org.yongggan

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark sorting: implicit conversion to Ordered, mixed in on the fly
 */
object SortedDemo03 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("app").setMaster("local")
    val sc = new SparkContext(conf)
    val data = Array("iphone 5788.12 199", "ak-47 3588 100", "awt 2788.12 19", "m16 2788.12 100")
    val rdd = sc.makeRDD(data)
    val spiltGoods = rdd.map(f => {
      val spilt = f.split(" ")
      // (spilt(0), spilt(1).toDouble, spilt(2).toInt)
      new Goods(spilt(0), spilt(1).toDouble, spilt(2).toInt)
    })
    // Implicit conversion written as a function value (a method works too).
    // It wraps Goods in an anonymous subclass that mixes in Ordered,
    // so sortBy can derive an Ordering[Goods] from it.
    implicit val goods2OrderedGoods: Goods => Ordered[Goods] = (g: Goods) => {
      new Goods(g.name, g.price, g.stock) with Ordered[Goods] {
        override def compare(that: Goods): Int = {
          val r1 = this.price.compareTo(that.price)
          // Price descending; ties broken by stock ascending
          if (r1 == 0) this.stock.compareTo(that.stock) else -r1
        }
      }
    }
    val sorted: RDD[Goods] = spiltGoods.sortBy(f => f)
    sorted.foreach(println)
  }
}

// Plain data class this time; the sort rule comes from the implicit conversion.
// Serializable because Goods instances are shuffled between Spark tasks.
class Goods(val name: String, val price: Double, val stock: Int) extends Serializable {
  override def toString = s"Goods(name=$name, price=$price, stock=$stock)"
}
```
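As the heading above notes, a method works just as well as a function. A minimal sketch of the `implicit def` form (`goods2Ordered` is an illustrative name), placed inside the object so it is in scope when `sortBy` is called:

```scala
// Sketch: the same conversion as an implicit method. No mixin needed here;
// an anonymous Ordered[Goods] that closes over the parameter g is enough.
implicit def goods2Ordered(g: Goods): Ordered[Goods] = new Ordered[Goods] {
  override def compare(that: Goods): Int = {
    val r1 = g.price.compareTo(that.price)
    // Price descending; ties broken by stock ascending
    if (r1 == 0) g.stock.compareTo(that.stock) else -r1
  }
}
```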
There are many ways to implement sorting; choose the one that best fits your particular business requirements.