boostshared-memoryinterprocessconditional-variable

Producer consumer using boost::interprocess_confition with boost:interprocess shared memory. Consumer dominates 100%


Just making a simple example because I am having issues with a more complex usecase and want to udnerstand the base case before spending too much time in trial and error.

Scenario: I have two binaries that supposedly takes turns incrementing a number (stored in shared memory). What happens in practice is that the "consumer" app takes over 100% never letting the "creator" run.

If I add a small delay in the consumer in that case I obtain the intended behaviour.

Simple POD struct

#pragma once

#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

namespace bip = boost::interprocess;

namespace my_namespace {

static const char *name = "MySharedMemory";

struct MyStruct {
  bip::interprocess_mutex mutex;
  bip::interprocess_condition cond;
  unsigned long counter;

  MyStruct(): mutex(), cond(), counter(0) {
  }
};

}  // namespace my_namespace

"Creator/producer"

#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>

#include <iostream>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/thread/locks.hpp>

#include "my_struct.h"

bool exit_flag = false;

void my_handler(int) {
  exit_flag = true;
}

namespace bip = boost::interprocess;

int main() {
  struct sigaction sigIntHandler;
  sigIntHandler.sa_handler = my_handler;
  sigemptyset(&sigIntHandler.sa_mask);
  sigIntHandler.sa_flags = 0;
  sigaction(SIGINT, &sigIntHandler, NULL);

  bip::shared_memory_object::remove(my_namespace::name);
  auto memory = bip::managed_shared_memory(bip::create_only, my_namespace::name, 65536);
  auto *data = memory.construct<my_namespace::MyStruct>(my_namespace::name)();

  long unsigned iterations = 0;
  while (!exit_flag) {
    boost::interprocess::scoped_lock lock(data->mutex);
    data->counter++;
    std::cout << "iteration:" << iterations << "Counter: " << data->counter << std::endl;
    ++iterations;
    auto start = boost::posix_time::microsec_clock::universal_time();
    auto wait_time = start + boost::posix_time::milliseconds(1000);
    auto ret = data->cond.timed_wait(lock, wait_time);
    if (!ret) {
      std::cout << "Timeout" << std::endl;
    }
  }
  return 0;
}

Consumer

#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sched.h>
#include <chrono>

#include <iostream>
#include <thread>
#include <mutex>

#include "my_struct.h"

bool exit_flag = false;

void my_handler(int) {
  exit_flag = true;
}

namespace bip = boost::interprocess;

int fib(int x) {
  if ((x == 1) || (x == 0)) {
    return (x);
  } else {
    return (fib(x - 1) + fib(x - 2));
  }
}

int main() {
  struct sigaction sigIntHandler;
  sigIntHandler.sa_handler = my_handler;
  sigemptyset(&sigIntHandler.sa_mask);
  sigIntHandler.sa_flags = 0;
  sigaction(SIGINT, &sigIntHandler, nullptr);

  auto memory = bip::managed_shared_memory(bip::open_only, my_namespace::name);
  auto *data = memory.find<my_namespace::MyStruct>(my_namespace::name).first;

  long unsigned iterations = 0;
  while (!exit_flag) {
    {
      boost::interprocess::scoped_lock lock(data->mutex);
      std::this_thread::sleep_for(std::chrono::milliseconds(200));
      data->counter += 1;
      std::cout << "iteration:" << iterations << "Counter: " << data->counter << std::endl;
      ++iterations;
      std::cout << "notify_one" << std::endl;
      data->cond.notify_one();
    }
//    usleep(1);  // If I add this it works
  }
  return 0;
}

If someone can shed some light I would be grateful.


Solution

  • You're doing sleeps while holding the lock. This maximizes lock contention. E.g. in your consumer

    boost::interprocess::scoped_lock lock(data->mutex);
    std::this_thread::sleep_for(200ms);
    

    Could be

    std::this_thread::sleep_for(200ms);
    boost::interprocess::scoped_lock lock(data->mutex);
    

    Mutexes are supposed to synchronize access to shared resources. As long as you do not require exclusive access to the shared resource, don't hold the lock. In general, make access atomic and as short as possible in any locking scenario.

    Side Notes

    You don't need the complicated posix_time manipulation:

    auto ret = data->cond.wait_for(lock, 1000ms);
    if (bip::cv_status::timeout == ret) {
        std::cout << "Timeout" << std::endl;
    }
    

    Just for sharing a single POD struct, managed_shared_memory is a lot of overkill. Consider mapped_region.

    Consider Asio for signal handling. In any case, make the exit_flag atomic so you don't suffer a data race:

    static std::atomic_bool exit_flag{false};
    {
        struct sigaction sigIntHandler;
        sigIntHandler.sa_handler = [](int) { exit_flag = true; };
        sigemptyset(&sigIntHandler.sa_mask);
        sigIntHandler.sa_flags = 0;
        sigaction(SIGINT, &sigIntHandler, NULL);
    }
    

    Since your application is symmetrical, I'd expect the signaling to be symmetrical. If not, I'd expect the producing side to do signaling (after all, presumably there is nothing to consume when nothing was produced. Why be "busy" when you know nothing was produced?).

    Live Demo

    Live On Coliru

    #include <boost/interprocess/managed_shared_memory.hpp>
    #include <boost/interprocess/sync/interprocess_condition.hpp>
    #include <boost/interprocess/sync/interprocess_mutex.hpp>
    #include <iostream>
    #include <mutex>
    #include <thread>
    
    #include <signal.h>
    #include <unistd.h>
    
    #ifdef COLIRU // coliru doesn't support shared memory
        #include <boost/interprocess/managed_mapped_file.hpp>
        #define managed_shared_memory managed_mapped_file
    #endif
    
    namespace bip = boost::interprocess;
    using namespace std::chrono_literals;
    
    namespace my_namespace {
        static char const* name = "MySharedMemory";
    
        struct MyStruct {
            bip::interprocess_mutex     mutex;
            bip::interprocess_condition cond;
            unsigned long               counter = 0;
        };
    
    }  // namespace my_namespace
    
    namespace producer {
    
        void run() {
            static std::atomic_bool exit_flag{false};
            {
                struct sigaction sigIntHandler;
                sigIntHandler.sa_handler = [](int) { exit_flag = true; };
                sigemptyset(&sigIntHandler.sa_mask);
                sigIntHandler.sa_flags = 0;
                sigaction(SIGINT, &sigIntHandler, NULL);
            }
    
            bip::shared_memory_object::remove(my_namespace::name);
            auto  memory = bip::managed_shared_memory(bip::create_only, my_namespace::name, 65536);
            auto& data   = *memory.construct<my_namespace::MyStruct>(my_namespace::name)();
    
            for (size_t iterations = 0; !exit_flag;) {
                std::unique_lock lock(data.mutex);
                data.counter++;
                std::cout << "iteration:" << iterations << " Counter: " << data.counter << std::endl;
                ++iterations;
    
                auto ret = data.cond.wait_for(lock, 1000ms);
    
                if (bip::cv_status::timeout == ret) {
                    std::cout << "Timeout" << std::endl;
                }
            }
        }
    }
    
    namespace consumer {
        namespace bip = boost::interprocess;
    
        void run() {
            static std::atomic_bool exit_flag{false};
            {
                struct sigaction sigIntHandler;
                sigIntHandler.sa_handler = [](int) { exit_flag = true; };
                sigemptyset(&sigIntHandler.sa_mask);
                sigIntHandler.sa_flags = 0;
                sigaction(SIGINT, &sigIntHandler, nullptr);
            }
    
            bip::managed_shared_memory memory(bip::open_only, my_namespace::name);
            auto& data = *memory.find<my_namespace::MyStruct>(my_namespace::name).first;
    
            for (size_t iterations = 0; !exit_flag;) {
                std::this_thread::sleep_for(200ms);
    
                std::unique_lock lock(data.mutex);
                data.counter += 1;
                std::cout << "iteration:" << iterations << " Counter: " << data.counter << std::endl;
                ++iterations;
                std::cout << "notify_one" << std::endl;
                data.cond.notify_one();
            }
        }
    }
    
    int main(int argc, char**) {
        if (argc>1)
            producer::run();
        else
            consumer::run();
    }
    

    Testing with

    g++ -std=c++20 -O2 -pthread main.cpp -lrt -DCOLIRU
    ./a.out producer&
    sleep 1;
    ./a.out&
    sleep 4; kill -INT %2; sleep 3; 
    ./a.out&
    sleep 4; kill -INT %1 %2 %3
    

    Prints e.g.

    PRODUCER iteration:0 Counter: 1
    PRODUCER Timeout
    PRODUCER iteration:1 Counter: 2
    CONSUMER iteration:0 Counter: 3
    CONSUMER notify_one
    PRODUCER iteration:2 Counter: 4
    CONSUMER iteration:1 Counter: 5
    CONSUMER notify_one
    PRODUCER iteration:3 Counter: 6
    CONSUMER iteration:2 Counter: 7
    CONSUMER notify_one
    PRODUCER iteration:4 Counter: 8
    CONSUMER iteration:3 Counter: 9
    CONSUMER notify_one
    PRODUCER iteration:5 Counter: 10
    CONSUMER iteration:4 Counter: 11
    CONSUMER notify_one
    PRODUCER iteration:6 Counter: 12
    CONSUMER iteration:5 Counter: 13
    CONSUMER notify_one
    PRODUCER iteration:7 Counter: 14
    CONSUMER iteration:6 Counter: 15
    CONSUMER notify_one
    PRODUCER iteration:8 Counter: 16
    CONSUMER iteration:7 Counter: 17
    CONSUMER notify_one
    PRODUCER iteration:9 Counter: 18
    CONSUMER iteration:8 Counter: 19
    CONSUMER notify_one
    PRODUCER iteration:10 Counter: 20
    CONSUMER iteration:9 Counter: 21
    CONSUMER notify_one
    PRODUCER iteration:11 Counter: 22
    CONSUMER iteration:10 Counter: 23
    CONSUMER notify_one
    PRODUCER iteration:12 Counter: 24
    CONSUMER iteration:11 Counter: 25
    CONSUMER notify_one
    PRODUCER iteration:13 Counter: 26
    CONSUMER iteration:12 Counter: 27
    CONSUMER notify_one
    PRODUCER iteration:14 Counter: 28
    CONSUMER iteration:13 Counter: 29
    CONSUMER notify_one
    PRODUCER iteration:15 Counter: 30
    CONSUMER iteration:14 Counter: 31
    CONSUMER notify_one
    PRODUCER iteration:16 Counter: 32
    CONSUMER iteration:15 Counter: 33
    CONSUMER notify_one
    PRODUCER iteration:17 Counter: 34
    CONSUMER iteration:16 Counter: 35
    CONSUMER notify_one
    PRODUCER iteration:18 Counter: 36
    CONSUMER iteration:17 Counter: 37
    CONSUMER notify_one
    PRODUCER iteration:19 Counter: 38
    CONSUMER iteration:18 Counter: 39
    CONSUMER notify_one
    PRODUCER iteration:20 Counter: 40
    CONSUMER iteration:19 Counter: 41
    CONSUMER notify_one
    PRODUCER iteration:21 Counter: 42
    PRODUCER Timeout
    PRODUCER iteration:22 Counter: 43
    PRODUCER Timeout
    PRODUCER iteration:23 Counter: 44
    PRODUCER Timeout
    PRODUCER iteration:24 Counter: 45
    CONSUMER iteration:0 Counter: 46
    CONSUMER notify_one
    PRODUCER iteration:25 Counter: 47
    CONSUMER iteration:1 Counter: 48
    CONSUMER notify_one
    PRODUCER iteration:26 Counter: 49
    CONSUMER iteration:2 Counter: 50
    CONSUMER notify_one
    PRODUCER iteration:27 Counter: 51
    CONSUMER iteration:3 Counter: 52
    CONSUMER notify_one
    PRODUCER iteration:28 Counter: 53
    CONSUMER iteration:4 Counter: 54
    CONSUMER notify_one
    PRODUCER iteration:29 Counter: 55
    CONSUMER iteration:5 Counter: 56
    CONSUMER notify_one
    PRODUCER iteration:30 Counter: 57
    CONSUMER iteration:6 Counter: 58
    CONSUMER notify_one
    PRODUCER iteration:31 Counter: 59
    CONSUMER iteration:7 Counter: 60
    CONSUMER notify_one
    PRODUCER iteration:32 Counter: 61
    CONSUMER iteration:8 Counter: 62
    CONSUMER notify_one
    PRODUCER iteration:33 Counter: 63
    CONSUMER iteration:9 Counter: 64
    CONSUMER notify_one
    PRODUCER iteration:34 Counter: 65
    CONSUMER iteration:10 Counter: 66
    CONSUMER notify_one
    PRODUCER iteration:35 Counter: 67
    CONSUMER iteration:11 Counter: 68
    CONSUMER notify_one
    PRODUCER iteration:36 Counter: 69