我到处都在寻找在Node.js中headers exchange
使用RabbitMQ的示例。如果有人能指出我正确的方向,那就太好了。这是我到目前为止的内容:
发布者方法(创建发布者)
RabbitMQ.prototype.publisher = function(exchange, type) { console.log('New publisher, exchange: '+exchange+', type: '+type); amqp.then(function(conn) { conn.createConfirmChannel().then(function(ch) { publishers[exchange] = {}; publishers[exchange].assert = ch.assertExchange(exchange, type, {durable: true}); publishers[exchange].ch = ch; }); },function(err){ console.error("[AMQP]", err.message); return setTimeout(function(){ self.connect(URI); }, 1000); }).then(null, console.log); };
发布方法
RabbitMQ.prototype.publish = function(exchange, routingKey, content, headers) { try { publishers[exchange].assert.then(function(){ publishers[exchange].ch.publish(exchange, routingKey, new Buffer(content), { persistent: true, headers: headers }, function(err, ok) { if (err) { console.error("[AMQP] publish", err); offlinePubQueue.push([exchange, routingKey, content]); publishers[exchange].ch.connection.close(); } }); }); } catch (e) { console.error("[AMQP] publish", e.message); offlinePubQueue.push([exchange, routingKey, content]); } };
消费者方法(创建消费者)
RabbitMQ.prototype.consumer = function(exchange, type, routingKey, cb) { amqp.then(function(conn) { conn.createChannel().then(function(ch) { var ok = ch.assertExchange(exchange, type, {durable: true}); ok.then(function() { ch.assertQueue('', {exclusive: true}); }); ok = ok.then(function(qok) { var queue = qok.queue; ch.bindQueue(queue,exchange,routingKey) }); ok = ok.then(function(queue) { ch.consume(queue, function(msg){ cb(msg,ch); }, {noAck: false}); }); ok.then(function() { console.log(' [*] Waiting for logs. To exit press CTRL+C.'); }); }); }).then(null, console.warn); };
上面的例子适用于上面的例子topics
,但是我不知道如何过渡到headers
。我很确定我需要更改绑定方法,但是还没有找到任何有关如何精确完成此操作的示例。
任何帮助将不胜感激!
我偶然发现了这个问题,为amqplib寻找相同的答案。不幸的是,像您一样,我发现缺少所有可用的文档 。在查看了源代码,仔细阅读了协议并尝试了几种组合之后,这终于为我完成了。
... let opts = { headers: { 'asd': 'request', 'efg': 'test' }}; chan.publish(XCHANGE, '', Buffer.from(output), opts); ...
... let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' }; chan.bindQueue(q.queue, XCHANGE, '', opts); ...
完整的工作代码如下。下面的身份验证信息是伪造的,因此您必须使用自己的身份验证信息。我还使用了ES6,nodejs 6.5版和amqplib。给您的标题加上x-
前缀和/或使用保留字作为标题名称可能会出现问题,但是我不太确定(我必须查看RabbitMQ源代码)。
emit.js:
... let opts = { headers: { 'asd': 'request', 'efg': 'test' }}; chan.publish(XCHANGE, '', Buffer.from(output), opts); ...
receive.js:
... let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' }; chan.bindQueue(q.queue, XCHANGE, '', opts); ...