我正在使用RabbitMQ的循环功能在多个消费者之间分发消息,但一次只有一个消息接收实际消息.
我的问题是我的消息代表任务,我希望我的消费者有本地会话(状态).我事先知道哪些消息属于哪个会话,但我不知道使用我指定的算法将RabbitMQ调度给消费者的最佳方法是什么(或者有办法吗?).
我不想编写自己的编排服务,因为它将成为瓶颈,我不希望我的生产者知道哪个消费者会接收他们的消息,因为我将失去使用Rabbit的解耦.
有没有办法让RabbitMQ根据预定义的算法/规则而不是循环法向消费者发送消息?
澄清:我使用了几种用不同语言编写的微服务,每项服务都有自己的工作.我使用protobuf消息在他们之间进行通信.我给每条新消息一个UUID
.如果消费者收到消息,它可以从消息中创建响应消息(这可能不是正确的术语,因为生产者和消费者是分离的,他们彼此不了解)并且这UUID
被复制到新消息中.这形成了一个数据转换管道,这个"过程"由UUID
(processId)标识.我的问题是,它是可能的,我有多个工作的消费者,我需要一个工人粘到UUID
,如果它之前已经看到过它.我有这个需要,因为
每个进程可能都有本地状态
完成该过程后,我想清理本地状态
微服务可能会收到同一进程的多条消息,我需要区分哪条消息属于哪个进程
由于RabbitMQ使用循环法在工作人员之间分配任务,因此无法强制我的进程粘在工作者身上.我有几点需要注意:
生产者与消费者脱钩,因此直接消息传递不是一种选择
工作者数量不是恒定的(有一个负载均衡器可能会启动一个工人的新实例)
如果有一个解决方法,不涉及更改循环算法,并没有打破我的约束,它也没关系!
如果您不想使用业务流程服务,可以尝试使用这样的拓扑:
为简单起见,我假设您processId
被用作路由密钥(在现实世界中,您可能希望将其存储在标头中并使用header
交换).
传入的Exchange(类型:直接)将接受传入的消息,其alternative-exchange
属性设置为指向No Session Exchange
(扇出).
以下是RabbitMQ文档在"替代交易所"中所说的内容:
有时候希望让客户端处理交换无法路由的消息(即,因为没有绑定队列我们没有匹配的绑定).
典型的例子是
检测客户端何时意外或恶意发布无法路由的消息
"或者"路由语义,其中一些消息是专门处理的,其余的是通用处理程序
RabbitMQ的备用交换("AE")功能解决了这些用例.
(我们对or else
这里的用例特别感兴趣)
每个使用者都将创建自己的队列并将其绑定到Incoming Exchange,使用processId(s)
目前为止知道的会话作为绑定的路由键.
这样它只会获得它感兴趣的会话的消息.
此外,所有使用者都将绑定到共享的No Session Queue.
如果有一条以前未知的消息processId
进入,则对于在Incoming Exchange中注册的消息将没有特定的绑定,因此它将被重新路由到No Session Exchange => No Session Queue并被分派给其中一个消费者通常的(循环)方式.
然后,消费者将使用Incoming Exchange为其注册新的绑定(即启动新的"会话"),以便随后将获得所有后续消息processId
.
一旦"会话"结束,它将不得不删除相应的绑定(即关闭"会话").