当前位置:  开发笔记 > 编程语言 > 正文

在Node.js中使用RabbitMQ进行标头交换示例

如何解决《在Node.js中使用RabbitMQ进行标头交换示例》经验,为你挑选了1个好方法。

我到处都在寻找在Node.js中headers exchange使用RabbitMQ的示例。如果有人能指出我正确的方向,那就太好了。这是我到目前为止的内容:

发布者方法(创建发布者)

RabbitMQ.prototype.publisher = function(exchange, type) {
 console.log('New publisher, exchange: '+exchange+', type: '+type);
 amqp.then(function(conn) {
    conn.createConfirmChannel().then(function(ch) {
        publishers[exchange] = {};
        publishers[exchange].assert = ch.assertExchange(exchange, type, {durable: true});
        publishers[exchange].ch = ch;
    });
 },function(err){
    console.error("[AMQP]", err.message);
    return setTimeout(function(){
        self.connect(URI);
    }, 1000);
 }).then(null, console.log);
};

发布方法

RabbitMQ.prototype.publish = function(exchange, routingKey, content, headers) {
 try {    
    publishers[exchange].assert.then(function(){
        publishers[exchange].ch.publish(exchange, routingKey, new Buffer(content), { persistent: true, headers: headers }, function(err, ok) {
            if (err) {
                console.error("[AMQP] publish", err);
                offlinePubQueue.push([exchange, routingKey, content]);
                publishers[exchange].ch.connection.close();
            }
        });
    });
 } catch (e) {                                                                                                                               
    console.error("[AMQP] publish", e.message);
    offlinePubQueue.push([exchange, routingKey, content]);
 }
};

消费者方法(创建消费者)

RabbitMQ.prototype.consumer = function(exchange, type, routingKey, cb) {
 amqp.then(function(conn) {
  conn.createChannel().then(function(ch) {

    var ok = ch.assertExchange(exchange, type, {durable: true});

    ok.then(function() {
      ch.assertQueue('', {exclusive: true});
    });

    ok = ok.then(function(qok) {
      var queue = qok.queue;
      ch.bindQueue(queue,exchange,routingKey)
    });

    ok = ok.then(function(queue) {
      ch.consume(queue, function(msg){
            cb(msg,ch);
      }, {noAck: false});
    });

    ok.then(function() {
      console.log(' [*] Waiting for logs. To exit press CTRL+C.');
    });

  });
 }).then(null, console.warn);
};

上面的例子适用于上面的例子topics,但是我不知道如何过渡到headers。我很确定我需要更改绑定方法,但是还没有找到任何有关如何精确完成此操作的示例。

任何帮助将不胜感激!



1> Xunnamius..:

我偶然发现了这个问题,为amqplib寻找相同的答案。不幸的是,像您一样,我发现缺少所有可用的文档 。在查看了源代码,仔细阅读了协议并尝试了几种组合之后,这终于为我完成了。

...
let opts = { headers: { 'asd': 'request', 'efg': 'test' }};
chan.publish(XCHANGE, '', Buffer.from(output), opts);
...

...
let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };
chan.bindQueue(q.queue, XCHANGE, '', opts);
...

完整的工作代码如下。下面的身份验证信息是伪造的,因此您必须使用自己的身份验证信息。我还使用了ES6,nodejs 6.5版和amqplib。给您的标题加上x-前缀和/或使用保留字作为标题名称可能会出现问题,但是我不太确定(我必须查看RabbitMQ源代码)。

emit.js:

...
let opts = { headers: { 'asd': 'request', 'efg': 'test' }};
chan.publish(XCHANGE, '', Buffer.from(output), opts);
...

receive.js:

...
let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };
chan.bindQueue(q.queue, XCHANGE, '', opts);
...

推荐阅读
爱唱歌的郭少文_
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有