我正在尝试Azure Service Bus队列.我有以下代码:
队列发送:
string strConnectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"]; var namespaceManager = NamespaceManager.CreateFromConnectionString(strConnectionString); if (!namespaceManager.QueueExists("Test")) { QueueDescription qD = new QueueDescription("Test"); qD.DefaultMessageTimeToLive = new TimeSpan(05, 00, 00); qD.LockDuration = new TimeSpan(00, 02, 30); qD.MaxSizeInMegabytes = 5120; namespaceManager.CreateQueue(qD); } if (namespaceManager.QueueExists("Test")) { QueueClient client = QueueClient.CreateFromConnectionString(strConnectionString, "Test", ReceiveMode.PeekLock); var qMessage = Console.ReadLine(); using (MemoryStream strm = new MemoryStream(Encoding.UTF8.GetBytes(qMessage))) { BrokeredMessage bMsg = new BrokeredMessage(strm); bMsg.MessageId = Guid.NewGuid().ToString(); bMsg.TimeToLive = new TimeSpan(05, 00, 00); client.Send(bMsg); Console.WriteLine("Message sent"); } } Console.ReadLine();
接收代码:
string strConnectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"]; var namespaceManager = NamespaceManager.CreateFromConnectionString(strConnectionString); if (namespaceManager.QueueExists("Test")) { QueueClient client = QueueClient.CreateFromConnectionString(strConnectionString, "Test",ReceiveMode.PeekLock); if (client != null) { OnMessageOptions options = new OnMessageOptions(); options.AutoComplete = false; options.AutoRenewTimeout = TimeSpan.FromSeconds(31); client.OnMessage((message) => { Console.WriteLine(message.State.ToString()); Console.WriteLine("Message Id: " + message.MessageId); Stream stream = message.GetBody(); StreamReader reader = new StreamReader(stream); Console.WriteLine("Message: " + reader.ReadToEnd()); Console.WriteLine("***************"); message.Abandon(); }); Console.ReadLine(); } }
我看到每当我打电话给Abandon时,消息就会变成DeadLettered.我的假设是它应该变为活动状态并且可以被另一个客户端接收.
您对BrokeredMessage.Abandon Api的理解是正确的.它旨在放弃在消息上获取的peek-lock(但不放弃消息本身),因此,它可供其他接收者选择消息.
首先是基础知识
"为什么":如果客户需要竞争 - 消费者(作业队列)语义 - 他们需要多个工作人员同时处理来自具有完全一次保证的队列的不同消息- 那么他们使用ReceiveMode.PeekLock.在此模型中,每个工作人员(队列接收者)都需要一种方法将其当前消息(作业)的进度传达给其他工作人员.因此,brokeredMessage提供了4种表达状态的功能.
'什么':
如果当前Worker成功处理了一条消息 - 调用BrokeredMessage.Complete()
如果当前工作程序无法处理BrokeredMessage,并希望在另一个Worker上重试处理 - 则放弃该消息.但是,这里的问题是:让我们说,有两个工人,他们每个人都认为另一个人可以处理这个消息并调用Abandon - 很快他们就会进入一个无限循环的重试来处理这个消息!因此,为了避免这种情况,我们在QueueDescription上提供了一个名为MaxDeliveryCount的配置.此设置保护消息从队列传递到接收方的次数限制.在上面的示例中,每次收到(并放弃)邮件时,ServiceBus服务上的"deliveryCount"都会递增.当它达到10时 - 消息已达到最大值.交付,因此,将是无法实现的.
如果当前接收者(工作者)确切知道,则该消息无法处理,BrokeredMessage.DeadLetter().这里的目标是让消费者应用程序定期审核死信消息.
如果当前接收者(工作者)无法处理此消息,但是,知道此消息可以在稍后的时间点处理BrokeredMessage.Defer().
HTH!SREE