UPDATE:
I realized using the producer_done
flag to determine the exit point from the consumer would cause trouble, so I rewrote the test code as below.
However, the mismatch still occurred, and I could not understand the output messages, especially
Output messages:
Jan 8 17:12:57 DESKTOP-VC5TFK0 LFQueue[378834]: Integrated concurrency test with 2 producer(s)/2 consumer(s), 2 items to enqueue/dequeue
Jan 8 17:12:57 DESKTOP-VC5TFK0 LFQueue[378834]: thread(140251564394048) total_items_consumed move forward to 1
Jan 8 17:12:57 DESKTOP-VC5TFK0 LFQueue[378834]: thread(140251564394048) total_items_consumed back to 0 due to empty
Jan 8 17:12:57 DESKTOP-VC5TFK0 LFQueue[378834]: thread(140251564394048) total_items_consumed move forward to 2
Jan 8 17:12:57 DESKTOP-VC5TFK0 LFQueue[378834]: thread(140251564394048) dequeue succeed! now total_items_consumed: 2
Jan 8 17:12:57 DESKTOP-VC5TFK0 LFQueue[378834]: thread(140251572786752) total_items_consumed move forward to 1
Jan 8 17:12:57 DESKTOP-VC5TFK0 LFQueue[378834]: thread(140251572786752) total_items_consumed step back to 2 due to empty queue
Jan 8 17:12:57 DESKTOP-VC5TFK0 LFQueue[378834]: thread(140251572786752) total_items_consumed move forward to 3
Jan 8 17:12:57 DESKTOP-VC5TFK0 LFQueue[378834]: thread(140251572786752) total_items_consumed step back to 2, then break;
Jan 8 17:12:57 DESKTOP-VC5TFK0 LFQueue[378834]: thread(140251564394048) total_items_consumed move forward to 3
Jan 8 17:12:57 DESKTOP-VC5TFK0 LFQueue[378834]: thread(140251564394048) total_items_consumed step back to 1, then break;
Jan 8 17:12:57 DESKTOP-VC5TFK0 LFQueue[378834]: FAILED
Jan 8 17:12:57 DESKTOP-VC5TFK0 LFQueue[378834]: Mismatch: Expected Consumed (2), Actual Consumed (1)
updated test code:
typedef struct
{
pthread_mutex_t lock;
pthread_cond_t start_cond;
int start_flag;
atomic_ulong total_items_produced;
atomic_ulong total_items_consumed;
} sync_primitives_t;
typedef struct
{
sync_primitives_t sync;
lf_queue_t queue;
unsigned long total_items;
} thread_args_t;
void *producer_thread(void *arg)
{
thread_args_t *args = (thread_args_t *)arg;
pthread_mutex_lock(&(args->sync.lock));
while (!(args->sync.start_flag))
{
pthread_cond_wait(&(args->sync.start_cond), &(args->sync.lock));
}
pthread_mutex_unlock(&(args->sync.lock));
int value = 1;
while (1)
{
unsigned long old_count = atomic_fetch_add(&(args->sync.total_items_produced), 1);
if (old_count >= args->total_items)
{
atomic_fetch_sub(&(args->sync.total_items_produced), 1);
break;
}
enqueueLF(&(args->queue), value);
}
return NULL;
}
void *consumer_thread(void *arg)
{
thread_args_t *args = (thread_args_t *)arg;
pthread_mutex_lock(&(args->sync.lock));
while (!(args->sync.start_flag))
{
pthread_cond_wait(&(args->sync.start_cond), &(args->sync.lock));
}
pthread_mutex_unlock(&(args->sync.lock));
int value = 0;
while (1)
{
unsigned long old_count = atomic_fetch_add(&(args->sync.total_items_consumed), 1);
syslog(LOG_DEBUG, "thread(%lu) total_items_consumed move forward to %lu",
(pthread_self()), atomic_load(&(args->sync.total_items_consumed)));
if (old_count >= args->total_items)
{
atomic_fetch_sub(&(args->sync.total_items_consumed), 1);
syslog(LOG_DEBUG, "thread(%lu) total_items_consumed step back to %lu, then break;",
(pthread_self()), atomic_load(&(args->sync.total_items_consumed)));
break;
}
int ret = dequeueLF(&(args->queue), &value);
if (ret != 0)
{
atomic_fetch_sub(&(args->sync.total_items_consumed), 1);
syslog(LOG_DEBUG, "thread(%lu) total_items_consumed step back to %lu due to empty queue",
(pthread_self()), atomic_load(&(args->sync.total_items_consumed)));
}
else
{
syslog(LOG_DEBUG, "thread(%lu) dequeue succeed! now total_items_consumed: %lu",
(pthread_self()), atomic_load(&(args->sync.total_items_consumed)));
}
}
return NULL;
}
int integrated_test(unsigned num_producers, unsigned num_consumers, unsigned long total_items)
{
syslog(LOG_INFO, "Integrated concurrency test with %d producer(s)/%d consumer(s), %lu items to enqueue/dequeue",
num_producers, num_consumers, total_items);
thread_args_t args = {
.sync = {.lock = PTHREAD_MUTEX_INITIALIZER,
.start_cond = PTHREAD_COND_INITIALIZER,
.start_flag = 0,
.total_items_produced = ATOMIC_VAR_INIT(0),
.total_items_consumed = ATOMIC_VAR_INIT(0)},
.queue = {0},
.total_items = total_items,
};
LFQueue_init(&args.queue);
pthread_t producer_threads[num_producers];
for (unsigned i = 0; i < num_producers; i++)
{
if (pthread_create(&producer_threads[i], NULL, producer_thread, &args) !=
0)
{
fprintf(stderr, "Failed to create producer thread %d.\n", i);
exit(EXIT_FAILURE);
}
}
pthread_t consumer_threads[num_consumers];
for (unsigned i = 0; i < num_consumers; i++)
{
if (pthread_create(&consumer_threads[i], NULL, consumer_thread, &args) !=
0)
{
fprintf(stderr, "Failed to create consumer thread %d.\n", i);
exit(EXIT_FAILURE);
}
}
pthread_mutex_lock(&args.sync.lock);
args.sync.start_flag = 1;
pthread_cond_broadcast(&args.sync.start_cond);
pthread_mutex_unlock(&args.sync.lock);
for (unsigned i = 0; i < num_producers; i++)
{
pthread_join(producer_threads[i], NULL);
}
for (unsigned i = 0; i < num_consumers; i++)
{
pthread_join(consumer_threads[i], NULL);
}
LFQueue_destory(&args.queue);
/* Determine test result */
unsigned long expected_produced = total_items;
unsigned long expected_consumed = total_items;
unsigned long actual_produced = atomic_load(&(args.sync.total_items_produced));
unsigned long actual_consumed = atomic_load(&(args.sync.total_items_consumed));
if (expected_produced != actual_produced)
{
syslog(LOG_ERR, "FAILED\n");
syslog(LOG_ERR, "Mismatch: Expected Produced (%lu), Actual Produced (%lu)",
expected_produced, actual_produced);
exit(EXIT_FAILURE);
return -1;
}
if (actual_consumed != expected_consumed)
{
syslog(LOG_ERR, "FAILED\n");
syslog(LOG_ERR, "Mismatch: Expected Consumed (%lu), Actual Consumed (%lu)",
expected_consumed, actual_consumed);
exit(EXIT_FAILURE);
return -1;
}
printf("SUCCESS\n");
return 0;
}
int main(void)
{
openlog("LFQueue", LOG_PID, LOG_USER);
unsigned long total_items = 2;
unsigned num_producers = 2;
unsigned num_consumers = 2;
integrated_test(num_producers, num_consumers, total_items);
closelog();
return EXIT_SUCCESS;
}
I've implemented a lock-free queue in C, mimicking the pseudo-code of enqueue and dequeue from Maged M. Michael's paper, and a test code for it. Most of the time it works, but sometimes it's failed due to a mismatch: message like
Mismatch: Expected Consumed (100), Actual Consumed (65)
, meaning there are still 35 nodes in the queue.
Although it's causing a memory leak, I deliberately choose not to free dequeued nodes and solely focus on the mismatch in the number of nodes enqueued/dequeued. Because no node is freed, there is no ABA problem.
You should able to run the code by simple copy/paste(s).
I use this shell command to run the program simultaneously:
for i in {1..10}; do ./main; done
source code:
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
typedef struct node node_t;
struct node {
int data;
_Atomic(node_t *) next;
};
typedef struct lockFreeQueue lf_queue_t;
struct lockFreeQueue {
_Atomic(node_t *) head;
_Atomic(node_t *) tail;
};
int LFQueue_init(lf_queue_t *me) {
node_t *dummy = malloc(sizeof(struct node));
atomic_init(&(dummy->next), NULL);
atomic_init(&(me->head), dummy);
atomic_init(&(me->tail), dummy);
return 0;
}
int LFQueue_destory(lf_queue_t *me) {
node_t *curr = atomic_load_explicit(&me->head, memory_order_seq_cst);
node_t *next = NULL;
while (curr) {
next = atomic_load_explicit(&curr->next, memory_order_seq_cst);
free(curr);
curr = next;
}
return 0;
}
void enqueueLF(lf_queue_t *me, int data) {
node_t *newNode = malloc(sizeof(struct node));
newNode->data = data;
atomic_store_explicit(&newNode->next, NULL, memory_order_seq_cst);
node_t *t = NULL;
node_t *next = NULL;
while (1) {
t = atomic_load_explicit(&me->tail, memory_order_seq_cst);
next = atomic_load_explicit(&t->next, memory_order_seq_cst);
if (atomic_load_explicit(&me->tail, memory_order_seq_cst) != t) {
continue;
}
if (next != NULL) {
atomic_compare_exchange_strong_explicit(
&me->tail, &t, next, memory_order_seq_cst, memory_order_seq_cst);
continue;
}
node_t *expected = NULL;
if (atomic_compare_exchange_strong_explicit(&t->next, &expected, newNode,
memory_order_seq_cst,
memory_order_seq_cst)) {
break;
}
}
atomic_compare_exchange_strong_explicit(
&me->tail, &t, newNode, memory_order_seq_cst, memory_order_seq_cst);
}
int dequeueLF(lf_queue_t *me, int *output) {
node_t *h = NULL;
node_t *t = NULL;
node_t *next = NULL;
while (1) {
h = atomic_load_explicit(&me->head, memory_order_seq_cst);
t = atomic_load_explicit(&me->tail, memory_order_seq_cst);
next = atomic_load_explicit(&h->next, memory_order_seq_cst);
if (atomic_load_explicit(&me->head, memory_order_seq_cst) != h) {
continue;
}
if (next == NULL) { /*is empty*/
return -1;
}
if (h == t) { // help enqueue to advance tail pointer
atomic_compare_exchange_strong_explicit(
&me->tail, &t, next, memory_order_seq_cst, memory_order_seq_cst);
continue;
}
*output = next->data;
if (atomic_compare_exchange_strong_explicit(
&me->head, &h, next, memory_order_seq_cst, memory_order_seq_cst)) {
break;
}
}
return 0;
}
typedef struct {
pthread_mutex_t lock;
pthread_cond_t start_cond;
int start_flag;
atomic_bool producer_done;
} sync_primitives_t;
typedef struct {
sync_primitives_t sync;
lf_queue_t queue;
unsigned long num_items;
} thread_args_t;
void *producer_thread(void *arg) {
thread_args_t *args = (thread_args_t *)arg;
pthread_mutex_lock(&(args->sync.lock));
while (!(args->sync.start_flag)) {
pthread_cond_wait(&(args->sync.start_cond), &(args->sync.lock));
}
pthread_mutex_unlock(&(args->sync.lock));
unsigned long per_thread_produced = 0;
int value = 0;
for (unsigned long i = 0; i < args->num_items; i++) {
value++;
while (1) {
enqueueLF(&args->queue, value);
per_thread_produced++;
break;
}
}
unsigned long *ret = malloc(sizeof(unsigned long));
*ret = per_thread_produced;
return (void *)ret;
}
void *consumer_thread(void *arg) {
thread_args_t *args = (thread_args_t *)arg;
pthread_mutex_lock(&(args->sync.lock));
while (!(args->sync.start_flag)) {
pthread_cond_wait(&(args->sync.start_cond), &(args->sync.lock));
}
pthread_mutex_unlock(&(args->sync.lock));
unsigned long per_thread_consumed = 0;
int value = 0;
while (1) {
int ret = dequeueLF(&args->queue, &value);
if (ret == -1) {
/*empty queue*/
if (atomic_load(&(args->sync.producer_done)) == 1) {
break;
}
} else if (ret == 0) {
per_thread_consumed++;
} else {
}
}
unsigned long *ret = malloc(sizeof(unsigned long));
*ret = per_thread_consumed;
return (void *)ret;
}
int integrated_test(unsigned num_producers, unsigned long items_per_producer,
unsigned num_consumers) {
printf("Integrated concurrency test with %d producer(s)/%d consumer(s), %ld "
"per producer, %lu items to dequeue: ",
num_producers, num_consumers, items_per_producer,
(items_per_producer * num_producers));
thread_args_t args = {
.sync = {.lock = PTHREAD_MUTEX_INITIALIZER,
.start_cond = PTHREAD_COND_INITIALIZER,
.start_flag = 0,
.producer_done = ATOMIC_VAR_INIT(0)},
.queue = {0},
.num_items = items_per_producer,
};
LFQueue_init(&args.queue);
pthread_t producer_threads[num_producers];
for (unsigned i = 0; i < num_producers; i++) {
if (pthread_create(&producer_threads[i], NULL, producer_thread, &args) !=
0) {
fprintf(stderr, "Failed to create producer thread %d.\n", i);
for (unsigned j = 0; j < i; j++) {
pthread_join(producer_threads[j], NULL);
}
return -1;
}
}
pthread_t consumer_threads[num_consumers];
for (unsigned i = 0; i < num_consumers; i++) {
if (pthread_create(&consumer_threads[i], NULL, consumer_thread, &args) !=
0) {
fprintf(stderr, "Failed to create consumer thread %d.\n", i);
for (unsigned j = 0; j < i; j++) {
pthread_join(consumer_threads[j], NULL);
}
return -1;
}
}
pthread_mutex_lock(&args.sync.lock);
args.sync.start_flag = 1;
pthread_cond_broadcast(&args.sync.start_cond);
pthread_mutex_unlock(&args.sync.lock);
unsigned long actual_produced = 0;
void *ret = NULL;
for (unsigned i = 0; i < num_producers; i++) {
pthread_join(producer_threads[i], &ret);
if (ret) {
actual_produced = actual_produced + *((unsigned long *)ret);
free(ret);
}
}
atomic_store(&(args.sync.producer_done), 1);
unsigned long actual_consumed = 0;
ret = NULL;
for (unsigned i = 0; i < num_consumers; i++) {
pthread_join(consumer_threads[i], &ret);
if (ret) {
actual_consumed = actual_consumed + *((unsigned long *)ret);
free(ret);
}
}
LFQueue_destory(&args.queue);
/* Validate the total number of consumed items */
unsigned long expected_produced =
(unsigned long)num_producers * items_per_producer;
unsigned long expected_consumed = expected_produced;
/* Determine test result */
if (actual_produced != expected_produced) {
printf("FAILED\n");
printf("Mismatch: Expected Produced (%lu), Actual Produced (%lu)\n",
expected_produced, actual_produced);
exit(EXIT_FAILURE);
return -1;
}
if (actual_consumed != expected_consumed) {
printf("FAILED\n");
printf("Mismatch: Expected Consumed (%lu), Actual Consumed (%lu)\n",
expected_consumed, actual_consumed);
exit(EXIT_FAILURE);
return -1;
}
printf("SUCCESS\n");
return 0;
}
int main(void) {
int num_producers = 1;
int num_consumers = 1;
unsigned long items_per_producer = 100;
integrated_test(num_producers, items_per_producer, num_consumers);
return EXIT_SUCCESS;
}
However, the mismatch still occurred, and I could not understand the output messages, especially
- Why T48 is able to dequeue the second item without the first item being dequeued first?
It doesn't.
- Why T52 steps back total_items_consumed from 1 to 2?
It doesn't.
It looks like the order of the log messages probably is not consistent with the order of atomic operations upon which they are reporting. That would not be particularly surprising. You cannot judge directly from the order of the log how the actions of one thread are ordered with respect to the actions of the other.
Let's separate them, then:
thread(140251564394048) / T48
thread(140251572786752) / T52
There is a total order over all of the program's atomic operations. Here are the relevant highlights of one such order that is consistent with the per-thread log ordering and the logged results of the atomic operations:
atomic_fetch_add()
lands, advancing the consumed counter from 0 to 1 (T48.1)atomic_fetch_sub()
lands (T48.2)atomic_fetch_add()
lands, advancing the consumed counter from 0 to 1 (T52.1)atomic_fetch_add()
lands, advancing the consumed counter from 1 to 2 (T48.3)atomic_fetch_add()
lands, advancing the consumed counter from 2 to 3 (T48.5). It concludes that all expected items have been produced.atomic_fetch_add()
lands, advancing the consumed counter from 2 back to 3 (T52.3)So what's wrong?
Your consumer threads increment the consumed item count speculatively, before they actually consume an item. That's ok for them internally, in that they soon discover whether they need to decrement it again, but it causes problems for other consumer threads, which do not know that the consumed count they observe includes a not-yet-performed dequeue. Note that the final consumption count presented in the question reports correctly that only one item was dequeued. This reflects each thread doing the correct thing individually, based on what it concludes from the data available to it.
Often in programming, the best approach is to just attempt an action and deal appropriately with the result, whether success or failure, rather than trying to predict success or failure in advance. Here, the consumer should just attempt the dequeue. If that succeeds, then the consumer increments the consumed count. Only if the dequeue fails does the thread check the consumed count to see whether all expected items have already been consumed.
That may leave consumers spinning for a little while as they wait for the thread that consumes the last item to update the consumed count, but it avoids any thread observing a consumed count in excess of the number of items actually consumed. And the consumers already spin while waiting for items to be produced, so a little extra spinning does not make a qualitative difference.
Suggested consumer loop:
while (1) {
int ret = dequeueLF(&(args->queue), &value);
if (ret == 0) {
unsigned long old_count = atomic_fetch_add(&(args->sync.total_items_consumed), 1);
syslog(LOG_DEBUG, "thread(%lu) dequeue succeed! new total_items_consumed: %lu",
(unsigned long) pthread_self(), old_count + 1);
} else {
unsigned long consumed_count = atomic_load(&(args->sync.total_items_consumed));
syslog(LOG_DEBUG, "thread(%lu) dequeue failed with total_items_consumed: %lu",
(unsigned long) pthread_self(), consumed_count);
if (consumed_count >= args->total_items) {
syslog(LOG_DEBUG, "thread(%lu) observed all expected items consumed; terminating",
(unsigned long) pthread_self());
break;
}
}
}