问题描述
我正在尝试在Akka actor系统中实现基于 ,根据 , 是必经之路。 阅读其官方文档后,我仍然对如何使用此内置哈希路由器感到困惑。 我认为这是因为路由器本身是基于哈希/密钥的,并且Akka文档作者选择使用的示例是一个场景,该场景涉及基于键值的缓存…因此我无法确定缓存使用了哪些密钥,哪些缓存了由路由器使用!
让我们举一个简单的例子。 说我们有以下消息:
interface Notification {
// Doesn’t matter what’s here.
}
// Will eventually be emailed to someone.
class EmailNotification implements Notification {
// Doesn’t matter what’s here.
}
// Will eventually be sent to some XMPP client and on to a chatroom somewhere.
class ChatOpsNotifications implements Notification {
// Doesn’t matter what’s here.
}
等。理论上,我们可能有20个Notification
impls。
我希望能够在运行时将Notification
发送给演员/路由器,并使该路由器将其路由到正确的NotificationPubisher
:
interface NotificationPublisher<NOTIFICATION implements Notification> {
void send(NOTIFICATION notification)
}
class EmailNotificationPublisher extends UntypedActor implements NotificationPubisher<EmailNotification> {
@Override
void onReceive(Object message) {
if(message instanceof EmailNotification) {
send(message as EmailNotification)
}
}
@Override
void send(EmailNotification notification) {
// Use Java Mail, etc.
}
}
class ChatOpsNotificationPublisher extends UntypedActor implements NotificationPubisher<ChatOpsNotification> {
@Override
void onReceive(Object message) {
if(message instanceof ChatOpsNotification) {
send(message as ChatOpsNotification)
}
}
@Override
void send(ChatOpsNotification notification) {
// Use XMPP/Jabber client, etc.
}
}
现在,我可以自己手动进行路由:
class ReinventingTheWheelRouter extends UntypedActor {
// Inject these via constructor
ActorRef emailNotificationPublisher
ActorRef chatOpsNotificationPublisher
// ...20 more publishers, etc.
@Override
void onReceive(Object message) {
ActorRef publisher
if(message instanceof EmailNotification) {
publisher = emailNotificationPublisher
} else if(message instanceof ChatOpsNotification) {
publisher = chatOpsNotificationPublisher
} else if(...) { ... } // 20 more publishers, etc.
publisher.tell(message, self)
}
}
或者,我可以使用来定义基于Camel的路由器,然后将Notifications
发送到Camel路由器,但是Akka Aready似乎具有此内置解决方案,那么为什么不使用它呢?
我只是无法弄清楚如何将这些Akka文档中的Cache
示例转换为此处的Notification
示例。
ConsistentHashingRouter
“密钥”的目的是什么?
该代码将如何工作?
当然,如果有任何答案可以帮助我解决此问题,我将不胜感激,但如果有可能,我将非常希望使用基于Java的代码片段。 对我来说,Scala看起来像象形文字。
我同意比ConsistentHashingRouter
更合适。
阅读有关自定义路由器的文档后,看来我会:
-
创建一个
GroupBase
impl并直接向其发送消息(notificationGroup.tell(notification, self)
); 然后 -
GroupBase
隐含,例如,NotificationGroup
将提供一个Router
实例,该实例被我的自定义RoutingLogic
注入 -
NotificationGroup
收到消息时,将执行我的自定义RoutingLogic#select
方法,该方法确定将消息发送至的Routee
(我Routee
是某种actor?)。
如果这是正确的( 如果我错了 , 请纠正我 ),那么路由选择魔术就在这里发生:
class MessageBasedRoutingLogic implements RoutingLogic {
@Override
Routee select(Object message, IndexedSeq<Routee> candidates) {
// How can I query the Routee interface and deterine whether the message at-hand is in fact
// appropriate to be routed to the candidate?
//
// For instance I'd like to say "If message is an instance of
// an EmailNotification, send it to EmailNotificationPublisher."
//
// How do I do this here?!?
if(message instanceof EmailNotification) {
// Need to find the candidate/Routee that is
// the EmailNotificationPublisher, but how?!?
}
}
}
但是正如您所看到的,我在实现上还存在一些障碍。
Routee
界面并没有真正给我任何我可以用来确定特定Routee
(候选人)是否适合手头消息的信息。
所以我问:(1)如何将消息映射到Routees
(有效地执行路由选择/逻辑)?
(2)首先如何添加发布者作为路线?
并且(3)我的NotificationPublisher
是否仍然需要扩展UntypedActor
或现在应该实现Routee
?
1楼
这是Scala中的一个简单的小型A / B路由器。 我希望这对您有所帮助,即使您想要基于Java的答案。 首先是路由逻辑:
class ABRoutingLogic(a:ActorRef, b:ActorRef) extends RoutingLogic{
val aRoutee = ActorRefRoutee(a)
val bRoutee = ActorRefRoutee(b)
def select(msg:Any, routees:immutable.IndexedSeq[Routee]):Routee = {
msg match{
case "A" => aRoutee
case _ => bRoutee
}
}
}
这里的关键是我要在构造函数中传递我的a
和b
actor引用,然后这些是我要在select
方法中路由的引用。
然后,为此逻辑创建一个Group
:
case class ABRoutingGroup(a:ActorRef, b:ActorRef) extends Group {
val paths = List(a.path.toString, b.path.toString)
override def createRouter(system: ActorSystem): Router =
new Router(new ABRoutingLogic(a, b))
val routerDispatcher: String = Dispatchers.DefaultDispatcherId
}
同样,在这里,我通过构造函数使要路由的actor可用。
现在是一个简单的actor类来充当a
和b
:
class PrintingActor(letter:String) extends Actor{
def receive = {
case msg => println(s"I am $letter and I received letter $msg")
}
}
我将为此创建两个实例,每个实例具有不同的字母分配,以便我们可以验证正确的对象是否按照路由逻辑获得了正确的消息。 最后,一些测试代码:
object RoutingTest extends App{
val system = ActorSystem()
val a = system.actorOf(Props(classOf[PrintingActor], "A"))
val b = system.actorOf(Props(classOf[PrintingActor], "B"))
val router = system.actorOf(Props.empty.withRouter(ABRoutingGroup(a,b)))
router ! "A"
router ! "B"
}
如果运行此命令,则会看到:
I am A and I received letter A
I am B and I received letter B
这是一个非常简单的示例,但它显示了一种执行您要执行的操作的方法。 我希望您可以将此代码桥接到Java中并用它来解决您的问题。