我发现后端消息处理中出现了常见的模式:
ServiceA生成大量消息.
ServiceB一次处理一条消息.
ServiceC发出对数据库或Web服务调用的调用,这可以通过批量调用获得显着的性能和可靠性.
在某些情况下,从ServiceA预批处理消息或在ServiceB中批量处理消息是不可行的,因此首选项将是单独处理所有消息,直到在ServiceC上进行最终调用.这需要在ServiceC调用之前进行批处理.
看似理想的情况是拥有一个NServiceBus处理程序签名,该签名可选择在批处理中传递消息,例如:
public void Handle(FooMessage[] messageBatch) { }
其中,在处理程序执行之前,messageBatch中的所有消息都不会被提交.
在NServiceBus中似乎没有本机支持.我可以在队列中一次处理消息并写入内存直到批量刷新.但在这种情况下,消息是在刷新之前提交的,如果进程崩溃,我们不会保留批处理中所有消息的传递保证.
所以问题是:由于某种原因我不会想到这是一个糟糕的模式吗?我知道知道何时刷新批次存在一个固有的问题,但似乎至少有一些传输实现缓冲了已经在引擎盖下的批处理中的消息,并且只是一次一个地交付.在这个级别进行批处理或者为定期刷新设置一个简单的超时似乎会起作用.
是否有解决方法或我缺少的首选模式?
Upfront/Disclaimer:我为特别软件公司工作,这是NServiceBus的制造商.我还写了学习NServiceBus.
在我为特工工作之前,我曾经发现自己处于你的确切状况.我有一种分析类型的情况,其中12个Web服务器通过MSMQ发送相同类型的命令以指示文章被查看.需要在数据库中跟踪这些计数,以便可以基于视图的数量生成"最流行的"列表.但是每个页面视图的插入都不能很好地执行,所以我介绍了服务总线.
插入器可以通过使用表值参数一次插入多达50-100的好处,但NServiceBus在事务中一次只给出一条消息.
在NServiceBus中,对多个消息进行操作的任何内容通常都需要使用Saga.(Saga基本上是一堆相关的消息处理程序,它们在处理每条消息之间保持一些存储状态.)
但是Saga必须将其数据存储在某个地方,这通常意味着数据库.那么让我们比较一下:
现在使用NServiceBus,50条消息将意味着50个数据库插入.
通过假设的批量接收,50条消息将意味着1个数据库批量插入.
使用Sagas,50条消息意味着50条Saga数据读取+ 50条Saga数据更新,然后是单个数据库批量插入.
因此,Saga使"持久性负载"更加糟糕.
当然,您可以选择使用Saga中的内存持久性.这将为您提供批处理而无需额外的持久性开销,但如果Saga端点崩溃,您可能会丢失部分批处理.因此,如果您不愿意丢失数据,那么这不是一种选择.
所以即使在几年前,我也想象过这样的事情:
// Not a real NServiceBus thing! Only exists in my imagination! public interface IHandleMessageBatches{ void Handle(TMessage[] messages); int MaxBatchSize { get; } }
我们的想法是,如果消息传输可以提前查看并看到许多消息可用,它可以开始接收MaxBatchSize并且您可以立即获取所有消息.当然,如果队列中只有一条消息,那么你将得到一个包含1条消息的数组.
几年前我和NServiceBus代码库坐了下来,以为我会尝试实现它.好吧,我失败了.当时,尽管MSMQ是唯一的传输(在NServiceBus V3中),但API的架构使得传输代码在队列中查看并一次拉出一条消息,从而将消息处理逻辑的内存中事件提升到如果没有大规模的改变,就不可能改变它.
更新版本中的代码更加模块化,这在很大程度上是因为现在支持多个消息传输.但是,仍然存在一次处理一条消息的假设.
进入V6的当前实现是在IPushMessages
界面中.在该Initialize
方法中,Core将a推Func
入到传输的实现中IPushMessages
.
或者用英语,"嘿运输,当你有消息可用时,执行此操作将其移交给Core,我们将从那里接收它."
简而言之,这是因为NServiceBus适用于一次可靠地处理一条消息.从更详细的角度来看,有很多理由说明批量接收会很困难:
当交易正在进行时,接收批处理需要处理该交易中的所有消息.如果交易增长太大,这很容易失控.
消息类型可以在队列中混合.毕竟,消息类型只是一个标题.没有办法说"给我一批T型消息" 如果您收到批次并且包含其他消息类型怎么办?
多个处理程序可以在相同的消息类型上运行.例如,如果消息SuperMessage
继承BaseMessage
,则两种类型的处理程序可以在同一消息上运行.在考虑一批消息时,多个处理程序和多态消息处理程序的这种可能性变得非常复杂.
关于多态消息的更多信息,如果批处理是什么,Handle(BaseMessage[] batch)
但进来的消息是所有继承自的不同超类型BaseMessage
?
还有很多其他事情,我敢肯定,我甚至都没有想过.
总而言之,将NServiceBus更改为接受批次将需要针对批次优化整个管道.单个消息(当前规范)将是一个专门的批处理,其中数组大小为1.
从本质上讲,这对于它所提供的有限商业价值的变化来说风险太大了.
我发现每个消息单个插入并不像我想象的那么昂贵.有害的是,多个Web服务器上的多个线程尝试一次写入数据库并被卡在RPC操作中,直到完成为止.
当这些操作被序列化为一个队列,并且有限的,设定数量的线程处理这些消息并以数据库可以处理的速率执行数据库插入时,事情往往在大多数情况下都非常顺利地运行.
另外,请仔细考虑您在数据库中执行的操作.现有行的更新比插入更便宜.在我的情况下,我真的只关心计数,并且不需要每个页面视图的记录.因此,根据内容ID和5分钟时间窗口更新记录更便宜,并更新该记录的读取计数,而不是每次读取插入记录并迫使自己进入大量的聚合查询.
如果这绝对不起作用,您需要考虑可以在可靠性方面做出哪些权衡.您可以使用具有内存持久性的Saga,但随后您可以(并且最有可能最终)丢失整个批次.根据您的使用情况,这很可能是可以接受的.
您还可以使用消息处理程序写入Redis,这比数据库便宜,然后让Saga更像调度程序,将批量数据迁移到数据库.你可以用Kafka或其他一些技术做类似的事情.在这些情况下,您可以自行决定确保所需的可靠性,并设置可以实现这一目标的工具.