当前位置:  开发笔记 > 编程语言 > 正文

我可以使用RabbitMQ使用自定义算法而不是循环法调度消息吗?

如何解决《我可以使用RabbitMQ使用自定义算法而不是循环法调度消息吗?》经验,为你挑选了1个好方法。

我正在使用RabbitMQ的循环功能在多个消费者之间分发消息,但一次只有一个消息接收实际消息.

我的问题是我的消息代表任务,我希望我的消费者有本地会话(状态).我事先知道哪些消息属于哪个会话,但我不知道使用我指定的算法将RabbitMQ调度给消费者的最佳方法是什么(或者有办法吗?).

我不想编写自己的编排服务,因为它将成为瓶颈,我不希望我的生产者知道哪个消费者会接收他们的消息,因为我将失去使用Rabbit的解耦.

有没有办法让RabbitMQ根据预定义的算法/规则而不是循环法向消费者发送消息?

澄清:我使用了几种用不同语言编写的微服务,每项服务都有自己的工作.我使用protobuf消息在他们之间进行通信.我给每条新消息一个UUID.如果消费者收到消息,它可以从消息中创建响应消息(这可能不是正确的术语,因为生产者和消费者是分离的,他们彼此不了解)并且这UUID被复制到新消息中.这形成了一个数据转换管道,这个"过程"UUID(processId)标识.我的问题是,它是可能的,我有多个工作的消费者,我需要一个工人UUID,如果它之前已经看到过它.我有这个需要,因为

    每个进程可能都有本地状态

    完成该过程后,我想清理本地状态

    微服务可能会收到同一进程的多条消息,我需要区分哪条消息属于哪个进程

由于RabbitMQ使用循环法在工作人员之间分配任务,因此无法强制我的进程粘在工作者身上.我有几点需要注意:

生产者与消费者脱钩,因此直接消息传递不是一种选择

工作者数量不是恒定的(有一个负载均衡器可能会启动一个工人的新实例)

如果有一个解决方法,不涉及更改循环算法,并没有打破我的约束,它也没关系!



1> zeppelin..:

如果您不想使用业务流程服务,可以尝试使用这样的拓扑:

在此输入图像描述

为简单起见,我假设您processId被用作路由密钥(在现实世界中,您可能希望将其存储在标头中并使用header交换).

传入的Exchange(类型:直接)将接受传入的消息,其alternative-exchange属性设置为指向No Session Exchange(扇出).

以下是RabbitMQ文档在"替代交易所"中所说的内容:

有时候希望让客户端处理交换无法路由的消息(即,因为没有绑定队列我们没有匹配的绑定).

典型的例子是

检测客户端何时意外或恶意发布无法路由的消息

"或者"路由语义,其中一些消息是专门处理的,其余的是通用处理程序

RabbitMQ的备用交换("AE")功能解决了这些用例.

(我们对or else这里的用例特别感兴趣)

每个使用者都将创建自己的队列并将其绑定到Incoming Exchange,使用processId(s)目前为止知道的会话作为绑定的路由键.

这样它只会获得它感兴趣的会话的消息.

此外,所有使用者都将绑定到共享的No Session Queue.

如果有一条以前未知的消息processId进入,则对于在Incoming Exchange中注册的消息将没有特定的绑定,因此它将被重新路由到No Session Exchange => No Session Queue并被分派给其中一个消费者通常的(循环)方式.

然后,消费者将使用Incoming Exchange为其注册新的绑定(即启动新的"会话"),以便随后将获得所有后续消息processId.

一旦"会话"结束,它将不得不删除相应的绑定(即关闭"会话").


@Works的@AdamArold _yEd_图形编辑器:https://www.yworks.com/products/yed
推荐阅读
coco2冰冰
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有