我想有选择地删除AMQP队列中的消息,甚至不读取它们.
方案如下:
发送方希望根据X类型的新信息到达的事实使类型X的消息到期.因为订阅者很可能还没有使用类型X的最新消息,所以发布者应该只删除以前的X类型消息并将最新的消息放入队列中.整个操作应该对订户透明 - 实际上他应该使用像STOMP这样简单的东西来获取消息.
如何使用AMQP做到这一点?或者它可能在另一个消息传递协议中更方便?
我想避免复杂的基础设施.所需的整个消息传递如上所述:一个队列,一个订阅者,一个发布者,但发布者必须能够临时删除给定条件的消息.
发布者客户端将使用Ruby,但实际上,一旦我发现如何在协议中执行它,我就会处理任何语言.
您不需要消息队列,您需要一个键值数据库.例如,您可以使用Redis或Tokyo Tyrant来获取简单的网络可访问键值数据库.或者只使用内存缓存.
每种消息类型都是密钥.当您使用相同的密钥编写新消息时,它会覆盖以前的值,因此该数据库的读者将永远无法获得过时的信息.
此时,您只需要一个消息队列来建立应该读取密钥的顺序(如果这很重要).否则,只需不断扫描数据库.如果您不断扫描数据库,最好将数据库放在读卡器附近以减少网络流量.
我可能会做这样的事情
key: typecode
value: lastUpdated, important data
然后我会发送包含typecode, lastUpdated
这种方式的消息,
读者可以将该密钥的lastupdated与他们上次从数据库中读取的密钥进行比较,并跳过阅读,因为它们已经是最新的.
如果您确实需要使用AMQP执行此操作,请使用RabbitMQ和自定义交换类型,特别是Last Value Cache Exchange.示例代码在这里https://github.com/squaremo/rabbitmq-lvc-plugin
您目前无法在RabbitMQ(或更一般地,在AMQP中)自动执行此操作.但是,这是一个简单的解决方法.
假设您要发送三种类型的消息:Xs,Ys和Zs.如果我正确理解您的问题,当X消息到达时,您希望代理忘记所有其他尚未传递的X消息.
这在RabbitMQ中相当容易:
生产者声明三个队列:X,Y和Z(它们自动绑定到默认交换,其名称作为路由键,这正是我们想要的),
在发布消息时,生产者首先清除相关队列(因此,如果它发布X消息,它首先清除X队列); 这有效地删除了过时的消息,
消费者只是从它想要的队列中消费(X代表X消息,Y代表Y消息等); 从它的角度来看,它只需要做一个basic.get来获取下一个相关的消息.
这意味着两个生产者几乎同时发送相同类型的消息时的竞争条件.结果是,队列可能同时有两个(或更多)消息,但由于消息的数量上限于生成器的数量,并且因为多余的消息在下次发布时被清除这应该不是什么大问题.
总而言之,此解决方案只有一个额外步骤来自最佳解决方案,即在发布类型X的消息之前清除队列X.
如果您需要任何帮助来设置此配置,那么寻求建议的最佳位置是rabbitmq-discuss邮件列表.