当前位置: 代码迷 >> java >> 在Akka中实现基于内容的路由器模式
  详细解决方案

在Akka中实现基于内容的路由器模式

热度:74   发布时间:2023-07-25 20:07:19.0

我正在尝试在Akka actor系统中实现基于 ,根据 , 是必经之路。 阅读其官方文档后,我仍然对如何使用此内置哈希路由器感到困惑。 我认为这是因为路由器本身是基于哈希/密钥的,并且Akka文档作者选择使用的示例是一个场景,该场景涉及基于键值的缓存…因此我无法确定缓存使用了哪些密钥,哪些缓存了由路由器使用!

让我们举一个简单的例子。 说我们有以下消息:

interface Notification {
    // Doesn’t matter what’s here.
}

// Will eventually be emailed to someone.
class EmailNotification implements Notification {
    // Doesn’t matter what’s here.
}

// Will eventually be sent to some XMPP client and on to a chatroom somewhere.
class ChatOpsNotifications implements Notification {
    // Doesn’t matter what’s here.
}

等。理论上,我们可能有20个Notification impls。 我希望能够在运行时将Notification发送给演员/路由器,并使该路由器将其路由到正确的NotificationPubisher

interface NotificationPublisher<NOTIFICATION implements Notification> {
    void send(NOTIFICATION notification)
}

class EmailNotificationPublisher extends UntypedActor implements NotificationPubisher<EmailNotification> {
    @Override
    void onReceive(Object message) {
        if(message instanceof EmailNotification) {
            send(message as EmailNotification)
        }
    }

    @Override
    void send(EmailNotification notification) {
        // Use Java Mail, etc.
    }
}

class ChatOpsNotificationPublisher extends UntypedActor implements NotificationPubisher<ChatOpsNotification> {
    @Override
    void onReceive(Object message) {
        if(message instanceof ChatOpsNotification) {
            send(message as ChatOpsNotification)
        }
    }

    @Override
    void send(ChatOpsNotification notification) {
        // Use XMPP/Jabber client, etc.
    }
}

现在,我可以自己手动进行路由:

class ReinventingTheWheelRouter extends UntypedActor {
    // Inject these via constructor
    ActorRef emailNotificationPublisher
    ActorRef chatOpsNotificationPublisher
    // ...20 more publishers, etc.

    @Override
    void onReceive(Object message) {
        ActorRef publisher
        if(message instanceof EmailNotification) {
            publisher = emailNotificationPublisher
        } else if(message instanceof ChatOpsNotification) {
            publisher = chatOpsNotificationPublisher
        } else if(...) { ... } // 20 more publishers, etc.

        publisher.tell(message, self)
    }
}

或者,我可以使用来定义基于Camel的路由器,然后将Notifications发送到Camel路由器,但是Akka Aready似乎具有此内置解决方案,那么为什么不使用它呢? 我只是无法弄清楚如何将这些Akka文档中的Cache示例转换为此处的Notification示例。 ConsistentHashingRouter “密钥”的目的是什么? 该代码将如何工作?

当然,如果有任何答案可以帮助我解决此问题,我将不胜感激,但如果有可能,我将非常希望使用基于Java的代码片段。 对我来说,Scala看起来像象形文字。


我同意比ConsistentHashingRouter更合适。 阅读有关自定义路由器的文档后,看来我会:

  1. 创建一个GroupBase impl并直接向其发送消息( notificationGroup.tell(notification, self) ); 然后
  2. GroupBase隐含,例如, NotificationGroup将提供一个Router实例,该实例被我的自定义RoutingLogic注入
  3. NotificationGroup收到消息时,将执行我的自定义RoutingLogic#select方法,该方法确定将消息发送至的Routee (我Routee是某种actor?)。

如果这是正确的( 如果我错了请纠正我 ),那么路由选择魔术就在这里发生:

class MessageBasedRoutingLogic implements RoutingLogic {
    @Override
    Routee select(Object message, IndexedSeq<Routee> candidates) {
        // How can I query the Routee interface and deterine whether the message at-hand is in fact
        // appropriate to be routed to the candidate?
        //
        // For instance I'd like to say "If message is an instance of
        // an EmailNotification, send it to EmailNotificationPublisher."
        //
        // How do I do this here?!?
        if(message instanceof EmailNotification) {
            // Need to find the candidate/Routee that is
            // the EmailNotificationPublisher, but how?!?
        }
    }
}

但是正如您所看到的,我在实现上还存在一些障碍。 Routee界面并没有真正给我任何我可以用来确定特定Routee (候选人)是否适合手头消息的信息。

所以我问:(1)如何将消息映射到Routees (有效地执行路由选择/逻辑)? (2)首先如何添加发布者作为路线? 并且(3)我的NotificationPublisher是否仍然需要扩展UntypedActor或现在应该实现Routee

这是Scala中的一个简单的小型A / B路由器。 我希望这对您有所帮助,即使您想要基于Java的答案。 首先是路由逻辑:

class ABRoutingLogic(a:ActorRef, b:ActorRef) extends RoutingLogic{
  val aRoutee = ActorRefRoutee(a)
  val bRoutee = ActorRefRoutee(b)

  def select(msg:Any, routees:immutable.IndexedSeq[Routee]):Routee = {
    msg match{
      case "A" => aRoutee
      case _ => bRoutee
    }
  }
}

这里的关键是我要在构造函数中传递我的ab actor引用,然后这些是我要在select方法中路由的引用。 然后,为此逻辑创建一个Group

case class ABRoutingGroup(a:ActorRef, b:ActorRef) extends Group { 
  val paths = List(a.path.toString, b.path.toString)

  override def createRouter(system: ActorSystem): Router =
    new Router(new ABRoutingLogic(a, b))

  val routerDispatcher: String = Dispatchers.DefaultDispatcherId
}

同样,在这里,我通过构造函数使要路由的actor可用。 现在是一个简单的actor类来充当ab

class PrintingActor(letter:String) extends Actor{
  def receive = {
    case msg => println(s"I am $letter and I received letter $msg") 
  }
}

我将为此创建两个实例,每个实例具有不同的字母分配,以便我们可以验证正确的对象是否按照路由逻辑获得了正确的消息。 最后,一些测试代码:

object RoutingTest extends App{
  val system = ActorSystem()
  val a = system.actorOf(Props(classOf[PrintingActor], "A"))
  val b = system.actorOf(Props(classOf[PrintingActor], "B"))
  val router = system.actorOf(Props.empty.withRouter(ABRoutingGroup(a,b)))

  router ! "A"
  router ! "B"

}

如果运行此命令,则会看到:

I am A and I received letter A
I am B and I received letter B

这是一个非常简单的示例,但它显示了一种执行您要执行的操作的方法。 我希望您可以将此代码桥接到Java中并用它来解决您的问题。