cmultithreadingdata-structuresthread-safetylookup-tables

How to achieve thread-safety in bidirectional tables using just 4 bits per entry, while avoiding any global locks?


I'm having trouble creating a safe and efficient locking mechanism for a particular problem. Here's an oversimplified version of the situation.

I have two tables called A and B, each of which contains space for (say) 10 entries. Of these 10 entries, 9 of them are pointers into the other table, and the remaining entry is NULL. At program start, the non-NULL entries start off in bijective correspondence. For example, if I follow a link from table A to table B and then back again, I arrive at my original location in A. Similarly, if I follow a link from table B to table A and then back again, I arrive at my original location in B. There's global variables for tracking the location of the NULL values.

After setting up this bijective correspondence, the program spawns two threads. The first thread ("Thread A") repeatedly does the following: it chooses an entry of A at random, swaps it into the NULL spot, and then proceeds to adjust the corresponding pointer in B in order to maintain the bijective correspondence. The other thread ("Thread B") does exactly the same thing, but starting with the other table. So it randomly chooses entries in B, swaps them into the NULL spot, and adjusts the corresponding pointer in A.

Question: How can we make this system thread-safe without locking the entire table, preferably using just the bottom 4 bits of each pointer?

Obviously, the actual situation I care about involves a much bigger tables, plus there'll be a (small amount) of data attached to each entry. Also, in the real thing, the movements aren't completely random; they serve a purpose. Feel free to ask if you want more details about what I'm actually trying to do, though I don't think these details are too relevant to solving the actual multithreading problem.

Addendum. I've just noticed that there's multiple versions of this problem. The easiest version of this says that if we pick an entry and realize it can't be moved, that's perfectly OK - we just randomly pick another entry and move that instead. The medium-difficulty version says that we're allowed to do this for A only. Meaning that in table B, we have to block and wait until movement is possible, and cannot simply choose another entry instead. That hardest version of this problem says that we're denied the right to simply give up and rerandomize for both tables. FWIW, I'm interested in all versions of the problem; thus, even if you can just solve the "easy" version, I would still value your answer.

Addendum 2. Here's some example code targeting x86-64/Linux, albeit without any thread safety mechanisms. The current design gives just 3 bits per pointer due to the way qwords are packed, but we can upgrade this to 4 bits if needed by using 128-bit entries instead.

#include <pthread.h>
#include <stdint.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

// Clear the bottom 3 bits of a 64-bit value
#define GET_POINTER(val) (uint64_t*) ((val) & ~0x7ULL)

// Concatenates the top 61 bits of new_ptr with the bottom 3 bits of old_val
// so long as the bottom 3 bits of new_ptr are 0's
#define MODIFY_VALUE(new_ptr, old_val) ((uint64_t) (new_ptr)) ^ ((old_val) & 0x7ULL) 

// Declare globals
uint64_t A[10], B[10];
int index_of_NULL_value_A, index_of_NULL_value_B;

// Initialize tables
void init_globals() {

    // Initialize A[0] and B[0] to NULL pointers
    A[0] = (uint64_t) NULL; // Note that (uint64_t) NULL == 0;
    B[0] = (uint64_t) NULL; // Note that (uint64_t) NULL == 0;

    // Record the initial indexes of the NULL values
    index_of_NULL_value_A = 0;
    index_of_NULL_value_B = 0;

    // Create pointers from A to B
    for (int i = 1; i < 10; ++i) {
        A[i] = (uint64_t) &B[i];
    }

    // Create pointers from B to A
    for (int i = 1; i < 10; ++i) {
        B[i] = (uint64_t) &A[i];
    }

}

void verify_integrity_of_table(uint64_t* A, int index_of_NULL_value_A) {

    // Verify the forward direction
    for (int i = 0; i < 10; ++i) {

        if (i == index_of_NULL_value_A) {

            // Check that the NULL value is at this spot
            if (A[i] != (uint64_t) NULL) {
                fprintf(stderr, "Integrity check A failed! Missing NULL at i: %d\n", i);
                exit(1);
            }

        } else {

            // Check link integrity
            if (&A[i] != GET_POINTER(*GET_POINTER(A[i]))) {
                fprintf(stderr, "Integrity check A failed! Dodgy link at i: %d\n", i);
                exit(1);
            }

        }

    }

}

void verify_integrity() {

    // Verify the forward direction
    verify_integrity_of_table(A, index_of_NULL_value_A);

    // Verify the backward direction
    verify_integrity_of_table(B, index_of_NULL_value_B);

}

typedef void *(*start_routine_t)(void *);
pthread_t pthread_create_or_exit(start_routine_t start_routine) {

    // Declare variables
    pthread_t thread_id;
    int result;

    // Create the thread
    result = pthread_create(&thread_id, NULL, start_routine, NULL);
    if (result != 0) {
        perror("Failed to create thread!\n");
        exit(EXIT_FAILURE);
    }

    // Return the thread id
    return thread_id;

}

void do_a_random_swap(uint64_t* A, int* addr_of_index_of_NULL_value) {
    
    // Get the index of the NULL value
    int index_of_NULL = *addr_of_index_of_NULL_value;

    // Choose a random index
    int i = rand() % 10;
    while (i == index_of_NULL) {
        i = rand() % 10;
    }

    // Update the backpointer
    uint64_t* new_ptr = &A[index_of_NULL];
    uint64_t  old_val = *(GET_POINTER(A[i]));
    *GET_POINTER(A[i]) = MODIFY_VALUE(new_ptr, old_val);

    // Copy the item into the NULL spot and clear the old spot
    A[index_of_NULL] = A[i];
    A[i] = (uint64_t) NULL; // Note that (uint64_t) NULL == 0

    // Update the NULL index tracker
    *addr_of_index_of_NULL_value = i;

}

void* fst_start_routine(void* arg) {

    // Tell the compiler not to panic about the fact that we're not using this argument
    (void) arg;

    // Loop forever
    while (1) {
        if (time(NULL) % 2 == 0) {
            do_a_random_swap(A, &index_of_NULL_value_A);
        } else {
            sleep(0.1);
        }
    }

    // We never get here
    return NULL;

}

void* snd_start_routine(void* arg) {
    
    // Tell the compiler not to panic about the fact that we're not using this argument
    (void) arg;

    // Loop forever
    while (1) {
        if (time(NULL) % 2 == 0) {
            do_a_random_swap(B, &index_of_NULL_value_B);
        } else {
            sleep(0.1);
        }
    }

    // We never get here
    return NULL;

}

void* integrity_checker_start_routine(void* arg) {

    // Tell the compiler not to panic about the fact that we're not using this argument
    (void) arg;

    // Loop forever, checking the integrity of the system during odd seconds
    for (;; sleep(0.1)) {
        if (time(NULL) % 2 == 1) {
            verify_integrity();
        }
    }

    // We never get here
    return NULL;

}

int main() {

    // Initialize random seed
    srand(time(NULL));

    // Initialize table and NULL-index trackers
    init_globals();

    // Verify integrity of the initialized values
    verify_integrity();

    // Spawn some threads
    pthread_create_or_exit(fst_start_routine);
    pthread_create_or_exit(snd_start_routine);
    pthread_create_or_exit(integrity_checker_start_routine);

    // Loop forever
    while (1) {
        sleep(1);
    }

    // Return successfully (we never get here)
    return 0;

}

Solution

  • Locking primitives in high level languages do not function. https://jakob.engbloms.se/archives/65 You must use standard library functions that provide locking or interlocked/atomic functionality if available, or if you can't synthesize one from standard library functions you need to break out assembly.

    Assuming you're not running in some weird embedded environment, it can be done with the _Atomic methods.

    If I have the following assembly functions I can do it:

    perform_swap_a would look like this:

    struct table_entry {
       _Atomic struct table_entry *ptr;
       void *attached;
    }
    
    const uintptr_t bitmask = ~3ULL;
    
    void perform_swap_a(size_t index)
    {
       // Lock our side
       uintptr_t selected;
       _Atomic struct table_entry *expected;
       do {
          selected = (uintptr_t)atomic_load_explicit(&A[index].ptr,memory_order_acquire);
          expected = (struct table_entry *)(selected & bitmask);
          // Spin until we locked this one
       } while (atomic_compare_exchange_weak(&A[index].ptr, &expected, (struct table_entry *)((selected & bitmask) + 1));
       // Lock other side
       struct table_entry *bptr = ((struct table_entry *)((uintptr_t)atomic_load_explicit(&A[index].ptr,memory_order_acquire) ~3);
       do {
          selected = (uintptr_t)atomic_load_explicit(&bptr->ptr,memory_order_acquire);
          expected = (struct table_entry *)(selected & bitmask);
          // Spin until we locked this one
       } while (atomic_compare_exchange_weak(&bptr->ptr, &expected, (struct table_entry *)((selected & bitmask) + 1)); 
       // Perform swap
       struct table_entry newentry;
       newentry = A[index];
       A[index_of_NULL_value_A] = A[index]
       A[index] = newentry;
       size_t tmp = index;
       index = index_of_NULL_value_A;
       index_of_NULL_value_A = tmp;
       // Unlock other side
       atomic_store_explicit(&bptr->ptr,(struct table_entry *)((uintptr_t)bptr->ptr & bitmask),  memory_order_release);
       bptr->ptr = (table_entry *)(read_ptr(bptr) & bitmask);
       // Unlock our side
       atomic_store_explicit(&A[index]->ptr, (struct table_entry *)((uintptr_t)bptr->ptr & bitmask),  memory_order_release);
    }
    

    perform_swap_b is similar:

    void perform_swap_b(size_t index)
    {
       // Lock other side
       struct table_entry *aptr;
       uintptr_t selected;
       struct table_entry * expected;
       do {
          // Every time we go around this loop, the pointer in index can change so we must reload.
          aptr = atomic_load_explicit(&B[index].ptr,memory_order_acquire);
          selected = atomic_load_explicit(&aptr->ptr,memory_order_acquire);
          expected = (struct table_entry *)(selected & bitmask);
          // Spin until we locked this one
       } while (aptr == NULL ||
            atomic_compare_exchange_weak(&aptr->ptr, &expected, (struct table_entry *)((selected & bitmask) + 1));
       // Lock our side
       do {
           selected = atomic_load_explicit(&B[index].ptr,memory_order_acquire);
          expected = (struct table_entry *)(selected & bitmask);
          // Spin until we locked this one
       } while (atomic_compare_exchange_weak(&B[index]->ptr, &expected, (struct table_entry *)((selected & bitmask) + 1));
       // Perform swap
       struct table_entry newentry;
       newentry = B[index];
       B[index_of_NULL_value_A] = B[index]
       B[index] = newentry;
       size_t tmp = index;
       index = index_of_NULL_value_B;
       index_of_NULL_value_B = tmp;
       // Unlock our side
       atomic_store_explicit(&B[index]->ptr, (struct table_entry *)((uintptr_t)B[index]->ptr & bitmask), memory_order_release);
       // Unlock other side
       atomic_store_explicit(&aptr->ptr, (struct table_entry *)((uintptr_t)(aptr->ptr) & bitmask);
    }
    

    The whole algorithm only used 1 bit in the pointer so that constant 3 can be changed to a 1 if need be.