当前位置:  开发笔记 > 运维 > 正文

如何使zeromq PUB/SUB删除旧邮件而不是新邮件(用于实时订阅源)?

如何解决《如何使zeromqPUB/SUB删除旧邮件而不是新邮件(用于实时订阅源)?》经验,为你挑选了1个好方法。

假设我有一台PUB服务器,它zmq_send()SUB客户端的实时消息.如果客户端很忙并且zmq_recv()消息不够快,则消息将在客户端(和/或服务器)中缓冲.

如果缓冲区变得太大(高水位线),则将丢弃NEW消息.对于实时消息,这与人们想要的相反.应该删除旧消息以使其成为新消息.

有办法做到这一点吗?

理想情况下,我希望SUB客户端的接收队列为空或仅包含最新消息.当收到新消息时,它将替换旧消息.(我想这里的问题是客户端会zmq_recv()在队列为空时阻塞,浪费时间这样做.)

那么实时饲料通常如何实施ZeroMQ呢?



1> someone else..:

我会在这里回答我自己的问题.设置ZMQ_CONFLATE"仅保留最后一条消息"似乎很有希望,但它不适用于订阅过滤器.它只会在队列中保留一条消息.如果您有多个过滤器,则其他过滤器类型的旧消息和新消息都将被丢弃.

同样,zeromq指南的建议只是为了杀死慢速订阅者,但这似乎不是现实的解决方案.拥有不同阅读速度的订阅者,订阅相同的快速发布者,应该是正常的用例.其中一些订阅者可能生活在速度较慢的计算机上,其他订阅者也可能生活在快速计算机上等.ZeroMQ应该能够以某种方式处

http://zguide.zeromq.org/page:all#Slow-Subscriber-Detection-Suicidal-Snail-Pattern

我最终在客户端手动删除旧的排队消息.它似乎工作正常.我通过那种方式获得订阅的消息,这些消息的时间小于3毫秒(通过tcp localhost).即使在我有五千条10秒旧消息的情况下,这也适用于在后面那些少量实时消息前面的队列中.这对我来说已经足够了.

我不能帮助,但认为这是应该由图书馆提供的东西.它可能会做得更好.

无论如何这里是客户端,旧消息丢失,代码:

bool Empty(zmq::socket_t& socket) {
    bool ret = true;
    zmq::pollitem_t poll_item = { socket, 0, ZMQ_POLLIN, 0 };
    zmq::poll(&poll_item, 1, 0); //0 = no wait
    if (poll_item.revents & ZMQ_POLLIN) {
        ret = false;
    }
    return ret;
}

std::vector GetRealtimeSubscribedMessageVec(zmq::socket_t& socket_sub, int timeout_ms)
{
    std::vector ret;

    struct MessageTmp {
        int id_ = 0;
        std::string data_;
        boost::posix_time::ptime timestamp_;
    };

    std::map msg_map;

    int read_msg_count = 0;
    int time_in_loop = 0;
    auto start_of_loop = boost::posix_time::microsec_clock::universal_time();
    do {
        read_msg_count++;

        //msg format sent by publisher is: filter, timestamp, data
        MessageTmp msg;
        msg.id_ = boost::lexical_cast(s_recv(socket_sub));
        msg.timestamp_ = boost::posix_time::time_from_string(s_recv(socket_sub));
        msg.data_ = s_recv(socket_sub);

        msg_map[msg.id_] = msg;

        auto now = boost::posix_time::microsec_clock::universal_time();
        time_in_loop = (now - start_of_loop).total_milliseconds();
        if (time_in_loop > timeout_ms) {
            std::cerr << "Timeout reached. Publisher is probably sending messages quicker than we can drop them." << std::endl;
            break;
        }
    } while ((Empty(socket_sub) == false)); 

    if (read_msg_count > 1) {
        std::cout << "num of old queued up messages dropped: " << (read_msg_count - 1) << std::endl;
    }

    for (const auto &pair: msg_map) {
        const auto& msg_tmp = pair.second;

        auto now = boost::posix_time::microsec_clock::universal_time();
        auto message_age_ms = (now - msg_tmp.timestamp_).total_milliseconds();

        if (message_age_ms > timeout_ms) {
            std::cerr << "[SUB] Newest message too old. f:" << msg_tmp.id_ << ", age: " << message_age_ms << "ms, s:" << msg_tmp.data_.size() << std::endl;
        }
        else {
            std::cout << "[SUB] f:" << msg_tmp.id_ << ", age: " << message_age_ms << "ms, s:" << msg_tmp.data_.size() << std::endl;
            ret.push_back(msg_tmp.data_);
        }
    }

    return ret;
}

推荐阅读
oDavid_仔o_880
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有