Before I get to my main question, I'm assuming that std::mutex::unlock() stops touching *this as soon as it puts the mutex in a state where another thread might lock it - even though the call to unlock() might not have returned yet. In other words, the following C++ code is correct:
#include <atomic>
#include <cstddef>
#include <mutex>
struct RefCount {
std::mutex m_;
std::size_t c_{};
// Always construct on heap
static RefCount *make() { return new RefCount; }
virtual ~RefCount() = default;
void incref()
{
m_.lock();
++c_;
m_.unlock();
}
void decref()
{
m_.lock();
auto c = --c_;
m_.unlock();
if (!c)
delete this;
}
protected:
RefCount() noexcept = default;
};
In particular, there should be no undefined behavior if two threads hold the reference count and call decref() - the first thread functionally unlocks the mutex even though unlock() hasn't returned yet, then the second thread unlocks the mutex and destroys the object including the mutex, and only then does the first thread return from the call to unlock().
Unfortunately, examples like the above would seem to preclude straightforward futex-based mutexes implemented with std::atomic. The usual pattern would be to have 3 states (unlocked, locked, and locked-with-waiters), which might look like the following:
struct BadMutex {
static constexpr uint8_t kUnlocked = 0;
static constexpr uint8_t kLocked = 1;
static constexpr uint8_t kLockedWantNotify = 2;
std::atomic_uint8_t state_{kUnlocked};
void lock()
{
for (;;) {
auto s = state_.exchange(kLocked, std::memory_order_acquire);
if (s != kUnlocked)
s = state_.exchange(kLockedWantNotify, std::memory_order_acquire);
if (s == kUnlocked)
return;
state_.wait(kLockedWantNotify);
}
}
void unlock()
{
if (state_.exchange(kUnlocked, std::memory_order_release) == kLockedWantNotify)
// UB if BadMutex destroyed here
state_.notify_all();
}
};
Unfortunately, if you plug BadMutex into RefCount above, you can get undefined behavior if the second-to-last decref() thread is preempted right before state_.notify_all() and BadMutex is destroyed, because invoking a method on a destroyed object is UB.
Note that using a raw FUTEX_WAKE system call would be fine, because it's no big deal to send a spurious wakeup or even call FUTEX_WAKE on unmapped memory (just get an EFAULT you can ignore). So, in practice, this UB should be no big deal, but of course history doesn't look kindly on intentional UB.
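For illustration, here is roughly what such a raw wake looks like on Linux. This is a minimal sketch; the futex_wake_all helper name is mine, and only the arguments relevant to FUTEX_WAKE are passed:

```cpp
#include <cerrno>
#include <climits>
#include <cstdint>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

// Wake every waiter on the futex word at `addr`.
// If that memory was already freed/unmapped by another thread, the kernel
// reports EFAULT, which is harmless here and can simply be ignored.
inline void futex_wake_all(std::uint32_t *addr) noexcept
{
    long rc = ::syscall(SYS_futex, addr, FUTEX_WAKE_PRIVATE, INT_MAX,
                        nullptr, nullptr, 0);
    if (rc == -1 && errno == EFAULT) {
        // Wake raced with unmapping of the futex word: nothing to do.
    }
}
```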
So, my next question is, can I do anything about this? The best I've come up with is to spin during destruction, but it's offensive to have to spin when we have futexes that are specifically designed to avoid spinning. What I have looks something like this:
struct UglyMutex {
static constexpr uint8_t kUnlocked = 0;
static constexpr uint8_t kLocked = 1;
static constexpr uint8_t kLockedWantNotify = 2;
std::atomic_uint8_t state_{kUnlocked};
std::atomic_uint16_t unlocking_{0};
~UglyMutex()
{
while (unlocking_.load(std::memory_order_acquire))
;
}
void lock()
{
for (;;) {
auto s = state_.exchange(kLocked, std::memory_order_acquire);
if (s != kUnlocked)
s = state_.exchange(kLockedWantNotify, std::memory_order_acquire);
if (s == kUnlocked)
return;
state_.wait(kLockedWantNotify);
}
}
void unlock()
{
unlocking_.fetch_add(1, std::memory_order_relaxed);
if (state_.exchange(kUnlocked, std::memory_order_release) == kLockedWantNotify)
state_.notify_all();
unlocking_.fetch_sub(1, std::memory_order_release);
}
};
Am I missing some other trick I could use to do this without spinning - maybe with std::atomic_ref?
Is there any hope that a future version of C++ might provide a safe way to notify on destroyed atomics?
Is there any benefit to the language making this UB, or is this basically a flaw in the language spec that atomics don't expose the full expressiveness of the underlying futexes on which they are implemented?
In short:

- .notify_one() / .notify_all() on a destructed std::atomic is UB.
- The same holds for std::atomic_ref, because the object it points to must outlive the std::atomic_ref object.
- Possible workarounds: use std::binary_semaphore, or move the .wait() and .notify_xxx() to a separate atomic variable that has a longer lifetime.

> Before I get to my main question, I'm assuming that std::mutex::unlock() stops touching *this as soon as it puts the mutex in a state where another thread might lock it [...]
That's a valid assumption - the standard even mandates that this type of usage must be supported:
> 35.5.3.2.1 Class mutex [thread.mutex.class]
>
> (2) [ Note: After a thread A has called unlock(), releasing a mutex, it is possible for another thread B to lock the same mutex, observe that it is no longer in use, unlock it, and destroy it, before thread A appears to have returned from its unlock call. Implementations are required to handle such scenarios correctly, as long as thread A doesn't access the mutex after the unlock call returns. These cases typically occur when a reference-counted object contains a mutex that is used to protect the reference count. — end note ]
> Is it possible to build a mutex from C++20 std::atomic without spinning to avoid use-after-free when deleting?
Yes, it is possible.
But it is not as straightforward as one might expect, due to the lifetime issues you already mentioned.
See 3. Solutions below for a list of potential ways you could work around this limitation.
> Is there any hope that a future version of C++ might provide a safe way to notify on destroyed atomics?
There is a paper addressing this specific issue that could have been part of C++26:
P2616 - Making std::atomic notification/wait operations usable in more situations
Revisions:
| Paper | Date | Target C++ Version |
|---|---|---|
| P2616R4 | 2023-02-15 | C++26 |
| P2616R3 | 2022-11-29 | C++26 |
| P2616R2 | 2022-11-16 | C++26 |
| P2616R1 | 2022-11-09 | C++26 |
| P2616R0 | 2022-07-05 | C++26 |
The current state of this paper can be seen at cplusplus/papers Issue #1279 on GitHub
(at the time of writing the repo is private due to the ongoing committee meeting - archive.org version).
It has been stuck in needs-revision since May 23, 2023 - so it's unclear if (and when) it'll ever become part of the standard.
> So, my next question is, can I do anything about this?
> The best I've come up with is to spin during destruction, but it's offensive to have to spin when we have futexes that are specifically designed to avoid spinning.
There's unfortunately no one-size-fits-all solution for this.
std::binary_semaphore might be an option - its acquire() and release() member functions do the wait / notify atomically, so there should be no lifetime problems (see solution 3.2 below).

> Am I missing some other trick I could use to do this without spinning - maybe with std::atomic_ref?
std::atomic_ref unfortunately doesn't help in this case either.
One of its rules is that the object it points to must outlive the atomic_ref object
(which would not be the case if you destroy the object).
> 31.7 Class template atomic_ref [atomics.ref.generic]
>
> (3) The lifetime ([basic.life]) of an object referenced by *ptr shall exceed the lifetime of all atomic_refs that reference the object. While any atomic_ref instances exist that reference the *ptr object, all accesses to that object shall exclusively occur through those atomic_ref instances. [...]
> Is there any benefit to the language making this UB, or is this basically a flaw in the language spec that atomics don't expose the full expressiveness of the underlying futexes on which they are implemented?
That is unfortunately hard to answer given that there is no rationale for the API that's in the standard.
My guess would be that this edge-case was simply missed.
3. Solutions

This is a (small, incomplete) list of potential solutions / workarounds.
Depending on your specific use case, some of them might be helpful.

3.1 Spin in the destructor

Like in OP's example.
Not pretty, but it gets the job done.
It might even be an acceptable option, assuming it's rather rare that two threads release the mutex in quick succession and the last release happens to free the object containing the mutex.
e.g.: godbolt
class my_mutex {
private:
using state_t = std::uint32_t;
static constexpr state_t StateUnlocked = 0;
static constexpr state_t StateLocked = 1;
static constexpr state_t StateLockedWithWaiters = 2;
static_assert(std::atomic<state_t>::is_always_lock_free);
std::atomic<state_t> state = StateUnlocked;
std::atomic<std::uint32_t> release_count = 0;
public:
void lock() {
state_t expected = StateUnlocked;
if (state.compare_exchange_strong(
expected,
StateLocked,
std::memory_order::acquire,
std::memory_order::relaxed
)) [[likely]] {
return;
}
while (true) {
if (expected != StateLockedWithWaiters) {
expected = state.exchange(
StateLockedWithWaiters,
std::memory_order::acquire
);
if (expected == StateUnlocked) {
return;
}
}
state.wait(StateLockedWithWaiters);
expected = state.load(std::memory_order_relaxed);
}
}
bool try_lock() {
state_t expected = StateUnlocked;
return state.compare_exchange_strong(
expected,
StateLocked,
std::memory_order::acquire,
std::memory_order::relaxed
);
}
void unlock() {
release_count.fetch_add(1, std::memory_order::relaxed);
state_t prev = state.exchange(
StateUnlocked,
std::memory_order::release
);
if (prev == StateLockedWithWaiters) [[unlikely]] {
state.notify_one();
}
release_count.fetch_sub(1, std::memory_order::release);
}
~my_mutex() {
while(release_count.load(std::memory_order::relaxed)) {
// TODO: optional arch specific relax instruction
__builtin_ia32_pause();
}
}
};
Upsides:

- Only requires standard C++ (no platform-specific wait / wake calls).

Downsides:

- The destructor has to spin, which is exactly what futexes are supposed to avoid.
3.2 Use a semaphore (std::counting_semaphore / std::binary_semaphore)

It is relatively straightforward to wrap std::binary_semaphore into a custom mutex implementation that supports unlocking the mutex from a different thread than the one that locked it.
e.g.: godbolt
class my_mutex {
private:
std::binary_semaphore sem;
public:
my_mutex() : sem(1) {}
void lock() {
sem.acquire();
}
void unlock() {
sem.release();
}
bool try_lock() {
return sem.try_acquire();
}
template<class Rep, class Period>
bool try_lock_for(std::chrono::duration<Rep, Period> const& timeout) {
return sem.try_acquire_for(timeout);
}
template<class Clock, class Duration>
bool try_lock_until(std::chrono::time_point<Clock, Duration> const& timeout) {
return sem.try_acquire_until(timeout);
}
};
Upsides:

- acquire() and release() do the wait / notify internally, so there are no lifetime problems.

Downsides:

- May be slower than a hand-rolled futex mutex with current standard library (libc++ / libstdc++) versions.
3.3 Use std::atomic_ref and the futex wait / wake syscalls

Another option would be to use std::atomic_ref for the atomic read & write operations, and manually handle the waking / sleeping part (by calling the syscalls directly).
e.g.: godbolt
class my_mutex {
private:
using state_t = std::uint32_t;
static constexpr state_t StateUnlocked = 0;
static constexpr state_t StateLocked = 1;
static constexpr state_t StateLockedWithWaiters = 2;
static_assert(std::atomic_ref<state_t>::is_always_lock_free);
state_t state = StateUnlocked;
void wait() {
// TODO use WaitOnAddress for windows, ... other oses ...
syscall(
SYS_futex,
&state,
FUTEX_WAIT_PRIVATE,
StateLockedWithWaiters,
NULL
);
}
void wake() {
// TODO use WakeOnAddress for windows, ... other oses ...
syscall(
SYS_futex,
&state,
FUTEX_WAKE_PRIVATE,
1
);
}
public:
void lock() {
state_t expected = StateUnlocked;
if(std::atomic_ref(state).compare_exchange_strong(
expected,
StateLocked,
std::memory_order::acquire,
std::memory_order::relaxed
)) [[likely]] {
return;
}
while(true) {
if(expected != StateLockedWithWaiters) {
expected = std::atomic_ref(state).exchange(
StateLockedWithWaiters,
std::memory_order::acquire
);
if(expected == StateUnlocked) {
return;
}
}
// TODO: maybe spin a little before waiting
wait();
expected = std::atomic_ref(state).load(
std::memory_order::relaxed
);
}
}
bool try_lock() {
state_t expected = StateUnlocked;
return std::atomic_ref(state).compare_exchange_strong(
expected,
StateLocked,
std::memory_order::acquire,
std::memory_order::relaxed
);
}
void unlock() {
state_t prev = std::atomic_ref(state).exchange(
StateUnlocked,
std::memory_order::release
);
if(prev == StateLockedWithWaiters) [[unlikely]] {
wake();
}
}
};
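For completeness, the Windows equivalents of those two TODOs would look roughly like this (a sketch using the documented WaitOnAddress / WakeByAddressSingle APIs; the free-function names are mine, and it needs linking against Synchronization.lib):

```cpp
#include <cstdint>
#include <windows.h>
#pragma comment(lib, "Synchronization.lib")

// Blocks until the 32-bit value at `addr` is observed to differ from
// `expected` (spurious wakeups are possible, just like with futexes).
inline void wait_on_address(std::uint32_t &addr, std::uint32_t expected) {
    WaitOnAddress(&addr, &expected, sizeof(addr), INFINITE);
}

// Wakes one thread blocked in WaitOnAddress on `addr`.
inline void wake_one(std::uint32_t &addr) {
    WakeByAddressSingle(&addr);
}
```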
Upsides:

- No spinning, and a spurious futex wake on already-freed memory is harmless (as noted in the question).

Downsides:

- Not portable: you need to implement wait() and wake() manually, by directly calling the syscalls.

3.4 Move the wait / notify to a separate atomic with a longer lifetime (avoids the UB of std::atomic_ref::wait when the object gets destroyed)

Another option would be to avoid waiting on the atomic variable directly by using another atomic variable (with a longer lifetime) solely for the wait / notify.
This is also how most standard libraries implement std::atomic::wait() for types that are not futex-sized.
(libstdc++, for example, has a pool of 16 futexes it uses for waits on atomic variables that are not futex-sized; __wait_flags::__proxy_wait is the flag used to decide whether the wait happens on the atomic value itself or on one of the 16 proxy futexes.)
e.g.: godbolt
class waiter {
private:
alignas(std::hardware_destructive_interference_size)
std::atomic<std::uint32_t> counter = 0;
public:
void notify_all() {
counter.fetch_add(1, std::memory_order::release);
counter.notify_all();
}
template <class T>
void wait(
std::atomic<T> const& var,
std::type_identity_t<T> const& oldval
) {
while (true) {
auto counterval = counter.load(std::memory_order::acquire);
if (var.load(std::memory_order::relaxed) != oldval) {
return;
}
counter.wait(counterval);
}
}
};
template <std::size_t N = 256>
class waiter_pool {
private:
static_assert(std::has_single_bit(N), "N should be a power of 2");
waiter waiters[N];
waiter& waiter_for_address(const void *ptr) {
std::uintptr_t addr = reinterpret_cast<std::uintptr_t>(ptr);
addr = std::hash<std::uintptr_t>{}(addr);
return waiters[addr % N];
}
public:
template <class T>
void notify_all(std::atomic<T> const& var) {
waiter& w = waiter_for_address(static_cast<const void*>(&var));
w.notify_all();
}
template <class T>
void wait(
std::atomic<T> const& var,
std::type_identity_t<T> const& oldval
) {
waiter& w = waiter_for_address(static_cast<const void*>(&var));
w.wait(var, oldval);
}
};
waiter_pool pool;
class my_mutex {
private:
using state_t = std::uint8_t;
static constexpr state_t StateUnlocked = 0;
static constexpr state_t StateLocked = 1;
static constexpr state_t StateLockedWithWaiters = 2;
static_assert(std::atomic<state_t>::is_always_lock_free);
std::atomic<state_t> state = StateUnlocked;
public:
void lock() {
state_t expected = StateUnlocked;
if (state.compare_exchange_strong(
expected,
StateLocked,
std::memory_order::acquire,
std::memory_order::relaxed
)) [[likely]] {
return;
}
while (true) {
if (expected != StateLockedWithWaiters) {
expected = state.exchange(
StateLockedWithWaiters,
std::memory_order::acquire
);
if (expected == StateUnlocked) {
return;
}
}
// TODO: maybe spin a little before waiting
pool.wait(state, StateLockedWithWaiters);
expected = state.load(std::memory_order_relaxed);
}
}
bool try_lock() {
state_t expected = StateUnlocked;
return state.compare_exchange_strong(
expected,
StateLocked,
std::memory_order::acquire,
std::memory_order::relaxed
);
}
void unlock() {
state_t prev = state.exchange(
StateUnlocked,
std::memory_order::release
);
if (prev == StateLockedWithWaiters) [[unlikely]] {
pool.notify_all(state);
}
}
};
Upsides:

- my_mutex::state does not need to be 32 bits because the futex waiting is delegated to the pool, so instances of my_mutex can be much smaller.

Downsides:

- Unrelated atomics can hash to the same waiter slot, so a notify can spuriously wake threads that are waiting on a different variable.
3.5 Use a coroutine mutex like cppcoro's async_mutex

If you're already using C++20 coroutines, then using something like async_mutex from cppcoro might be an option.
Coroutines also have the benefit of not blocking the thread while they're waiting for the mutex - instead the coroutine simply suspends until the mutex is available again.
See async_mutex.hpp / async_mutex.cpp for the implementation.
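A minimal usage sketch, based on cppcoro's documented async_mutex interface (the task and the shared counter here are just for illustration):

```cpp
#include <cppcoro/async_mutex.hpp>
#include <cppcoro/task.hpp>
#include <cstdint>

cppcoro::async_mutex mutex;
std::uint64_t value = 0;

cppcoro::task<> add_to_value(std::uint64_t amount)
{
    // Suspends this coroutine (without blocking the thread) until the lock
    // is acquired; the lock is released when `lock` goes out of scope.
    cppcoro::async_mutex_lock lock = co_await mutex.scoped_lock_async();
    value += amount;
}
```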
Upsides:

- Waiting never blocks a thread - the coroutine just suspends until the mutex is available again.

Downsides:

- Only usable from coroutine code, and requires an external library (cppcoro).
It's also possible to create a mutex that can be used by both coroutines and native threads (by adapting the async_mutex design to also have a blocking lock method).
e.g.: godbolt
class my_mutex {
private:
struct wait_list_entry {
wait_list_entry* next = nullptr;
virtual void wake() = 0;
};
struct native_waiter final : wait_list_entry {
native_waiter() : sema(0) {}
void wait() { sema.acquire(); }
void wake() override final { sema.release(); }
std::binary_semaphore sema;
};
struct awaitable_default_resumer {
void operator()(std::coroutine_handle<void> h) {
h.resume();
}
};
template<class Resumer>
class awaitable_base : protected wait_list_entry {
protected:
my_mutex& mtx;
Resumer resumer;
std::coroutine_handle<void> coro;
public:
template<class R>
awaitable_base(my_mutex& m, R&& r)
: mtx(m), resumer(std::forward<R>(r)), coro(nullptr) {}
bool await_ready() {
return mtx.try_lock();
}
bool await_suspend(std::coroutine_handle<void> h) {
coro = h;
if(mtx.lock_or_register_waiter(this)) {
coro = nullptr;
return false;
}
return true;
}
protected:
void wake() override final {
resumer(std::exchange(coro, nullptr));
}
};
template<class Resumer>
class awaitable_raw final : public awaitable_base<Resumer> {
public:
using awaitable_base<Resumer>::awaitable_base;
void await_resume() const noexcept {}
template<class NewResumer>
[[nodiscard]]
awaitable_raw<std::remove_cvref_t<NewResumer>> resume_with(NewResumer&& nr) {
return {this->mtx, std::forward<NewResumer>(nr)};
}
};
template<class Resumer>
class awaitable_unique final : public awaitable_base<Resumer> {
public:
using awaitable_base<Resumer>::awaitable_base;
[[nodiscard]]
std::unique_lock<my_mutex> await_resume() {
return {this->mtx, std::adopt_lock};
}
template<class NewResumer>
[[nodiscard]]
awaitable_unique<std::remove_cvref_t<NewResumer>> resume_with(NewResumer&& nr) {
return {this->mtx, std::forward<NewResumer>(nr)};
}
};
private:
static constexpr std::uintptr_t state_unlocked = 0;
static constexpr std::uintptr_t state_locked_no_waiters = 1;
// contains one of three values:
// state_unlocked == mutex is currently unlocked
// state_locked_no_waiters == mutex is locked, no pending waiters
// pointer to waiter_list_entry == head of singly linked list of new waiters in LIFO order
std::atomic<std::uintptr_t> state = state_unlocked;
// waiter queue, will be populated by unlock()
wait_list_entry* waiter_queue = nullptr;
public:
my_mutex() = default;
my_mutex(my_mutex const&) = delete;
my_mutex(my_mutex&&) = delete;
my_mutex& operator=(my_mutex const&) = delete;
my_mutex& operator=(my_mutex&&) = delete;
bool try_lock() {
std::uintptr_t prev_state = state_unlocked;
return state.compare_exchange_strong(
prev_state,
state_locked_no_waiters,
std::memory_order::acquire,
std::memory_order::relaxed
);
}
void lock() {
if(try_lock()) [[likely]] {
return;
}
native_waiter waiter;
if(lock_or_register_waiter(&waiter)) {
return;
}
// enqueued waiter, wait for our turn
waiter.wait();
}
void unlock() {
if(waiter_queue == nullptr) [[likely]] {
std::uintptr_t prev_state = state_locked_no_waiters;
if(state.compare_exchange_strong(
prev_state,
state_unlocked,
std::memory_order::release,
std::memory_order::relaxed
)) [[likely]] {
return;
}
move_waiters_to_waiter_queue();
}
wait_list_entry* entry = waiter_queue;
waiter_queue = entry->next;
entry->wake();
}
[[nodiscard]]
awaitable_raw<awaitable_default_resumer> operator co_await() {
return lock_async();
}
[[nodiscard]]
awaitable_raw<awaitable_default_resumer> lock_async() {
return awaitable_raw<awaitable_default_resumer>{*this, awaitable_default_resumer{}};
}
[[nodiscard]]
awaitable_unique<awaitable_default_resumer> lock_async_unique() {
return awaitable_unique<awaitable_default_resumer>{*this, awaitable_default_resumer{}};
}
private:
// Either locks the mutex synchronously (returns true)
// or adds the waiter to the wait list (returns false)
// The waiter will not be called if the mutex was locked synchronously.
bool lock_or_register_waiter(wait_list_entry* waiter) {
std::uintptr_t prev_state = state.load(std::memory_order::relaxed);
while(true) {
if(prev_state == state_unlocked) {
if(state.compare_exchange_weak(
prev_state,
state_locked_no_waiters,
std::memory_order::acquire,
std::memory_order::relaxed
)) {
return true;
}
} else {
waiter->next = nullptr;
if(prev_state != state_locked_no_waiters) {
waiter->next = reinterpret_cast<wait_list_entry*>(prev_state);
}
std::uintptr_t next_state = reinterpret_cast<std::uintptr_t>(
waiter
);
if(state.compare_exchange_weak(
prev_state,
next_state,
std::memory_order::release,
std::memory_order::relaxed
)) {
return false;
}
}
}
}
void move_waiters_to_waiter_queue() {
std::uintptr_t prev_state = state.exchange(
state_locked_no_waiters,
std::memory_order::acquire
);
// reverse list of waiters so that they are served in FIFO order
// if the mutex does not need to be fair then this loop could be removed
wait_list_entry* current_entry = reinterpret_cast<wait_list_entry*>(prev_state);
wait_list_entry* prev_entry = nullptr;
while(current_entry != nullptr) {
wait_list_entry* next = current_entry->next;
current_entry->next = prev_entry;
prev_entry = current_entry;
current_entry = next;
}
waiter_queue = prev_entry;
}
};
That's kind of the best of both worlds.
Normal threads can still do mutex.lock() and mutex.unlock() (or use std::lock_guard / std::unique_lock / etc.), and coroutines can use co_await mutex.lock_async() and mutex.unlock() (or auto lock = co_await mutex.lock_async_unique() for a std::unique_lock), as sketched below.
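For illustration, a short usage sketch (assuming the my_mutex above; cppcoro::task<> just stands in for whatever coroutine task type you already use):

```cpp
#include <cppcoro/task.hpp>
#include <mutex>

my_mutex mtx;
int shared_value = 0;

// A plain thread takes the normal blocking path.
void thread_worker()
{
    std::lock_guard lock(mtx);
    ++shared_value;
}

// A coroutine suspends instead of blocking while the mutex is held elsewhere.
cppcoro::task<> coro_worker()
{
    auto lock = co_await mtx.lock_async_unique(); // yields a std::unique_lock<my_mutex>
    ++shared_value;
}
```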
Upsides:

- Can be used by both native threads and coroutines.
- Flexible (.resume_with() can be used to control how coroutines are resumed).

Downsides:

- Considerably more complex than the other options.

3.6 Keep the containing object alive during the unlock() sequence

Another option would be to give your mutex the option to increment the refcount of its containing object (or give it a shared_ptr to it if it's managed by one, etc...).
That way your unlock() could look like this:
class my_mutex {
some_type* outer;
/* ... */
void unlock() {
if(outer) outer->incref();
if (state_.exchange(kUnlocked, std::memory_order_release) == kLockedWantNotify)
state_.notify_all();
if(outer) outer->decref();
}
/* ... */
};
It's not pretty, but at least it gets rid of the spinlock in the destructor.
The extreme option of this would be to dynamically allocate your mutex state - that way you can keep it alive during the unlock() sequence.
e.g.: godbolt
class my_mutex {
private:
using state_t = std::uint32_t;
static constexpr state_t StateUnlocked = 0;
static constexpr state_t StateLocked = 1;
static constexpr state_t StateLockedWithWaiters = 2;
std::shared_ptr<std::atomic<state_t>> state;
public:
my_mutex()
: state(std::make_shared<std::atomic<state_t>>(StateUnlocked))
{
}
void lock() {
state_t expected = StateUnlocked;
if (state->compare_exchange_strong(
expected,
StateLocked,
std::memory_order::acquire,
std::memory_order::relaxed
)) [[likely]] {
return;
}
while (true) {
if (expected != StateLockedWithWaiters) {
expected = state->exchange(
StateLockedWithWaiters,
std::memory_order::acquire
);
if (expected == StateUnlocked) {
return;
}
}
state->wait(StateLockedWithWaiters);
expected = state->load(std::memory_order_relaxed);
}
}
bool try_lock() {
state_t expected = StateUnlocked;
return state->compare_exchange_strong(
expected,
StateLocked,
std::memory_order::acquire,
std::memory_order::relaxed
);
}
void unlock() {
std::shared_ptr ref = state;
state_t prev = ref->exchange(
StateUnlocked,
std::memory_order::release
);
if (prev == StateLockedWithWaiters) [[unlikely]] {
ref->notify_one();
}
}
};
Upsides:

- No spinning in the destructor and no platform-specific syscalls - the shared_ptr copy made in unlock() keeps the atomic state alive until the notify has finished.

Downsides:

- Every mutex needs a separate heap allocation, plus shared_ptr refcount traffic on each unlock().