broadcast
1.官网解释
引入广播状态是为了支持这样的用例:来自一个流的部分数据需要广播到所有下游任务,在各任务本地存储,并用于处理另一个流上所有到来的元素。一个很自然适合使用广播状态的例子是:有一个低吞吐量的流,其中包含一组规则,我们希望用这些规则对来自另一个流的所有元素进行评估。针对上述类型的用例,广播状态与其他操作符状态的区别在于:
(1)它是一个map格式
(2)它只对同时以一个广播流和一个非广播流作为输入的特定操作符可用
(3)这样的操作符可以拥有多个名称不同的广播状态。
2.scala代码示例
import com.streamingkmeans.utils.EuclideanDistanceMeasure
import org.apache.flink.api.common.state.{BroadcastState, ListState, ListStateDescriptor, MapStateDescriptor, ReadOnlyBroadcastState}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/*** @Author: ch* @Date: 25/05/2020 2:55 PM* @Version 1.0* @Describe:*/
object MyTest {

  /**
   * Broadcast-stream demo: connects a socket text stream with a broadcast
   * stream of initial centers and prints what [[UpdateCenter]] emits.
   *
   * @param args command-line arguments; `--port <n>` selects the socket port
   *             to read from. Port 9000 is used when the option is missing
   *             or cannot be parsed.
   */
  def main(args: Array[String]): Unit = {
    // The port to connect to. The parsed value has to be bound to `port`:
    // previously the result of getInt was discarded, so a supplied --port was
    // silently ignored and the job tried to connect to port 0.
    val fallbackPort = 9000
    val port: Int =
      scala.util.Try(ParameterTool.fromArgs(args).getInt("port")).getOrElse(fallbackPort)

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val socketText: DataStream[String] = env.socketTextStream("127.0.0.1", port, '\n')

    // Initial center vectors; element i becomes the center with id i.
    val initialVectors = Array[DenseVector](
      DenseVector(Array[Double](1, 1, 1)),
      DenseVector(Array[Double](-1, -1, -1))
    )
    val k = 2
    val centerDetail: Array[Center] =
      Array.tabulate(k)(i => Center(i, initialVectors(i), 1))

    val centers: DataStream[Center] = env.fromCollection(centerDetail)

    // Descriptor of the broadcast state; broadcast state only supports the
    // MapState structure.
    val centersDescriptor =
      new MapStateDescriptor[Integer, Center]("centers", classOf[Integer], classOf[Center])

    // Create the broadcast stream from the broadcast state descriptor.
    val centersBroadcast: BroadcastStream[Center] = centers.broadcast(centersDescriptor)

    val result: DataStream[String] =
      socketText.connect(centersBroadcast).process(new UpdateCenter(k))

    result.print()
    env.execute()
  }
}
/**
 * Broadcast process function. Type parameters of the parent are IN1, IN2, OUT:
 * the non-broadcast element type, the broadcast element type and the output
 * element type.
 *
 * Broadcast elements (centers) are stored in broadcast state keyed by their
 * id. For every non-broadcast element the currently known centers are emitted
 * as one string.
 *
 * @param k number of center ids (0 until k) looked up for each non-broadcast element
 */
class UpdateCenter(k: Int) extends BroadcastProcessFunction[String, Center, String] {

  // Descriptor of the broadcast state. It uses the name "centers" and the same
  // key/value classes as the descriptor the broadcast stream is created with.
  private lazy val centersDescriptor =
    new MapStateDescriptor[Integer, Center]("centers", classOf[Integer], classOf[Center])

  /**
   * Handles one element of the broadcast stream.
   *
   * @param value the broadcast element that arrived
   * @param ctx   context giving access to the writable broadcast state
   * @param out   output collector (nothing is emitted here)
   */
  override def processBroadcastElement(
      value: Center,
      ctx: BroadcastProcessFunction[String, Center, String]#Context,
      out: Collector[String]): Unit = {
    val centers: BroadcastState[Integer, Center] = ctx.getBroadcastState(centersDescriptor)
    // Store the broadcast element in broadcast state. put replaces an existing
    // mapping for the same id, so no contains/remove is needed first.
    centers.put(value.id, value)
  }

  /**
   * Handles one element of the non-broadcast stream.
   *
   * @param value the non-broadcast element that arrived
   * @param ctx   context giving access to the read-only broadcast state
   * @param out   output collector; receives one string per input element
   */
  override def processElement(
      value: String,
      ctx: BroadcastProcessFunction[String, Center, String]#ReadOnlyContext,
      out: Collector[String]): Unit = {
    // Read the broadcast state.
    val centers: ReadOnlyBroadcastState[Integer, Center] =
      ctx.getBroadcastState(centersDescriptor)

    // A non-broadcast element may be processed before every center has been
    // broadcast; get returns null for an id that is not stored yet, so wrap
    // each lookup in Option instead of dereferencing it. Centers are stored
    // under their own id, so iterating ids in order keeps them sorted by id.
    val knownCenters: Seq[Center] = (0 until k).flatMap(i => Option(centers.get(i)))

    // Emit the centers themselves. Array.toString would only produce the JVM
    // identity string of the array, not its contents.
    out.collect(knownCenters.mkString("[", ", ", "]"))
  }
}