
How does epoll's EPOLLEXCLUSIVE mode interact with level-triggering?


Suppose the following series of events occurs:

We set up a listening socket.

Thread A blocks waiting for the listening socket to become readable, using EPOLLIN | EPOLLEXCLUSIVE.

Thread B also blocks waiting for the listening socket to become readable, also using EPOLLIN | EPOLLEXCLUSIVE.

An incoming connection arrives at the listening socket, making it readable, and the kernel elects to wake up thread A.

But, before that thread actually wakes up and calls accept, a second incoming connection arrives at the listening socket.

Here, the socket is already readable, so the second connection doesn't change that. This is level-triggered epoll, so by the normal rules the second connection can be treated as a no-op, and the second thread need not be woken... Of course, not waking the second thread would rather defeat the whole purpose of EPOLLEXCLUSIVE, wouldn't it? But my trust in API designers doing the right thing isn't as strong as it used to be, and I can't find anything in the documentation that rules this out.

Questions

a) Is the scenario above possible, where two connections arrive but only one thread is woken? Or is it guaranteed that every distinct incoming connection on a listening socket will wake another thread?

b) Is there a general rule for predicting how EPOLLEXCLUSIVE and level-triggered epoll interact?

c) What do EPOLLIN | EPOLLEXCLUSIVE and EPOLLOUT | EPOLLEXCLUSIVE mean for byte-stream fds, like a connected TCP socket or a pipe? For example, what happens if more data arrives while the pipe is already readable?
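For concreteness, the setup being asked about might look roughly like the minimal sketch below (my assumptions, not part of the question: each thread owns a private epoll instance and attaches the shared listening socket with EPOLLIN | EPOLLEXCLUSIVE, which is the pattern the flag was designed for; error handling is elided, and wait_for_connection is a hypothetical name):

#include <stddef.h>
#include <sys/epoll.h>

/* Sketch: each thread creates its own epoll fd and attaches the same
 * listening socket with EPOLLIN | EPOLLEXCLUSIVE, then blocks. */
static void *wait_for_connection(void *arg) {
  int listen_fd = *(int *)arg;
  int epfd = epoll_create1(0);
  struct epoll_event ev = {.events = EPOLLIN | EPOLLEXCLUSIVE,
                           .data.fd = listen_fd};
  epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);
  struct epoll_event out;
  /* With EPOLLEXCLUSIVE, the kernel wakes (at least) one of the attached
   * epoll instances per wake-up event, rather than all of them. */
  epoll_wait(epfd, &out, 1, -1);
  /* ... accept() would go here ... */
  return NULL;
}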



1> Myst..:

EDIT (the original answer comes after the code used for testing)

To make sure things are clear, I'll go over EPOLLEXCLUSIVE as it relates to edge-triggered events (EPOLLET) as well as level-triggered events, to show how these affect the expected behavior.

As you well know:

Edge triggered: once EPOLLET is set, events are triggered only when the fd changes state, meaning only the first event is triggered, and no new events are triggered until that event is fully handled.

This design is explicitly meant to prevent epoll_wait from returning because of an event that is already being handled (i.e., when new data arrives while EPOLLIN was already raised, but read hadn't been called or not all of the data was read).

The edge-triggered event rule is simple: all events of the same type (i.e. EPOLLIN) are merged until all the available data has been processed.

For a listening socket, the EPOLLIN event won't be triggered again until all the existing listen "backlog" sockets have been accepted using accept.

For a byte stream, no new events will be triggered until all the available bytes have been read from the stream (the buffer has been emptied); a minimal drain loop is sketched below.
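As a minimal sketch of that drain loop (assuming the fd was already set to O_NONBLOCK; drain_fd is a hypothetical helper, not from my test code):

#include <errno.h>
#include <unistd.h>

/* Sketch: after an edge-triggered EPOLLIN wake-up, read until the kernel
 * buffer is empty; otherwise no further EPOLLIN will be delivered.
 * Returns 0 once drained, -1 on EOF or error. */
static int drain_fd(int fd) {
  char buf[4096];
  for (;;) {
    ssize_t n = read(fd, buf, sizeof buf);
    if (n > 0)
      continue; /* process the n bytes here, then keep reading */
    if (n == 0)
      return -1; /* peer closed the stream */
    if (errno == EAGAIN || errno == EWOULDBLOCK)
      return 0; /* drained: new data will re-arm the event */
    return -1; /* real error */
  }
}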

Level triggered: level-triggered events, on the other hand, behave much closer to the way legacy select (or poll) operates, allowing epoll to be used with older code.

The event-merging rule is more complex: events of the same type are merged only when no one is waiting for an event (no one is blocked in epoll_wait), or when multiple events occur before epoll_wait can return... otherwise, any event causes epoll_wait to return.

For a listening socket, the EPOLLIN event is triggered every time a client connects... unless no one is waiting for epoll_wait to return, in which case the next call to epoll_wait returns immediately, and all the EPOLLIN events that occurred in the meantime will have been merged into a single event.

For a byte stream, a new event is triggered every time new data arrives... unless, of course, no one is waiting for epoll_wait to return, in which case the next call returns immediately for all the data that arrived until epoll_wait returned (even if it arrived in different chunks / events).
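By contrast with the edge-triggered drain loop above, a level-triggered consumer doesn't have to drain. A sketch (mine, under the same assumptions; consume_level_triggered is a hypothetical name) might read once per wake-up, since leftover bytes keep the fd readable and epoll_wait simply returns again:

#include <sys/epoll.h>
#include <unistd.h>

/* Sketch: level-triggered consumer. One read per wake-up is enough,
 * because any unread data keeps the fd readable. */
static void consume_level_triggered(int epfd) {
  struct epoll_event out;
  char buf[4096];
  while (epoll_wait(epfd, &out, 1, -1) == 1) {
    ssize_t n = read(out.data.fd, buf, sizeof buf);
    if (n <= 0)
      break; /* EOF or error */
    /* process n bytes; leftovers will trigger another wake-up */
  }
}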

Exclusive wake-up: the EPOLLEXCLUSIVE flag is used to prevent the "thundering herd" behavior, so only a single epoll_wait caller is woken up for each fd wake-up event.

As I pointed out before, for edge-triggered states an fd wake-up event is a change in the fd's state. So all EPOLLIN events are raised until all the data has been read (the listening socket's backlog has been emptied).

For level-triggered events, on the other hand, each EPOLLIN invokes a wake-up event. If no one is waiting, these events are merged.

Following the example in your question:

For level-triggered events: every time a client connects, a single thread returns from epoll_wait... but if two more clients connect while both threads are busy accepting the first two, those EPOLLIN events merge into a single event, and the next call to epoll_wait returns immediately with that merged event.

In the context of the example given in the question, thread B is expected to "wake up" because epoll_wait returns.

In this case, both threads will be "racing" towards accept.

However, this doesn't defeat the EPOLLEXCLUSIVE directive or its intent.

The EPOLLEXCLUSIVE directive is meant to prevent the "thundering herd" phenomenon. In this case, two threads are racing to accept two connections. Each thread can (presumably) call accept safely, with no errors. If three threads were used, the third would keep on sleeping.

If EPOLLEXCLUSIVE weren't used, all the epoll_wait threads would be woken up whenever a connection was available, meaning that as soon as the first connection arrived, both threads would be racing to accept a single connection (resulting in a possible error for one of them).

For edge-triggered events: only one thread is expected to receive the "wake up" call. That thread is expected to accept all the waiting connections (empty the listen "backlog"). No further EPOLLIN events will be raised for that socket until the backlog is emptied.

The same applies to readable sockets and pipes. The thread that was woken up is expected to handle all the readable data. This prevents the waiting threads from trying to read the data concurrently and running into file-lock race conditions.

I would recommend (and this is what I do) setting the listening socket to non-blocking mode and calling accept in a loop until an EAGAIN (or EWOULDBLOCK) error is raised, indicating that the backlog is empty; a sketch of such a loop follows. There is no way to avoid the risk of events being merged. The same goes for reading from the socket.
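A minimal version of that accept loop might look like this (a sketch, assuming the listening socket was already set to non-blocking mode; accept_all is a hypothetical helper, not part of the test code below):

#include <errno.h>
#include <sys/socket.h>
#include <unistd.h>

/* Sketch: accept until the backlog is empty. Assumes srvfd is O_NONBLOCK. */
static void accept_all(int srvfd) {
  for (;;) {
    struct sockaddr_storage addr;
    socklen_t addrlen = sizeof addr;
    int connfd = accept(srvfd, (struct sockaddr *)&addr, &addrlen);
    if (connfd >= 0) {
      /* hand connfd off to a handler; closed here to keep the sketch short */
      close(connfd);
      continue;
    }
    if (errno == EAGAIN || errno == EWOULDBLOCK)
      return; /* backlog drained */
    if (errno == EINTR)
      continue; /* interrupted, retry */
    return; /* real error: log / handle as needed */
  }
}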

Testing it with code:

I wrote a simple test, using some sleep commands and blocking sockets. Client sockets are initiated only after both threads have started waiting on epoll.

Client thread startup is delayed, so client 1 and client 2 start a second apart.

Once a server thread is woken up, it will sleep for a second (allowing the second client to do its thing) before calling accept. Maybe the servers should sleep a little longer, but it seems close enough to manage the scheduler without resorting to condition variables.

Here are the results of my test code (which might be a mess; I'm not the best person for test design)...

On Ubuntu 16.10, which supports EPOLLEXCLUSIVE, the test results show that the listening threads are woken up one after the other, in response to the clients. In the example in the question, thread B is woken up.

Test address: :8000
Server thread 2 woke up with 1 events
Server thread 2 will sleep for a second, to let things happen.
client number 1 connected
Server thread 1 woke up with 1 events
Server thread 1 will sleep for a second, to let things happen.
client number 2 connected
Server thread 2 accepted a connection and saying hello.
client 1: Hello World - from server thread 2.
Server thread 1 accepted a connection and saying hello.
client 2: Hello World - from server thread 1.

To compare, on Ubuntu 16.04 (without EPOLLEXCLUSIVE support), both threads are woken up for the first connection. Since I use blocking sockets, the second thread hangs on accept until client #2 connects.

main.c:178:2: warning: #warning EPOLLEXCLUSIVE undeclared, test is futile [-Wcpp]
 #warning EPOLLEXCLUSIVE undeclared, test is futile
  ^
Test address: :8000
Server thread 1 woke up with 1 events
Server thread 1 will sleep for a second, to let things happen.
Server thread 2 woke up with 1 events
Server thread 2 will sleep for a second, to let things happen.
client number 1 connected
Server thread 1 accepted a connection and saying hello.
client 1: Hello World - from server thread 1.
client number 2 connected
Server thread 2 accepted a connection and saying hello.
client 2: Hello World - from server thread 2.

For one more comparison, the results for level-triggered kqueue show that both threads are woken for the first connection. Since I use blocking sockets, the second thread hangs on accept until client #2 connects.

Test address: :8000
client number 1 connected
Server thread 2 woke up with 1 events
Server thread 1 woke up with 1 events
Server thread 2 will sleep for a second, to let things happen.
Server thread 1 will sleep for a second, to let things happen.
Server thread 2 accepted a connection and saying hello.
client 1: Hello World - from server thread 2.
client number 2 connected
Server thread 1 accepted a connection and saying hello.
client 2: Hello World - from server thread 1.

My test code was (sorry for the lack of comments and the messy code, I wasn't writing for future maintenance):

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#define ADD_EPOLL_OPTION 0 // define as EPOLLET or 0

#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

#if !defined(__linux__) && !defined(__CYGWIN__)
#include <sys/event.h>
#define reactor_epoll 0
#else
#define reactor_epoll 1
#include <sys/epoll.h>
#endif

int sock_listen(const char *address, const char *port);
void *listen_thread(void *arg);
void *client_thread(void *arg);
int server_fd;
char const *address = NULL;
char const *port = "8000";

int main(int argc, char const *argv[]) {
  if (argc == 2) {
    port = argv[1];
  } else if (argc == 3) {
    port = argv[2];
    address = argv[1];
  }
  fprintf(stderr, "Test address: %s:%s\n", address ? address : "", port);
  server_fd = sock_listen(address, port);
  /* code */
  pthread_t threads[4];

  for (size_t i = 0; i < 2; i++) {
    if (pthread_create(threads + i, NULL, listen_thread, (void *)i))
      perror("couldn't initiate server thread"), exit(-1);
  }
  for (size_t i = 2; i < 4; i++) {
    sleep(1);
    if (pthread_create(threads + i, NULL, client_thread, (void *)i))
      perror("couldn't initiate client thread"), exit(-1);
  }
  // join only server threads.
  for (size_t i = 0; i < 2; i++) {
    pthread_join(threads[i], NULL);
  }
  close(server_fd);
  sleep(1);
  return 0;
}

/**
Sets a socket to non blocking state.
*/
inline int sock_set_non_block(int fd) // Thanks to Bjorn Reese
{
/* If they have O_NONBLOCK, use the Posix way to do it */
#if defined(O_NONBLOCK)
  /* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
  int flags;
  if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
    flags = 0;
  // printf("flags initial value was %d\n", flags);
  return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#else
  /* Otherwise, use the old way of doing it */
  static int flags = 1;
  return ioctl(fd, FIONBIO, &flags);
#endif
}

/* open a listening socket */
int sock_listen(const char *address, const char *port) {
  int srvfd;
  // setup the address
  struct addrinfo hints;
  struct addrinfo *servinfo;       // will point to the results
  memset(&hints, 0, sizeof hints); // make sure the struct is empty
  hints.ai_family = AF_UNSPEC;     // don't care IPv4 or IPv6
  hints.ai_socktype = SOCK_STREAM; // TCP stream sockets
  hints.ai_flags = AI_PASSIVE;     // fill in my IP for me
  if (getaddrinfo(address, port, &hints, &servinfo)) {
    perror("addr err");
    return -1;
  }
  // get the file descriptor
  srvfd =
      socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
  if (srvfd <= 0) {
    perror("socket err");
    freeaddrinfo(servinfo);
    return -1;
  }
  // // keep the server socket blocking for the test.
  // // make sure the socket is non-blocking
  // if (sock_set_non_block(srvfd) < 0) {
  //   perror("couldn't set socket as non blocking! ");
  //   freeaddrinfo(servinfo);
  //   close(srvfd);
  //   return -1;
  // }

  // avoid the "address taken"
  {
    int optval = 1;
    setsockopt(srvfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
  }
  // bind the address to the socket
  {
    int bound = 0;
    for (struct addrinfo *p = servinfo; p != NULL; p = p->ai_next) {
      if (!bind(srvfd, p->ai_addr, p->ai_addrlen))
        bound = 1;
    }

    if (!bound) {
      // perror("bind err");
      freeaddrinfo(servinfo);
      close(srvfd);
      return -1;
    }
  }
  freeaddrinfo(servinfo);
  // listen in
  if (listen(srvfd, SOMAXCONN) < 0) {
    perror("couldn't start listening");
    close(srvfd);
    return -1;
  }
  return srvfd;
}
/* will start listening, sleep for 5 seconds, then accept all the backlog and
 * finish */
void *listen_thread(void *arg) {

  int epoll_fd;
  ssize_t event_count;
#if reactor_epoll

#ifndef EPOLLEXCLUSIVE
#warning EPOLLEXCLUSIVE undeclared, test is futile
#define EPOLLEXCLUSIVE 0
#endif
  // create the epoll wait fd
  epoll_fd = epoll_create1(0);
  if (epoll_fd < 0)
    perror("couldn't create epoll fd"), exit(1);
  // add the server fd to the epoll watchlist
  {
    struct epoll_event chevent = {0};
    chevent.data.ptr = (void *)((uintptr_t)server_fd);
    chevent.events =
        EPOLLOUT | EPOLLIN | EPOLLERR | EPOLLEXCLUSIVE | ADD_EPOLL_OPTION;
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &chevent);
  }
  // wait with epoll
  struct epoll_event events[10];
  event_count = epoll_wait(epoll_fd, events, 10, 5000);
#else
  // testing on BSD, use kqueue
  epoll_fd = kqueue();
  if (epoll_fd < 0)
    perror("couldn't create kqueue fd"), exit(1);
  // add the server fd to the kqueue watchlist
  {
    struct kevent chevent[2];
    EV_SET(chevent, server_fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0,
           (void *)((uintptr_t)server_fd));
    EV_SET(chevent + 1, server_fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0,
           (void *)((uintptr_t)server_fd));
    kevent(epoll_fd, chevent, 2, NULL, 0, NULL);
  }
  // wait with kqueue
  static struct timespec reactor_timeout = {.tv_sec = 5, .tv_nsec = 0};
  struct kevent events[10];
  event_count = kevent(epoll_fd, NULL, 0, events, 10, &reactor_timeout);
#endif

  close(epoll_fd);

  if (event_count <= 0) {
    fprintf(stderr, "Server thread %lu wakeup no events / error\n",
            (size_t)arg + 1);
    perror("errno ");
    return NULL;
  }

  fprintf(stderr, "Server thread %lu woke up with %lu events\n",
          (size_t)arg + 1, event_count);
  fprintf(stderr,
          "Server thread %lu will sleep for a second, to let things happen.\n",
          (size_t)arg + 1);
  sleep(1);
  int connfd;
  struct sockaddr_storage client_addr;
  socklen_t client_addrlen = sizeof client_addr;
  /* accept a connection. the socket is left blocking for this test;
   * with a non-blocking socket, -1 would mean no more connections */
  if ((connfd = accept(server_fd, (struct sockaddr *)&client_addr,
                       &client_addrlen)) >= 0) {
    fprintf(stderr,
            "Server thread %lu accepted a connection and saying hello.\n",
            (size_t)arg + 1);
    if (write(connfd, arg ? "Hello World - from server thread 2."
                          : "Hello World - from server thread 1.",
              35) < 35)
      perror("server write failed");
    close(connfd);
  } else {
    fprintf(stderr, "Server thread %lu failed to accept a connection",
            (size_t)arg + 1);
    perror(": ");
  }
  return NULL;
}

void *client_thread(void *arg) {

  int fd;
  // setup the address
  struct addrinfo hints;
  struct addrinfo *addrinfo;       // will point to the results
  memset(&hints, 0, sizeof hints); // make sure the struct is empty
  hints.ai_family = AF_UNSPEC;     // don't care IPv4 or IPv6
  hints.ai_socktype = SOCK_STREAM; // TCP stream sockets
  hints.ai_flags = AI_PASSIVE;     // fill in my IP for me
  if (getaddrinfo(address, port, &hints, &addrinfo)) {
    perror("client couldn't initiate address");
    return NULL;
  }
  // get the file descriptor
  fd =
      socket(addrinfo->ai_family, addrinfo->ai_socktype, addrinfo->ai_protocol);
  if (fd <= 0) {
    perror("client couldn't create socket");
    freeaddrinfo(addrinfo);
    return NULL;
  }
  // // // Leave the socket blocking for the test.
  // // make sure the socket is non-blocking
  // if (sock_set_non_block(fd) < 0) {
  //   freeaddrinfo(addrinfo);
  //   close(fd);
  //   return -1;
  // }

  if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) < 0 &&
      errno != EINPROGRESS) {
    fprintf(stderr, "client number %lu FAILED\n", (size_t)arg - 1);
    perror("client connect failure");
    close(fd);
    freeaddrinfo(addrinfo);
    return NULL;
  }
  freeaddrinfo(addrinfo);
  fprintf(stderr, "client number %lu connected\n", (size_t)arg - 1);
  char buffer[128];
  if (read(fd, buffer, 35) < 35) {
    perror("client: read error");
    close(fd);
  } else {
    buffer[35] = 0;
    fprintf(stderr, "client %lu: %s\n", (size_t)arg - 1, buffer);
    close(fd);
  }
  return NULL;
}

P.S.

As a final recommendation, I would consider having no more than a single thread and a single epoll fd per process; a sketch of that layout follows. This way the "thundering herd" is a non-issue and EPOLLEXCLUSIVE (which is still very new and isn't widely supported) can be disregarded... the only "thundering herd" this still exposes is for the limited number of shared sockets, where the race condition might be good for load balancing.
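One way to realize that layout, as a minimal sketch (my assumptions: fork after listen, each process builds its own epoll instance around the inherited socket; spawn_workers is a hypothetical name, and wake-ups on the shared socket then race across processes, which as noted can act as crude load balancing):

#include <sys/epoll.h>
#include <unistd.h>

/* Sketch: one process per worker, each with a private epoll fd watching
 * the shared (inherited) listening socket. Each process has exactly one
 * waiter, so EPOLLEXCLUSIVE is unnecessary within a process. */
static void spawn_workers(int listen_fd, int nprocs) {
  for (int i = 1; i < nprocs; i++) {
    if (fork() == 0)
      break; /* child: leave the forking loop and run the event loop */
  }
  int epfd = epoll_create1(0);
  struct epoll_event ev = {.events = EPOLLIN, .data.fd = listen_fd};
  epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);
  /* ... single-threaded epoll_wait / accept loop goes here ... */
}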


Original Answer

I'm not sure I understand the confusion, so I'll go over EPOLLET and EPOLLEXCLUSIVE to show their combined expected behavior.

As you well know:

Once you set EPOLLET (edge triggered), events are triggered on fd state changes rather than fd events.

This design is explicitly meant to prevent epoll_wait from returning due to an event that is in the process of being handled (i.e., when new data arrives while the EPOLLIN was already raised but read hadn't been called or not all of the data was read).

In the case of a listening socket, the EPOLLIN event won't be triggered again until all existing listen "backlog" sockets have been accepted using accept.

The EPOLLEXCLUSIVE flag is used to prevent the "thundering herd" behavior, so only a single epoll_wait caller is woken up for each fd wake-up event.

As I pointed out before, for edge-triggered states, an fd wake-up event is a change in the fd state. So all EPOLLIN events will be raised until all data was read (the listening socket's backlog was emptied).

When merging these behaviors, and following the example in your question, only one thread is expected to receive the "wake up" call. That thread is expected to accept all waiting connections (empty the listen "backlog") or no more EPOLLIN events will be raised for that socket.

The same applies to readable sockets and pipes. The thread that was woken up is expected to deal with all the readable data. This prevents the waiting threads from attempting to read the data concurrently and experiencing file-lock race conditions.

I would recommend that you consider avoiding the edge triggered events if you mean to call accept only once for each epoll_wait wake-up event. Regardless of using EPOLLEXCLUSIVE, you run the risk of not emptying the existing "backlog", so that no new wake-up events will be raised.

Alternatively, I would recommend (and this is what I do) setting the listening socket to non-blocking mode and calling accept in a loop until an EAGAIN (or EWOULDBLOCK) error is raised, indicating that the backlog is empty.


EDIT 1: Level Triggered Events

It seems, as Nathaniel pointed out in the comment, that I totally misunderstood the question... I guess I'm used to EPOLLET being the misunderstood element.

So, what happens with normal, level-triggered, events (NOT EPOLLET)?

Well... the expected behavior is the exact mirror image (opposite) of edge triggered events.

For listening sockets, epoll_wait is expected to return whenever a new connection is available, whether accept was called after a previous event or not.

Events are only "merged" if no one is waiting with epoll_wait... in which case the next call to epoll_wait will return immediately.

In the context of the example given in the question, thread B is expected to "wake up" due to epoll_wait returning.

In this case, both threads will "race" towards accept.

However, this doesn't defeat the EPOLLEXCLUSIVE directive or intent.

The EPOLLEXCLUSIVE directive is meant to prevent the "thundering herd" phenomenon. In this case, two threads are racing to accept two connections. Each thread can (presumably) call accept safely, with no errors. If three threads were used, the third would keep on sleeping.

If the EPOLLEXCLUSIVE weren't used, all the epoll_wait threads would have been woken up whenever a connection was available, meaning that as soon as the first connection arrived, both threads would have been racing to accept a single connection (resulting in a possible error for one of them).
