当前位置:  开发笔记 > 运维 > 正文

我是否可以使用Boost.Asio同步读取套接字并在多线程I/O服务上超时?

如何解决《我是否可以使用Boost.Asio同步读取套接字并在多线程I/O服务上超时?》经验,为你挑选了1个好方法。

我有一个使用Boost.Asio进行TCP和UDP套接字通信的应用程序.我知道"Asio"中的"A"代表异步,因此库倾向于鼓励您尽可能使用异步I/O. 我有一些情况下,优选同步套接字读取.但是,与此同时,我想在所述接收调用上设置超时,因此不可能无限期地进行读取阻塞.

这似乎是Boost.Asio用户中一个非常常见的问题,以下是关于该主题的以下Stack Overflow问题:

C++ Boost ASIO:如何读取/写入超时?

asio :: read with timeout

提升asio超时

如何在boost asio中阻止套接字设置超时?

甚至可能还有更多.文档中甚至有关于如何使用超时实现同步操作的示例.他们归结为将同步操作转换为异步操作,然后与a并行启动asio::deadline_timer.然后,计时器的到期处理程序可以在超时到期时取消异步读取.这看起来像这样(从上面链接的示例中获取的片段):

std::size_t receive(const boost::asio::mutable_buffer& buffer,
      boost::posix_time::time_duration timeout, boost::system::error_code& ec)
  {
    // Set a deadline for the asynchronous operation.
    deadline_.expires_from_now(timeout);

    // Set up the variables that receive the result of the asynchronous
    // operation. The error code is set to would_block to signal that the
    // operation is incomplete. Asio guarantees that its asynchronous
    // operations will never fail with would_block, so any other value in
    // ec indicates completion.
    ec = boost::asio::error::would_block;
    std::size_t length = 0;

    // Start the asynchronous operation itself. The handle_receive function
    // used as a callback will update the ec and length variables.
    socket_.async_receive(boost::asio::buffer(buffer),
        boost::bind(&client::handle_receive, _1, _2, &ec, &length));

    // Block until the asynchronous operation has completed.
    do io_service_.run_one(); while (ec == boost::asio::error::would_block);

    return length;
  }

这实际上是一个相对干净的解决方案:启动异步操作,然后手动轮询asio::io_service以一次执行一个异步处理程序,直到async_receive()完成或计时器到期.

但是,套接字的底层I/O服务已经在一个或多个后台线程中运行的情况呢?在这种情况下,不能保证异步操作的处理程序将由上述代码段中的前台线程运行,因此run_one()在稍后执行一些可能不相关的处理程序之前不会返回.这会使套接字读起来反应迟钝.

asio::io_service有一个poll_one()函数将检查服务的队列而不会阻塞,但我没有看到阻止前台线程(模拟同步调用行为)的好方法,直到处理程序执行,除了没有后台线程的情况asio::io_service::run()已经执行了.

我看到两种可能的解决方案,我都不喜欢:

    在启动异步操作后,使用条件变量或类似构造来创建前台线程块.在async_receive()调用的处理程序中,通知条件变量以取消阻塞线程.这会导致每次读取都有一些锁定,我想避免,因为我想在UDP套接字读取上实现最大可能的吞吐量.否则,它是可行的,并且可能是我会做的,除非出现一种优越的方法.

    确保套接字有自己的套接字asio::io_service没有被任何后台线程运行.这使得在需要的情况下使用套接字的异步I/O变得更加困难.

有关其他方式以安全方式实现此目的的任何想法吗?


旁白:以前的SO问题有一些答案,主张使用SO_RCVTIMEO套接字选项来实现套接字读取超时.这在理论上听起来很棒,但它似乎至少在我的平台上不起作用(Ubuntu 12.04,Boost v1.55).我可以设置套接字超时,但它不会给Asio带来预期的效果.相关代码在/boost/asio/detail/impl/socket_ops.ipp:

size_t sync_recvfrom(socket_type s, state_type state, buf* bufs,
    size_t count, int flags, socket_addr_type* addr,
    std::size_t* addrlen, boost::system::error_code& ec)
{
  if (s == invalid_socket)
  {
    ec = boost::asio::error::bad_descriptor;
    return 0;
  }

  // Read some data.
  for (;;)
  {
    // Try to complete the operation without blocking.
    signed_size_type bytes = socket_ops::recvfrom(
        s, bufs, count, flags, addr, addrlen, ec);

    // Check if operation succeeded.
    if (bytes >= 0)
      return bytes;

    // Operation failed.
    if ((state & user_set_non_blocking)
        || (ec != boost::asio::error::would_block
          && ec != boost::asio::error::try_again))
      return 0;

    // Wait for socket to become ready.
    if (socket_ops::poll_read(s, 0, ec) < 0)
      return 0;
  }
}

如果套接字读取超时,则对recvfrom()上面的调用将返回EAGAINEWOULDBLOCK转换为boost::asio::error::try_againboost::asio::error::would_block.在这种情况下,上面的代码将调用该poll_read()函数,对于我的平台看起来像:

int poll_read(socket_type s, state_type state, boost::system::error_code& ec)
{
  if (s == invalid_socket)
  {
    ec = boost::asio::error::bad_descriptor;
    return socket_error_retval;
  }

  pollfd fds;
  fds.fd = s;
  fds.events = POLLIN;
  fds.revents = 0;
  int timeout = (state & user_set_non_blocking) ? 0 : -1;
  clear_last_error();
  int result = error_wrapper(::poll(&fds, 1, timeout), ec);
  if (result == 0)
    ec = (state & user_set_non_blocking)
      ? boost::asio::error::would_block : boost::system::error_code();
  else if (result > 0)
    ec = boost::system::error_code();
  return result;
}

我剪掉了为其他平台有条件编译的代码.正如您所看到的,如果套接字不是非阻塞套接字,它最终会poll()以无限超时进行调用,因此阻塞直到套接字有数据被读取(并在超时时阻止尝试).因此,该SO_RCVTIMEO选项无效.



1> Tanner Sansb..:

Boost.Asio的支持futures可能会提供一个优雅的解决方案.当提供异步操作时,该boost::asio::use_future值作为其完成处理程序,启动函数将返回std::future将接收操作结果的对象.此外,如果操作以失败结束,error_code则转换为a system_error并通过传递给调用者future.

在Boost.Asio C++ 11 Futures datytime客户端示例中,一个专用线程运行io_service,主线程启动异步操作,然后在完成操作时同步等待,如下所示:

std::array recv_buf;
udp::endpoint sender_endpoint;
std::future recv_length =
  socket.async_receive_from(
      boost::asio::buffer(recv_buf),
      sender_endpoint,
      boost::asio::use_future);

// Do other things here while the receive completes.

std::cout.write(
    recv_buf.data(),
    recv_length.get()); // Blocks until receive is complete.

使用futures时,实现具有超时的同步读取的整体方法与以前相同.不使用同步读取,而是使用异步读取并异步等待计时器.唯一的小改动是,不是阻止io_service或定期检查谓词,而是调用future::get()阻塞直到操作完成或失败(例如超时).

如果C++ 11不可用,则可以为Boost.Thread定制异步操作​​的返回类型future,如本答案所示.

推荐阅读
ar_wen2402851455
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有