1 Overview of Windows in Flink
Streaming computation is a data-processing paradigm designed for unbounded data sets, i.e. data sets that keep growing and are essentially infinite. A window is a mechanism for cutting an unbounded stream into finite chunks that can be processed one at a time.
2 Windows fall into two categories:
CountWindow: triggers based on the number of elements with the same key in the window; when it fires, results are computed only for keys whose element count has reached the window size.
TimeWindow: groups all the data within a specified time range into one window and computes over all the data in that window at once.
3 TimeWindow comes in three types:
Tumbling window: slices the data by a fixed window length. Each element is counted exactly once, because every element falls into exactly one window. Typical use: BI-style statistics (aggregations per fixed time period).
Sliding window: defined by a fixed window length plus a slide interval. An element can be counted multiple times, because it is assigned to several overlapping windows. Typical use: statistics over the most recent period (e.g. computing an API's failure rate over the last 5 minutes to decide whether to raise an alert).
Session window: windows do not overlap and have no fixed start or end time. When no element arrives for a fixed period (an inactivity gap occurs), the window closes.
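The demos in section 5 below cover time and count windows but not session windows, so here is a minimal sketch, assuming a keyed word-count stream `keyed` like the one built in those demos; the 10-second gap is an illustrative choice:

import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

// a processing-time session window that closes once a key has seen
// no new element for 10 seconds (the inactivity gap)
val sessionWindow = keyed.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
val sessionCounts = sessionWindow.sum(1)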
4 Window functions
A window function defines the computation to apply to the data collected in a window.
Incremental aggregation functions
Each arriving element is aggregated as soon as it arrives, keeping only a small piece of state. Typical incremental aggregation functions are ReduceFunction and AggregateFunction (see the AggregateFunction sketch after the next item).
Full window functions
First collect all of the window's data, then iterate over all of it when the window fires. ProcessWindowFunction is a full window function.
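Section 5 demonstrates ReduceFunction and ProcessWindowFunction but only mentions AggregateFunction, so here is a minimal sketch, assuming the same (word, count) tuples as the demos below; the class name AverageAggregate and the choice of an average are illustrative assumptions:

import org.apache.flink.api.common.functions.AggregateFunction

// computes the average of the Int field of (String, Int) elements in a window;
// the accumulator holds (running sum, element count)
class AverageAggregate extends AggregateFunction[(String, Int), (Long, Long), Double] {
  override def createAccumulator(): (Long, Long) = (0L, 0L)
  override def add(value: (String, Int), acc: (Long, Long)): (Long, Long) =
    (acc._1 + value._2, acc._2 + 1)
  override def getResult(acc: (Long, Long)): Double =
    acc._1.toDouble / acc._2
  override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) =
    (a._1 + b._1, a._2 + b._2)
}
// usage on a WindowedStream: window.aggregate(new AverageAggregate)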
5 Code examples
Tumbling time window
package chapter3

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
object Window_Tumbling_Demo {
  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // read data from a socket
    val data: DataStream[String] = env.socketTextStream("hadoop001", 9999)
    // split lines into words
    val spliFile: DataStream[String] = data.flatMap(_.split(" "))
    // map each word to a count of 1
    val wordAndOne: DataStream[(String, Int)] = spliFile.map((_, 1))
    // key the stream by word
    val keyed: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
    // assign a 5-second tumbling time window
    val window: WindowedStream[(String, Int), Tuple, TimeWindow] = keyed.timeWindow(Time.seconds(5))
    // aggregate the data inside each window
    val wordAndCount: DataStream[(String, Int)] = window.sum(1)
    // print the result
    wordAndCount.print()
    // trigger execution
    env.execute()
  }
}
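Note that keyed.timeWindow(size) is shorthand: under the default processing-time characteristic of the Flink versions this API comes from, it should be equivalent to passing an explicit window assigner. A hedged one-liner against the keyed stream above:

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows

// explicit-assigner form of keyed.timeWindow(Time.seconds(5));
// the sliding analogue is SlidingProcessingTimeWindows.of(size, slide)
val explicitWindow = keyed.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))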
Sliding time window
package chapter3
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
object Window_Sliding_Demo {
  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // read data from a socket
    val data: DataStream[String] = env.socketTextStream("hadoop01", 9999)
    // split lines into words
    val spliFile: DataStream[String] = data.flatMap(_.split(" "))
    // map each word to a count of 1
    val wordAndOne: DataStream[(String, Int)] = spliFile.map((_, 1))
    // key the stream by word
    val keyed: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
    // assign a sliding time window: 15-second size, 5-second slide
    val window: WindowedStream[(String, Int), Tuple, TimeWindow] = keyed.timeWindow(Time.seconds(15), Time.seconds(5))
    // aggregate the data inside each window
    val wordAndCount: DataStream[(String, Int)] = window.sum(1)
    // print the result
    wordAndCount.print()
    // trigger execution
    env.execute()
  }
}
Tumbling count window
package chapter3
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
object CountWindow_Tumbling_Demo {
  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // read data from a socket
    val data: DataStream[String] = env.socketTextStream("hadoop001", 9999)
    // split lines into words
    val spliFile: DataStream[String] = data.flatMap(_.split(" "))
    // map each word to a count of 1
    val wordAndOne: DataStream[(String, Int)] = spliFile.map((_, 1))
    // key the stream by word
    val keyed: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
    // assign a tumbling count window that fires every 3 elements per key
    val window: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyed.countWindow(3)
    // aggregate the data inside each window
    val wordAndCount: DataStream[(String, Int)] = window.sum(1)
    // print the result
    wordAndCount.print()
    // trigger execution
    env.execute()
  }
}
Sliding count window
package chapter3
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
object CountWindow_Sliding_Demo {
  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // read data from a socket
    val data: DataStream[String] = env.socketTextStream("node01", 9999)
    // split lines into words
    val spliFile: DataStream[String] = data.flatMap(_.split(" "))
    // map each word to a count of 1
    val wordAndOne: DataStream[(String, Int)] = spliFile.map((_, 1))
    // key the stream by word
    val keyed: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
    // assign a sliding count window: size 5, firing every 2 elements per key
    val window: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyed.countWindow(5, 2)
    // aggregate the data inside each window
    val wordAndCount: DataStream[(String, Int)] = window.sum(1)
    // print the result
    wordAndCount.print()
    // trigger execution
    env.execute()
  }
}
Incremental aggregation function
package chapter3
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
object Reduce_Demo {
  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // read data from a socket
    val data: DataStream[String] = env.socketTextStream("node01", 8888)
    // split lines into words
    val spliFile: DataStream[String] = data.flatMap(_.split(" "))
    // map each word to a count of 1
    val wordAndOne: DataStream[(String, Int)] = spliFile.map((_, 1))
    // key the stream by word
    val keyed: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
    // assign a 5-second tumbling time window
    val window: WindowedStream[(String, Int), Tuple, TimeWindow] = keyed.timeWindow(Time.seconds(5))
    // aggregate incrementally with a ReduceFunction
    val wordAndCount: DataStream[(String, Int)] = window.reduce(new ReduceFunction[(String, Int)] {
      override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
        (value1._1, value1._2 + value2._2)
      }
    })
    // print the result
    wordAndCount.print()
    // trigger execution
    env.execute()
  }
}
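In the Scala API the same reduction can also be written with a function literal; a minimal equivalent sketch against the window above:

// function-style reduce, equivalent to the anonymous ReduceFunction above
val wordAndCount2: DataStream[(String, Int)] = window.reduce((a, b) => (a._1, a._2 + b._2))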
Full window function
package chapter3
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object ProcessWindow_Demo {
  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // read data from a socket
    val data: DataStream[String] = env.socketTextStream("hadoop001", 8888)
    // split lines into words
    val spliFile: DataStream[String] = data.flatMap(_.split(" "))
    // map each word to a count of 1
    val wordAndOne: DataStream[(String, Int)] = spliFile.map((_, 1))
    // key the stream by word
    val keyed = wordAndOne.keyBy(_._1)
    // assign a 5-second tumbling time window
    val window = keyed.timeWindow(Time.seconds(5))
    // apply a full window function over all elements collected in the window
    val wordAndCount: DataStream[(String, Int)] = window.process(
      new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
        override def process(key: String, context: Context,
                             elements: Iterable[(String, Int)],
                             out: Collector[(String, Int)]): Unit = {
          // sum the counts collected for this key in this window
          var sum = 0
          for (i <- elements) {
            sum += i._2
          }
          // emit the aggregated result as a tuple
          out.collect((key, sum))
        }
      })
    // print the result
    wordAndCount.print()
    // trigger execution
    env.execute()
  }
}
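A full window function also gives access to window metadata through its Context, which is a key reason to use one. A minimal sketch reusing the imports of the demo above; the class name CountWithWindowEnd and the output shape are illustrative assumptions:

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// emits (key, count, window end timestamp) once per window
class CountWithWindowEnd extends ProcessWindowFunction[(String, Int), (String, Int, Long), String, TimeWindow] {
  override def process(key: String, context: Context,
                       elements: Iterable[(String, Int)],
                       out: Collector[(String, Int, Long)]): Unit = {
    // context.window exposes the TimeWindow, including its end timestamp
    out.collect((key, elements.map(_._2).sum, context.window.getEnd))
  }
}
// usage: window.process(new CountWithWindowEnd)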