Recently, I've been working on a tool which has to repeatedly launch executables and pipe large amounts of data into them and examine the result. As one aspect of this process is computationally intensive, I've also made it multithreaded using pthreads.
The software is only made to launch what I'll call "digesting" applications which consume arbitrary data from stdin until EOF, then print a simple output (like sha256sum
).
I have provided the most minimal and complete example as I can below. Unfortunately it is still quite large. High-level description:
sha256sum
with stdin/stdout pipes and use a simple RNG to fill a buffer, write to the child, close the write pipe, read the result, and call wait.This can be run with the thread count as the first argument. When using more than one thread, I find that the code rapidly gets stuck, with one or more worker threads stuck in calls to read()
, close()
or wait()
.
Valgrind's DRD and Helgrind tools find no issues/warnings with this code (and it works under them, somehow... - more deterministic scheduling I guess?).
A big step forward was discovering that threads share FDs created with pipe()
, so it is clear that each child after its creation must close any pipes that it inherited which were not created by its parent. Otherwise, duplicate pipes will prevent EOF from being delivered as intended.
However, this isn't sufficient to make the code work.
Why/how is there a race condition in this code?
I understand that there are easier ways to achieve the intended result, like using a library instead of using fork-exec of programs. But I wish to identify exactly why this isn't working as a learning experience. It has been very interesting already, but it's not quite there yet.
Code:
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdarg.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <pthread.h>
#include <sched.h>
#include <assert.h>
#define MIN(a, b) (((a) > (b)) ? (b) : (a))
#define LOG_PERROR(func, failCond) \
do { \
if (failCond) { \
perror(#func); \
abort(); \
} \
} \
while (0)
#define LOG_PTHREAD(funcCall) \
do { \
const int res = (funcCall); \
if (res != 0) { \
fprintf(stderr, #funcCall " failed with ret code %" PRId32 "\n", \
res); \
abort(); \
} \
} \
while (0)
typedef struct
{
uint32_t thread_count;
pthread_mutex_t print_mutex;
pthread_mutex_t master_mutex;
pthread_cond_t master_cond;
/* Guarded by `master_mutex` */
uint32_t threads_finished_work;
pthread_mutex_t gate_mutex;
pthread_cond_t gate_cond;
/* Guarded by `gate_mutex` */
uint32_t gate_waiting;
int running;
} context_t;
typedef struct
{
context_t* ctx;
uint32_t thread_id;
uint32_t seed;
char in_buf[1024];
} thread_data_t;
static void printf_guarded(context_t* ctx, const char* fmt, ...)
{
va_list ap;
LOG_PTHREAD(pthread_mutex_lock(&ctx->print_mutex));
va_start(ap, fmt);
vprintf(fmt, ap);
va_end(ap);
LOG_PTHREAD(pthread_mutex_unlock(&ctx->print_mutex));
}
static void do_work(thread_data_t* const td)
{
const int32_t count = 100000000;
const uint32_t lcg_a = UINT32_C(1664525);
const uint32_t lcg_c = UINT32_C(1013904223);
char out_buf[65536];
char* args[2] = {"sha256sum", NULL};
int pipe_in[2];
int pipe_out[2];
pid_t child;
pid_t wait_res;
ssize_t cumul_bytes = 0;
ssize_t this_bytes;
ssize_t res;
assert((sizeof(count) % 4) == 0);
assert((sizeof(out_buf) % 4) == 0);
/*
printf_guarded(ctx, "Thread %2" PRIu32 " | seed: %10" PRIu32 "\n",
td->thread_id, td->seed);
*/
LOG_PERROR(pipe, pipe(pipe_in) == -1);
LOG_PERROR(pipe, pipe(pipe_out) == -1);
printf_guarded(td->ctx, "Thread %2" PRIu32
" | pipe_in = {%d, %d}, pipe_out = {%d, %d}\n",
td->thread_id, pipe_in[0], pipe_in[1], pipe_out[0], pipe_out[1]);
child = fork();
LOG_PERROR(fork, child == -1);
if (child == 0)
{
int i;
struct stat statbuf;
/* Search for and close pipes which my parent did not create. */
for (i = 0; i < getdtablesize(); i++)
{
if ((i != pipe_in[0]) && (i != pipe_in[1]) &&
(i != pipe_out[0]) && (i != pipe_out[1]))
{
if (fcntl(i, F_GETFD) != -1)
{
if (fstat(i, &statbuf) != -1)
{
if (S_ISFIFO(statbuf.st_mode))
{
printf("Thread %2" PRIu32 " | CHILD | "
"closing unexpected pipe: fd = %d\n",
td->thread_id, i);
}
}
}
}
}
LOG_PERROR(dup2, dup2(pipe_in[0], STDIN_FILENO) == -1);
LOG_PERROR(dup2, dup2(pipe_out[1], STDOUT_FILENO) == -1);
LOG_PERROR(close, close(pipe_in[0]) == -1);
LOG_PERROR(close, close(pipe_in[1]) == -1);
LOG_PERROR(close, close(pipe_out[0]) == -1);
LOG_PERROR(close, close(pipe_out[1]) == -1);
/* Never returns */
LOG_PERROR(execvp, execvp(args[0], args) == -1);
perror("execvp");
abort();
}
LOG_PERROR(close, close(pipe_in[0]) == -1);
LOG_PERROR(close, close(pipe_out[1]) == -1);
while (cumul_bytes < count)
{
int32_t i;
const ssize_t to_write_bytes =
MIN(count - cumul_bytes, (ssize_t) sizeof(out_buf));
assert((to_write_bytes % 4) == 0);
for (i = 0; i < (to_write_bytes / 4); i++)
{
const int32_t strided_idx = i * 4;
uint32_t* casted = (uint32_t*) &out_buf[strided_idx];
td->seed = lcg_a * td->seed + lcg_c;
*casted = td->seed;
}
this_bytes = 0;
while (this_bytes < to_write_bytes)
{
res = write(
pipe_in[1],
&out_buf[this_bytes],
to_write_bytes - this_bytes);
LOG_PERROR(write, res == -1);
this_bytes += res;
}
cumul_bytes += this_bytes;
}
/* Finished writing; close the write side of the in pipe. */
LOG_PERROR(close, close(pipe_in[1]) == -1);
this_bytes = 0;
do
{
res = read(
pipe_out[0],
&td->in_buf[this_bytes],
sizeof(td->in_buf) - this_bytes);
LOG_PERROR(read, res == -1);
if (this_bytes >= (ssize_t) sizeof(td->in_buf))
{
fprintf(stderr, "Soemthing went wrong!\n");
abort();
}
this_bytes += res;
}
while (res > 0);
LOG_PERROR(close, close(pipe_out[0]) == -1);
wait_res = waitpid(child, NULL, 0);
LOG_PERROR(waitpid, wait_res == -1);
if (wait_res != child)
{
fprintf(stderr, "waitpid returned unexpected result (%"
PRId32 ")\n", wait_res);
abort();
}
td->in_buf[this_bytes] = 0;
}
static void* thread_entry(void* arg)
{
thread_data_t* const td = (thread_data_t*) arg;
context_t* const ctx = td->ctx;
while (1)
{
/* Stop all threads at gate */
LOG_PTHREAD(pthread_mutex_lock(&ctx->gate_mutex));
ctx->gate_waiting++;
printf_guarded(ctx, "Thread %2" PRIu32 " | gate, WAITING\n",
td->thread_id);
LOG_PTHREAD(pthread_cond_wait(&ctx->gate_cond, &ctx->gate_mutex));
printf_guarded(ctx, "Thread %2" PRIu32 " | gate, RELEASED\n",
td->thread_id);
LOG_PTHREAD(pthread_mutex_unlock(&ctx->gate_mutex));
/* All threads are now released from the gate */
if (!ctx->running)
break;
do_work(td);
LOG_PTHREAD(pthread_mutex_lock(&ctx->master_mutex));
ctx->threads_finished_work++;
assert(ctx->threads_finished_work <= ctx->thread_count);
/* Last thread to finish should signal the master */
if (ctx->threads_finished_work == ctx->thread_count)
{
printf_guarded(ctx, "Thread %2" PRIu32
" | finished, signalling master\n",
td->thread_id);
LOG_PTHREAD(pthread_cond_signal(&ctx->master_cond));
}
else
{
printf_guarded(ctx, "Thread %2" PRIu32 " | finished\n", td->thread_id);
}
LOG_PTHREAD(pthread_mutex_unlock(&ctx->master_mutex));
}
return NULL;
}
int main(int argc, char** argv)
{
context_t ctx;
pthread_t* threads;
thread_data_t* tds;
uint32_t i, j;
if (argc > 1)
ctx.thread_count = strtoul(argv[1], NULL, 10);
else
ctx.thread_count = 1;
LOG_PTHREAD(pthread_mutex_init(&ctx.print_mutex, NULL));
LOG_PTHREAD(pthread_mutex_init(&ctx.master_mutex, NULL));
LOG_PTHREAD(pthread_cond_init(&ctx.master_cond, NULL));
ctx.threads_finished_work = 0;
LOG_PTHREAD(pthread_mutex_init(&ctx.gate_mutex, NULL));
LOG_PTHREAD(pthread_cond_init(&ctx.gate_cond, NULL));
ctx.gate_waiting = 0;
ctx.running = 1;
threads = malloc(sizeof(pthread_t) * ctx.thread_count);
tds = malloc(sizeof(thread_data_t) * ctx.thread_count);
for (i = 0; i < ctx.thread_count; i++)
{
tds[i].ctx = &ctx;
tds[i].thread_id = i;
tds[i].seed = i;
LOG_PTHREAD(pthread_create(&threads[i], NULL, thread_entry, &tds[i]));
}
for (i = 0; i < 5; i++)
{
int all_threads_at_gate = 0;
while (!all_threads_at_gate)
{
LOG_PTHREAD(pthread_mutex_lock(&ctx.gate_mutex));
assert(ctx.gate_waiting <= ctx.thread_count);
if (ctx.gate_waiting == ctx.thread_count)
all_threads_at_gate = 1;
LOG_PTHREAD(pthread_mutex_unlock(&ctx.gate_mutex));
if (!all_threads_at_gate)
sched_yield();
}
printf_guarded(&ctx, "====================\nStart of run %" PRIu32
"\nThread master has seen all threads at gate - "
"signalling them\n", i);
ctx.gate_waiting = 0;
/* The threads are now good to go. */
LOG_PTHREAD(pthread_mutex_lock(&ctx.gate_mutex));
LOG_PTHREAD(pthread_cond_broadcast(&ctx.gate_cond));
LOG_PTHREAD(pthread_mutex_unlock(&ctx.gate_mutex));
/* Wait for threads to do the work... */
LOG_PTHREAD(pthread_mutex_lock(&ctx.master_mutex));
assert(ctx.threads_finished_work <= ctx.thread_count);
if (ctx.threads_finished_work < ctx.thread_count)
{
/* Master beat the threads to this point (very likely). */
printf_guarded(&ctx, "Thread master is waiting for signal...\n");
LOG_PTHREAD(pthread_cond_wait(
&ctx.master_cond, &ctx.master_mutex));
}
else
{
printf_guarded(&ctx,
"Thread master didn't need to wait for signal!\n");
}
printf_guarded(&ctx, "Thread master is proceeding\n");
ctx.threads_finished_work = 0;
LOG_PTHREAD(pthread_mutex_unlock(&ctx.master_mutex));
for (j = 0; j < ctx.thread_count; j++)
{
printf_guarded(&ctx, "Thread %2" PRIu32 " | got output: %s",
tds[j].thread_id, tds[j].in_buf);
}
}
ctx.running = 0;
printf_guarded(&ctx, "Signalling threads...\n");
LOG_PTHREAD(pthread_mutex_lock(&ctx.gate_mutex));
assert(ctx.gate_waiting == ctx.thread_count);
ctx.running = 0;
pthread_cond_broadcast(&ctx.gate_cond);
LOG_PTHREAD(pthread_mutex_unlock(&ctx.gate_mutex));
printf_guarded(&ctx, "Joining threads...\n");
for (i = 0; i < ctx.thread_count; i++)
{
LOG_PTHREAD(pthread_join(threads[i], NULL));
printf_guarded(&ctx, "Thread %2" PRIu32 " | final seed: %10"
PRIu32 "\n", i, tds[i].seed);
}
printf_guarded(&ctx, "Now freeing...\n");
free(tds);
free(threads);
LOG_PTHREAD(pthread_mutex_destroy(&ctx.gate_mutex));
LOG_PTHREAD(pthread_cond_destroy(&ctx.gate_cond));
LOG_PTHREAD(pthread_mutex_destroy(&ctx.master_mutex));
LOG_PTHREAD(pthread_cond_destroy(&ctx.master_cond));
LOG_PTHREAD(pthread_mutex_destroy(&ctx.print_mutex));
return 0;
}
I apologise; in a rather embarrassing turn, I realise that my code was missing one line which I remembered adding but somehow managed to undo before testing and pasting the code into the question.
The close of the unexpected pipe is missing. Adding it:
/* Search for and close pipes which my parent did not create. */
for (i = 0; i < getdtablesize(); i++)
{
if ((i != pipe_in[0]) && (i != pipe_in[1]) &&
(i != pipe_out[0]) && (i != pipe_out[1]))
{
if (fcntl(i, F_GETFD) != -1)
{
if (fstat(i, &statbuf) != -1)
{
if (S_ISFIFO(statbuf.st_mode))
{
printf("Thread %2" PRIu32 " | CHILD | "
"closing unexpected pipe: fd = %d\n",
td->thread_id, i);
LOG_PERROR(close, close(i) == -1); /* WAS MISSING!! */
}
}
}
}
}
This means that the code seems to work properly.
As far as I am aware, there are no other issues with this code.
With regard to mutexes: they are copied on fork but are not used by the child, so I am sure that they have no effect.