我有一个场景,其中我需要一个nservicebus消息处理程序,以防止同一个saga的多个消息同时被执行.
为了论证的处理程序做这样的事情(在这个例子中过于简化的方式(
信息:
public class MyMessage : IMessage { public int OrderId {get;set;} public int NewQuantityLevel {get;set;} }
佐贺:
public void Handle(MyMessage message) { // call remote service to get current order quantity // do some logic and update remote service with difference between original and new quantity Bus.Send(new MyOtherMessage()) }
现在我是我的流程我可以随时收到2条或更多这些消息,我不想让它们检索可能已经在其他地方更新或修改过程中的订单数量.
我考虑了一些解决方案:
获取订单的互斥锁(目前我们只在一台机器上运行一个工人实例,但将来有可能有多个,在这种情况下我们可能会使用redis锁或类似的东西)
在服务中使用sql锁定对行/数据进行序列化锁定(不确定这是否会起作用)
这些都不是真的看起来最优,而且确实感觉我正在反对这个框架
传奇是锁.
正如@Hadi所提到的,NServiceBus将使用乐观并发来确保一次只有一条消息可以更新一个saga实例.
不是直接在saga中进行更新,而是存储您正在进行更新的事实,并发送消息以对远程服务调用执行到另一个端点中的单独消息处理程序.将事实存储在传奇上并发送消息以执行它将完成或不完成.如果两条消息同时尝试执行此操作,则只有一条消息将成功完成.另一条消息将获得并发异常,回滚到队列并最终重试.
那时它将看到已经发生了数量更新操作.然后,您可以丢弃第二条消息或在传奇上存储一些状态,以确保第一次完成后第二次数量更新.
将传输之外的远程服务调用与全双工请求/响应消息传递一起移动,可确保将作为流程管理器的传奇与消息处理程序之间的关注点作为集成点进行良好分离.
伪代码
public class MySaga { public void Handle(MyMessage message) { if(Data.CurrentlyUpdatingQuantity) return; //or schedule for later Data.CurrentlyUpdatingQuantity = true; Bus.Send(new PerformQuantityUpdateMessage(message.OrderId)); } public void Handle(QuantityUpdateResponse message) { Data.CurrentlyUpdatingQuantity = false; Bus.Send(new MyOtherMessage()); } }
独立的消息处理程序(不是SAGA的一部分)
public void Handle(PerformQuantityUpdateMessage message) { // call remote service to get current order quantity // do some logic and update remote service with difference between original and new quantity Bus.Reply(new QuantityUpdateResponse(message.OrderId)); }