I have two processes P1 and P2 implementing IPC using boost::interprocess::managed_shared_memory
. Both use the following snippet of code:
namespace ipc = boost::interprocess;
// Open a shared memory segment with given name and size
ipc::managed_shared_memory segment_(ipc::open_or_create, "shared_memory_segment", 1024);
// Immediately proceed to create objects using find_or_construct
auto heartbeat_p1 = segment_.find_or_construct<std::atomic<int64_t>>("heartbeat_p1")(0);
// .. other shared objects
// And then proceed to use them
I think the usage of open_or_create
isn't thread-safe here, and does it require an external synchronization?
What might go wrong is either of two cases -
Both P1 and P2 enter the code for the constructor of segment_
. And since open_or_create
is itself a logic that can be represented as:
try {
create();
}
catch {
if (already_exists_error){
try {
open();
}
}
else {
throw();
}
}
We might have a race condition if both try to create()
at the same time?
I have seen my setup behaving weirdly in which the objects in boost managed shared memory aren't really "shared" across process. And there is no exception being thrown either.
Reference for try-catch block: Is there a better way to check for the existence of a boost shared memory segment?.
c++ Synchronize shared memory when reading also mentions to use external synchronization for reading from a process.
Is my understanding correct that we need external synchronization in using open_or_create
from two different processes? And if yes, how do I properly synchronize so that both can use open_or_create
?
So I went through the boost source code, and found following observations:
TL;DR : NOPE, we do not need any external synchronization here, since boost source ensures that all calls are synchronized either by OS system calls, or by the atomic reads and compare-and-swap writes into the starting address of the shared memory segment used as an enum.
Longer version: If we sail our way through boost managed_memory_segment source code, all the important bits can be found in this object's constructor:
managed_open_or_create_impl.hpp:
// boost::ipc::managed_shared_memory dervies from:
// Skipping non-important arguments and templates for this answer
class managed_open_or_create_impl
{
managed_open_or_create_impl()
{
priv_open_or_create();
}
void priv_open_or_create()
{
bool created = false;
// more code ...
else { //DoOpenOrCreate
created = this->do_create_else_open(dev, id, size, perm);
}
// on basis of created variable
if(created){
this->do_map_after_create(dev, m_mapped_region, size, addr, construct_func);
}
else{
this->do_map_after_open(dev, m_mapped_region, addr, construct_func, ronly, cow);
}
}
}
So we have 3 functions of interest here: do_create_else_open(), do_map_after_create(), do_map_after_open().
Let's go through them one-by-one:
template <class DeviceId>
static bool do_create_else_open(DeviceAbstraction &dev, const DeviceId & id, std::size_t size, const permissions &perm)
{
spin_wait swait;
unsigned tries = 0;
while(1){
BOOST_TRY{
create_device<FileBased>(dev, id, size, perm, file_like_t());
return true;
}
BOOST_CATCH(interprocess_exception &ex){
#ifndef BOOST_NO_EXCEPTIONS
if(ex.get_error_code() != already_exists_error){
BOOST_RETHROW
}
else if (++tries == MaxCreateOrOpenTries) {
//File existing when trying to create, but non-existing when
//trying to open, and tried MaxCreateOrOpenTries times. Something fishy
//is happening here and we can't solve it
throw interprocess_exception(error_info(corrupted_error));
}
else{
BOOST_TRY{
DeviceAbstraction tmp(open_only, id, read_write);
dev.swap(tmp);
return false;
}
BOOST_CATCH(interprocess_exception &e){
if(e.get_error_code() != not_found_error){
BOOST_RETHROW
}
}
BOOST_CATCH(...){
BOOST_RETHROW
} BOOST_CATCH_END
}
#endif //#ifndef BOOST_NO_EXCEPTIONS
}
BOOST_CATCH(...){
BOOST_RETHROW
} BOOST_CATCH_END
swait.yield();
}
return false;
}
Okay, so this is simple, it wraps the try-catch block inside the create_device calls. The create_device expands to shm_open() and is bound to be atomic due to it being an OS system call, similar to file open. So, one process is bound to throw and enters the catch block, where it simply attaches the the OS shared memory already created.
And the one who creates it calls do_map_after_create(). The one who attaches calls do_map_after_open().
do_map_after_create():
void do_map_after_create()
{
BOOST_TRY{
//If this throws, we are lost
truncate_device<FileBased>(dev, static_cast<offset_t>(size), file_like_t());
//If the following throws, we will truncate the file to 1
mapped_region region(dev, read_write, 0, 0, addr);
boost::uint32_t *patomic_word = 0; //avoid gcc warning
patomic_word = static_cast<boost::uint32_t*>(region.get_address());
boost::uint32_t previous = atomic_cas32(patomic_word, InitializingSegment, UninitializedSegment);
if(previous == UninitializedSegment){
BOOST_TRY{
construct_func( static_cast<char*>(region.get_address()) + ManagedOpenOrCreateUserOffset
, size - ManagedOpenOrCreateUserOffset, true);
//All ok, just move resources to the external mapped region
final_region.swap(region);
}
BOOST_CATCH(...){
atomic_write32(patomic_word, CorruptedSegment);
BOOST_RETHROW
} BOOST_CATCH_END
atomic_write32(patomic_word, InitializedSegment);
}
else{
atomic_write32(patomic_word, CorruptedSegment);
throw interprocess_exception(error_info(corrupted_error));
}
}
BOOST_CATCH(...){
BOOST_TRY{
truncate_device<FileBased>(dev, 1u, file_like_t());
}
BOOST_CATCH(...){
}
BOOST_CATCH_END
BOOST_RETHROW
}
BOOST_CATCH_END
}
Now, do_map_after_open(), broken into 2 parts: Part-1:
void do_map_after_open()
{
const usduration TimeoutSec(usduration_seconds(MaxInitializeTimeSec));
if(FileBased){
offset_t filesize = 0;
spin_wait swait;
//If a file device was used, the creator might be truncating the device, so wait
//until the file size is enough to map the initial word
ustime ustime_start = microsec_clock<ustime>::universal_time();
while(1){
if(!get_file_size(file_handle_from_mapping_handle(dev.get_mapping_handle()), filesize)){
error_info err = system_error_code();
throw interprocess_exception(err);
}
if (filesize != 0)
break;
else {
//More than MaxZeroTruncateTimeSec seconds waiting to the creator
//to minimally increase the size of the file: something bad has happened
const usduration elapsed(microsec_clock<ustime>::universal_time() - ustime_start);
if (elapsed > TimeoutSec){
throw interprocess_exception(error_info(corrupted_error));
}
swait.yield();
}
}
//The creator detected an error creating the file and signalled it with size 1
if(filesize == 1){
throw interprocess_exception(error_info(corrupted_error));
}
}
// .. more code later
}
In part-1:
Part-2
{
// continued;
mapped_region region(dev, ronly ? read_only : (cow ? copy_on_write : read_write), 0, 0, addr);
boost::uint32_t *patomic_word = static_cast<boost::uint32_t*>(region.get_address());
boost::uint32_t value = atomic_read32(patomic_word);
if (value != InitializedSegment){
ustime ustime_start = microsec_clock<ustime>::universal_time();
spin_wait swait;
while ((value = atomic_read32(patomic_word)) != InitializedSegment){
if(value == CorruptedSegment){
throw interprocess_exception(error_info(corrupted_error));
}
//More than MaxZeroTruncateTimeSec seconds waiting to the creator
//to minimally increase the size of the file: something bad has happened
const usduration elapsed(microsec_clock<ustime>::universal_time() - ustime_start);
if (elapsed > TimeoutSec){
throw interprocess_exception(error_info(corrupted_error));
}
swait.yield();
}
//The size of the file might have grown while Uninitialized -> Initializing, so remap
{
mapped_region null_map;
region.swap(null_map);
}
mapped_region final_size_map(dev, ronly ? read_only : (cow ? copy_on_write : read_write), 0, 0, addr);
final_size_map.swap(region);
}
construct_func( static_cast<char*>(region.get_address()) + ManagedOpenOrCreateUserOffset
, region.get_size() - ManagedOpenOrCreateUserOffset
, false);
//All ok, just move resources to the external mapped region
final_region.swap(region);
}
Thus, all calls are synchronized either by OS system calls, or by the atomic reads and compare-and-swap enum writes into the starting address of the shared memory segment.
Edit: I finally found the issue for which Processes were not able to attach themselves to the same shared memory segment. It was due to systemd deleting the shared memory resources for a user account on logouts. Link: https://superuser.com/a/1179962/1818191