From this question I know that I can call epoll_ctl(2)
while another thread is blocking on epoll_wait(2)
. I still have a question though.
When using epoll
with the EPOLLONESHOT
flag only one event is fired and the fd has to be rearmed using epoll_ctl(2)
. This is necessary so only one thread
will read from the fd and handle the result appropriately.
The following is a timeline that somewhat visualizes my supposed problem:
Thread1: Thread2: Kernel:
-----------------------------------------------------------------------
epoll_wait();
Receives chunk
dispatch chunk to thread 2
epoll_wait(); Handle chunk
Still handle chunk Receives chunk
Rearm fd for epoll
?
What happens on the question mark when the fd is rearmed after a chunk is received? Will epoll
fire an EPOLLIN
event, or will it block indefinitely although the socket is readable? Is my architecture at all sensible?
Your architecture is sensible, and it will work: epoll
will mark the file descriptor as readable and fire an EPOLLIN
event.
The documentation on this is scarce and subtle; the Q/A section of man 7 epoll
briefly mentions this:
Q8 Does an operation on a file descriptor affect the already collected but not yet reported events?
A8 You can do two operations on an existing file descriptor. Remove would be meaningless for this case. Modify will reread available I/O.
The two operations that you can do on an existing file descriptor (an existing file descriptor is a file descriptor that has been added to the epoll set in the past - this includes file descriptors that are waiting to be rearmed) are delete and modify. As the manpage mentions, delete is meaningless here, and modify will re-evaluate the conditions in the file descriptor.
Nothing beats a real world experiment though. The following program tests this edge case:
#include <stdio.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <assert.h>
#include <semaphore.h>
#include <sys/epoll.h>
#include <unistd.h>
static pthread_t tids[2];
static int epoll_fd;
static char input_buff[512];
static sem_t chunks_sem;
void *dispatcher(void *arg) {
struct epoll_event epevent;
while (1) {
printf("Dispatcher waiting for more chunks\n");
if (epoll_wait(epoll_fd, &epevent, 1, -1) < 0) {
perror("epoll_wait(2) error");
exit(EXIT_FAILURE);
}
ssize_t n;
if ((n = read(STDIN_FILENO, input_buff, sizeof(input_buff)-1)) <= 0) {
if (n < 0)
perror("read(2) error");
else
fprintf(stderr, "stdin closed prematurely\n");
exit(EXIT_FAILURE);
}
input_buff[n] = '\0';
sem_post(&chunks_sem);
}
return NULL;
}
void *consumer(void *arg) {
sigset_t smask;
sigemptyset(&smask);
sigaddset(&smask, SIGUSR1);
while (1) {
sem_wait(&chunks_sem);
printf("Consumer received chunk: %s", input_buff);
/* Simulate some processing... */
sleep(2);
printf("Consumer finished processing chunk.\n");
printf("Please send SIGUSR1 after sending more data to stdin\n");
int signo;
if (sigwait(&smask, &signo) < 0) {
perror("sigwait(3) error");
exit(EXIT_FAILURE);
}
assert(signo == SIGUSR1);
struct epoll_event epevent;
epevent.events = EPOLLIN | EPOLLONESHOT;
epevent.data.fd = STDIN_FILENO;
if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, STDIN_FILENO, &epevent) < 0) {
perror("epoll_ctl(2) error when attempting to readd stdin");
exit(EXIT_FAILURE);
}
printf("Readded stdin to epoll fd\n");
}
}
int main(void) {
sigset_t sigmask;
sigfillset(&sigmask);
if (pthread_sigmask(SIG_SETMASK, &sigmask, NULL) < 0) {
perror("pthread_sigmask(3) error");
exit(EXIT_FAILURE);
}
if ((epoll_fd = epoll_create(1)) < 0) {
perror("epoll_create(2) error");
exit(EXIT_FAILURE);
}
struct epoll_event epevent;
epevent.events = EPOLLIN | EPOLLONESHOT;
epevent.data.fd = STDIN_FILENO;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, STDIN_FILENO, &epevent) < 0) {
perror("epoll_ctl(2) error");
exit(EXIT_FAILURE);
}
if (sem_init(&chunks_sem, 0, 0) < 0) {
perror("sem_init(3) error");
exit(EXIT_FAILURE);
}
if (pthread_create(&tids[0], NULL, dispatcher, NULL) < 0) {
perror("pthread_create(3) error on dispatcher");
exit(EXIT_FAILURE);
}
if (pthread_create(&tids[1], NULL, consumer, NULL) < 0) {
perror("pthread_create(3) error on consumer");
exit(EXIT_FAILURE);
}
size_t i;
for (i = 0; i < sizeof(tids)/sizeof(tids[0]); i++) {
if (pthread_join(tids[i], NULL) < 0) {
perror("pthread_join(3) error");
exit(EXIT_FAILURE);
}
}
return 0;
}
It works as follows: the dispatcher thread adds stdin
to an epoll set and then uses epoll_wait(2)
to fetch input from stdin
whenever it becomes readable. When input arrives, the dispatcher wakes up the worker thread, who prints the input and simulates some processing time by sleeping 2 seconds. In the meantime, the dispatcher goes back to the main loop and blocks in epoll_wait(2)
again.
The worker thread won't rearm stdin
until you tell it to by sending it SIGUSR1
. So, we just write some more stuff into stdin
, and then send SIGUSR1
to the process. The worker thread receives the signal, and only then it rearms stdin
- which is already readable by that time, and the dispatcher was already waiting on epoll_wait(2)
.
You can see from the output that the dispatcher is correctly awaken and everything works like a charm:
Dispatcher waiting for more chunks
testing 1 2 3 // Input
Dispatcher waiting for more chunks // Dispatcher notified worker and is waiting again
Consumer received chunk: testing 1 2 3
Consumer finished processing chunk.
Please send SIGUSR1 after sending more data to stdin
hello world // Input
Readded stdin to epoll fd // Rearm stdin; dispatcher is already waiting
Dispatcher waiting for more chunks // Dispatcher saw new input and is now waiting again
Consumer received chunk: hello world
Consumer finished processing chunk.
Please send SIGUSR1 after sending more data to stdin