当前位置: 代码迷 >> 综合 >> Scala zio-actors与akka-actor集成
  详细解决方案

Scala zio-actors与akka-actor集成

热度:60   发布时间:2023-09-23 10:05:39.0

zio-actors与akka-actor集成

zio-actors 与 akka-actor 是两种不同实现,分两种情况:

  • zio actor 发消息给 akka actor
  • akka actor 发消息给 zio actor

依赖

不包括 akka actor 和 zio-actors 依赖,只是集成所需的

"dev.zio" %% "zio-actors-akka-interop" % <VERSION>"

所需的导入如下:

import zio.actors.Actor.Stateful
import zio.actors.{
     ActorSystem, ActorRef, Context, Supervisor }
import zio.actors.akka.{
     AkkaTypedActor, AkkaTypedActorRefLocal }
import zio.{
     IO, Runtime }import akka.actor.typed
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.Scheduler
import akka.util.Timeoutimport scala.concurrent.duration._

本章例子的样例类:

sealed trait TypedMessage[+_]
case class PingToZio(zioReplyToActor: ActorRef[ZioMessage], msg: String) extends TypedMessage[Unit]
case class PingFromZio(zioSenderActor: ActorRef[ZioMessage]) extends TypedMessage[Unit]sealed trait ZioMessage[+_]
case class PongFromAkka(msg: String) extends ZioMessage[Unit]
case class Ping(akkaActor: AkkaTypedActorRefLocal[TypedMessage]) extends ZioMessage[Unit]

基本的 actors 使用需要定义一个Stateful来描述 actor 的行为。然后通过监督方式、初始状态和提到的Stateful来完成 actor 的创建。

在 zio actor 与 akka actor 通信

zio actor Stateful 实现如下:

val handler = new Stateful[Any, String, ZioMessage] {
    override def receive[A](state: String, msg: ZioMessage[A], context: Context): IO[Throwable, (String, A)] =msg match {
                 case PongFromAkka(msg) => IO.succeed((msg, ())) // zio actor接收akka actor的消息case Ping(akkaActor) => // akkaActor的类型是AkkaTypedActorRefLocal,而不是 akka actor 的ActorReffor {
    self <- context.self[ZioMessage]_    <- akkaActor ! PingFromZio(self) // 把self带上用于收回复} yield (state, ())case _=> IO.fail(new Exception("fail"))}
}

在 akka actor 中 发送消息到 zio actor

akka actor,需要一个行为(behavior)来定义要处理的消息,在这种情况下向 zio actor 发送和接收消息:

object TestBehavior {
    lazy val zioRuntime = Runtime.defaultdef apply(): Behavior[TypedMessage[_]] =Behaviors.receiveMessage {
     message =>message match {
                      case PingToZio(zioReplyToActor, msgToZio) => // 在akka 中发消息,需要unsafeRun执行ZIO effectzioRuntime.unsafeRun(zioReplyToActor ! PongFromAkka(msgToZio)) case PingFromZio(zioSenderActor)          => zioRuntime.unsafeRun(zioSenderActor ! PongFromAkka("Pong from Akka"))}Behaviors.same}} 

主程序

我们已经准备好开始从 zio 向 akka 发送消息,或者通过fire-and-forget交互模式反过来,但首先我们需要用创建的 akka ActorRef(或ActorSystem)创建一个 ZIO 值,可以使用AkkaTypedActor.make

for {
    akkaSystem <- IO(typed.ActorSystem(TestBehavior(), "akkaSystem")) // akka actor 的 ActorSystemsystem     <- ActorSystem("zioSystem") // zio actor 的 ActorSystemakkaActor  <- AkkaTypedActor.make(akkaSystem) // 使用interop提供的AkkaTypedActor,对akka actor做一次包装zioActor   <- system.make("zioActor", Supervisor.none, "", handler) // 使用zio的ActorSystem创建zio actor_          <- akkaActor ! PingToZio(zioActor, "Ping from Akka")  // 发消息给akka actor,并带上zioActor,用于接收回复_          <- zioActor ! Ping(akkaActor) // 发消息给zio actor,并带上akkaActor,用于接收回复
} yield ()

zim 中应用

zim 不涉及到2种 actor 通信,websocket 使用的是 akka actor,而在定时任务处使用了 zio actor,实现一个基于 zio actor 的定时器如下:

object ScheduleStateful {
    val stateful: Stateful[Any, Unit, Command] = new Stateful[Any, Unit, Command] {
    override def receive[A](state: Unit, msg: Command[A], context: Context): UIO[(Unit, A)] = {
    val taskIO = msg match {
    case OnlineUserMessage(descr) =>WsService.getConnections.flatMap {
     i =>LogUtil.debug(s"${
      descr.getOrElse("receive")} Total online user => $i")}case _ => UIO.unit}// 这里返回的类型按照zio-actors官网的写法返回(Unit, A) idea会提示语法错误,目前还不知道是谁的问题,只能强制转换了taskIO.foldM(e => LogUtil.error(s"ScheduleStateful $e").as(() -> "".asInstanceOf[A]),_ => ZIO.succeed(() -> "".asInstanceOf[A]))}}
}

根据Stateful创建 actor

  lazy val scheduleActor: ZIO[Any, Throwable, ActorRef[protocol.Command]] =actorSystem.flatMap(_.make(Constants.SCHEDULE_JOB_ACTOR, zio.actors.Supervisor.none, (), ScheduleStateful.stateful)).provideLayer(Clock.live ++ InfrastructureConfiguration.live)

启动 actor,只需要像使用普通方法一样调用该方法即可:

  def scheduleTask: Task[Unit] = {
    val task = ZioActorSystemConfiguration.scheduleActor.flatMap(f => f ! OnlineUserMessage(Some("scheduleTask"))) repeat Schedule.secondOfMinute(0)// secondOfMinute类似于Cron的时间表,每分钟的指定秒数重复出现。此处为0秒task.foldM(e => LogUtil.error(s"error => $e").unit,_ => UIO.unit).provideLayer(Clock.live)}

zim 是一个web端即时通讯系统,使用scala2语言,基于zio、tapir、akka,scallikejdbc等库实现。

  相关解决方案