
Async Executor terminates early despite Waker clone in Future


I have the following problem:

My async executor exits prematurely after the first iteration, even though a cloned Waker is stored in the Sleep future. The Waker seems to lose its ability to wake the task, and the program terminates without waiting for the sleep duration. However, if I modify how the Waker is created (using Arc::clone + into_raw), it works. I don't understand why my original approach fails.

Executor's Run Loop:

impl Executor {
    pub fn run(&self) -> () {
        while let Ok(task) = self.ready_queue.recv() {
            let mut future_lock = task.future.lock().unwrap();
            if let Some(mut future) = future_lock.take() {
                std::mem::drop(future_lock);
                
                let waker = waker_ref(&task);
                let mut context = std::task::Context::from_waker(&waker);

                if future.as_mut().poll(&mut context).is_pending() {
                    *task.future.lock().unwrap() = Some(future);
                }
            }
        }
    }
}

Trait ArcWake & impl:

trait ArcWake {
    fn wake(self: Arc<Self>) -> () {
        Self::wake_by_ref(&self);
    }

    fn wake_by_ref(arc_self: &Arc<Self>) -> ();
}

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) -> () {
        let task = Arc::clone(arc_self);
        arc_self.tx
            .send(task)
            .expect("Failed to place the Task onto the queue!");
    }
}

Waker creation:

(P.S. I know it's better to use futures::task::waker_ref, but I want to use my own implementation for educational purposes.)

fn waker_ref<T: ArcWake>(arc_waker: &Arc<T>) -> Waker {
    let ptr = Arc::as_ptr(&arc_waker) as *const ();
    let raw_waker = RawWaker::new(ptr, waker_vtable::<T>());
    unsafe { Waker::from_raw(raw_waker) }
}

fn waker_vtable<T: ArcWake>() -> &'static RawWakerVTable {
    &RawWakerVTable::new(
        RawWakerExtended::clone::<T>, 
        RawWakerExtended::wake::<T>, 
        RawWakerExtended::wake_by_ref::<T>, 
        RawWakerExtended::drop::<T>
    )
}

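impl RawWakerExtended {
    unsafe fn clone<T: ArcWake>(ptr: *const ()) -> RawWaker {
        println!("DEBUG clone!");
        // cast back to *const T so the count of the Arc<T> (not an Arc<()>) is changed
        unsafe { Arc::increment_strong_count(ptr as *const T) };
        RawWaker::new(ptr, waker_vtable::<T>())
    }

    unsafe fn drop<T: ArcWake>(ptr: *const ()) {
        println!("DEBUG drop!");
        // rebuild the Arc<T> and drop it immediately, releasing one strong count
        unsafe { Arc::from_raw(ptr as *const T) };
    }

    unsafe fn wake<T: ArcWake>(ptr: *const ()) {
        println!("DEBUG wake!");
        // wake consumes the Waker, so taking over its strong count is correct here
        let data_from_ptr = unsafe { Arc::from_raw(ptr as *const T) };
        ArcWake::wake(data_from_ptr);
    }

    unsafe fn wake_by_ref<T: ArcWake>(ptr: *const ()) {
        println!("DEBUG wake by ref!");
        // wake_by_ref only borrows: ManuallyDrop stops the Arc from releasing the Waker's count
        let data_from_ptr = std::mem::ManuallyDrop::new(unsafe { Arc::from_raw(ptr as *const T) });
        T::wake_by_ref(&data_from_ptr);
    }
}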

Sleep Future:

#[derive(Default)]
pub struct Sleep {
    state: Arc<RwLock<State>>
}

#[derive(Default)]
struct State {
    is_ready: bool,
    waker: Option<Waker>
}

impl Future for Sleep {
    type Output = ();

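    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // hold one write lock for both the check and the waker update; with separate read
        // and write locks the sleep thread could set is_ready in between and find no waker
        let mut state = self.state.write().unwrap();
        if state.is_ready {
            Poll::Ready(())
        } else {
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }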
}

pub fn sleep(dur: std::time::Duration) -> Sleep {
    let sleep = Sleep::default();
    let sleep_state = Arc::clone(&sleep.state);

    std::thread::spawn(move || {
        std::thread::sleep(dur);
        let mut state_lock = sleep_state.write().unwrap();
        state_lock.is_ready = true;
        if let Some(waker) = state_lock.waker.take() {
            waker.wake();
        }
    });

    sleep
}

My observations:

  1. I know that the loop terminates when the tx half of the mpsc::channel is dropped. But the Waker that is cloned and stored in Sleep.state.waker still refers to that tx, because Task holds it and the Waker is constructed from a pointer to the Arc<Task>.

  2. I also understand that when the first iteration completes, the initially created Waker instance goes out of scope, but before that happens, the Waker is cloned in the poll() method. So I believe the clone should still be valid.
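
Observation 1 can be checked in isolation. This is a minimal sketch, assuming a stripped-down, hypothetical Task that only owns a Sender (it is not the full Task from the question): the loop keeps running while that Sender is alive and ends as soon as the last Arc<Task>, and with it the Sender, is dropped.

```rust
use std::sync::mpsc;
use std::sync::Arc;

// Hypothetical stand-in for the question's Task: it only owns a Sender clone.
struct Task {
    tx: mpsc::Sender<Arc<Task>>,
}

// Runs an executor-style loop and returns how many tasks it received.
fn run_until_disconnected() -> usize {
    let (tx, rx) = mpsc::channel::<Arc<Task>>();
    let task = Arc::new(Task { tx: tx.clone() });
    drop(tx); // like drop(spawner): only the Sender inside Task remains

    // The task queues itself; afterwards the channel holds the only Arc<Task>.
    task.tx.send(Arc::clone(&task)).unwrap();
    drop(task);

    let mut received = 0;
    // recv() blocks while any Sender is alive and returns Err once all are gone.
    while let Ok(task) = rx.recv() {
        received += 1;
        // Dropping the last Arc<Task> destroys the Task and, with it, the last Sender.
        drop(task);
    }
    received
}

fn main() {
    println!("loop ended after {} task(s)", run_until_disconnected());
}
```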

Outputs:

  1. If I use Arc::as_ptr() in waker_ref() without wrapping RawWaker in std::mem::ManuallyDrop:
sleep for 1 second!
DEBUG clone!  // Waker cloned (strong count = 2)
DEBUG drop!    // Waker dropped (strong count = 1)
// Program exits here without waiting
  2. If I either use Arc::clone() + Arc::into_raw() instead of Arc::as_ptr(), OR use Arc::as_ptr() but wrap RawWaker in std::mem::ManuallyDrop:
sleep for 1 second!
DEBUG clone!    // strong count = 2
DEBUG drop!     // strong count = 1 (after Waker drop)
DEBUG wake!     // Sleep thread wakes the executor
wake up!
DEBUG drop!     // Final drop

(So the 2nd approach works, but the 1st doesn't.)

Questions:

  1. Why does the original as_ptr approach fail, even though clone increments the strong count? The stored Waker should keep the Arc alive, but the executor exits early.

  2. Why does wrapping RawWaker in std::mem::ManuallyDrop help, and why does using Arc::clone() + Arc::into_raw() instead of Arc::as_ptr() help as well?

Please note that I'm a beginner and want to have a firm understanding of these concepts, so any help would be very much appreciated. Thanks a lot for your time!


Solution

  • The core issue is that Arc::as_ptr does not keep the Arc alive; it just gets a pointer to the contents. If task was the last handle to that Arc, the Task will be destroyed when that handle goes out of scope at the end of your run loop. Thus the pointer you've given to the waker will be dangling by the time the waker uses it.
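
    The difference between the two pointer functions shows up directly in Arc::strong_count. A minimal sketch, using an Arc<String> as a hypothetical stand-in for Arc<Task>:

    ```rust
    use std::sync::Arc;

    // Returns the strong count after Arc::as_ptr, after Arc::clone + Arc::into_raw,
    // and after that raw pointer is turned back into an Arc and dropped.
    fn counts() -> (usize, usize, usize) {
        // Hypothetical stand-in for Arc<Task>.
        let task = Arc::new(String::from("task"));

        // as_ptr only borrows: the pointer owns no count.
        let _borrowed: *const String = Arc::as_ptr(&task);
        let after_as_ptr = Arc::strong_count(&task);

        // clone + into_raw: the clone's strong count now belongs to the raw pointer.
        let owned: *const String = Arc::into_raw(Arc::clone(&task));
        let after_into_raw = Arc::strong_count(&task);

        // SAFETY: `owned` came from Arc::into_raw and is reclaimed exactly once.
        drop(unsafe { Arc::from_raw(owned) });
        let after_release = Arc::strong_count(&task);

        (after_as_ptr, after_into_raw, after_release)
    }

    fn main() {
        println!("{:?}", counts()); // (1, 2, 1)
    }
    ```

    Only the into_raw pointer owns a count, so only it can be turned back into an Arc with Arc::from_raw and dropped without stealing a count from someone else.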

    You can see this in action if you add a wait after your executor stops running, so that the sleep thread gets a chance to fire the waker. Running this under Miri (as one should always do with unsafe code) shows an error:

    fn main() {
        let (executor, spawner) = new_executor_and_spawner();
    
        spawner.spawn(async {
            println!("sleep for 1 second!");
            sleep(std::time::Duration::from_secs(1)).await;
            println!("wake up!");
        });
    
        std::mem::drop(spawner);
    
        executor.run();
    
        std::thread::sleep(std::time::Duration::from_secs(2)); // added this
    }
    
    error: Undefined Behavior: in-bounds pointer arithmetic failed: alloc1191 has been freed, so this pointer is dangling
        --> /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/sync.rs:1716:27
         |
    1716 |             let arc_ptr = ptr.byte_sub(offset) as *mut ArcInner<T>;
         |                           ^^^^^^^^^^^^^^^^^^^^ in-bounds pointer arithmetic failed: alloc1191 has been freed, so this pointer is dangling
         |
         = help: this indicates a bug in the program: it performed an invalid operation, and caused Undefined Behavior
         = help: see https://doc.rust-lang.org/nightly/reference/behavior-considered-undefined.html for further information
    help: alloc1191 was allocated here:
        --> src/main.rs:32:20
         |
    32   |           let task = Arc::new(Task {
         |  ____________________^
    33   | |             future: Mutex::new(Some(boxed_future)),
    34   | |             tx: self.tx.clone(),
    35   | |         });
         | |__________^
    help: alloc1191 was deallocated here:
        --> src/main.rs:56:9
         |
    56   |         }
         |         ^
         = note: BACKTRACE (of the first span) on thread `unnamed-1`:
         = note: inside `std::sync::Arc::<Task>::from_raw_in` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/sync.rs:1716:27: 1716:47
         = note: inside `std::sync::Arc::<Task>::from_raw` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/sync.rs:1468:18: 1468:47
    note: inside `RawWakerExtended::wake::<Task>`
        --> src/main.rs:89:38
         |
    89   |         let data_from_ptr = unsafe { Arc::from_raw(ptr as *const T) };
         |                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
         = note: inside `std::task::Waker::wake` at /playground/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/task/wake.rs:447:18: 447:59
    note: inside closure
        --> src/main.rs:154:13
         |
    154  |             waker.wake();
         |             ^^^^^^^^^^^^
    

    Full code on the playground.

    Your alternative with Arc::into_raw is the correct way to keep a strong reference alive when using a pointer. The documentation for Arc::from_raw says as much: the pointer passed to it must have come from an Arc::into_raw call. Running Miri with that change reports no issues.
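
    A self-contained sketch of the into_raw approach, assuming a hypothetical Task that only records how often it was woken instead of re-queueing itself (the vtable functions are written for that one type rather than through the question's ArcWake trait). Each waker owns exactly one strong count: waker_ref creates it with Arc::into_raw, clone adds one, and drop or wake gives it back. The simulate function replays one executor iteration followed by the sleep thread's wake():

    ```rust
    use std::mem::ManuallyDrop;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::task::{RawWaker, RawWakerVTable, Waker};

    // Hypothetical stand-in for the question's Task: instead of re-queueing itself,
    // it just counts how often it was woken.
    struct Task {
        wakes: Arc<AtomicUsize>,
    }

    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker);

    unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
        // The new waker gets a strong count of its own.
        unsafe { Arc::increment_strong_count(ptr as *const Task) };
        RawWaker::new(ptr, &VTABLE)
    }

    unsafe fn wake(ptr: *const ()) {
        // wake consumes the waker, so take over its count; it is released when `task` drops.
        let task = unsafe { Arc::from_raw(ptr as *const Task) };
        task.wakes.fetch_add(1, Ordering::SeqCst);
    }

    unsafe fn wake_by_ref(ptr: *const ()) {
        // Borrow only: ManuallyDrop stops this Arc from releasing the waker's count.
        let task = ManuallyDrop::new(unsafe { Arc::from_raw(ptr as *const Task) });
        task.wakes.fetch_add(1, Ordering::SeqCst);
    }

    unsafe fn drop_waker(ptr: *const ()) {
        // Give back the count this waker owns.
        drop(unsafe { Arc::from_raw(ptr as *const Task) });
    }

    fn waker_ref(task: &Arc<Task>) -> Waker {
        // Arc::clone + Arc::into_raw: the waker owns one strong count.
        let ptr = Arc::into_raw(Arc::clone(task)) as *const ();
        // SAFETY: ptr comes from Arc::into_raw and VTABLE treats it as an owned Arc<Task>.
        unsafe { Waker::from_raw(RawWaker::new(ptr, &VTABLE)) }
    }

    // Returns (strong count once the executor is done, number of wakes, strong count at the end).
    fn simulate() -> (usize, usize, usize) {
        let wakes = Arc::new(AtomicUsize::new(0));
        let task = Arc::new(Task { wakes: Arc::clone(&wakes) });
        let weak = Arc::downgrade(&task);

        let waker = waker_ref(&task); // count 2
        let stored = waker.clone(); // what Sleep::poll does: count 3
        drop(waker); // end of the loop body: count 2
        drop(task); // the executor's Arc<Task> goes away: count 1
        let alive = weak.strong_count(); // the stored waker alone keeps Task alive

        stored.wake(); // the sleep thread's wake(); this releases the last count
        (alive, wakes.load(Ordering::SeqCst), weak.strong_count())
    }

    fn main() {
        println!("{:?}", simulate());
    }
    ```

    Once the executor is done, the stored clone is the only thing keeping Task alive (count 1), and wake() releases that last count after doing its work.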

    With that revelation, everything else is explained: because nothing keeps the Arc alive, the Task, and with it the tx it holds, is dropped prematurely, and so your executor loop exits early.

    Why does the original as_ptr approach fail, even though clone increments the strong count? The stored Waker should keep the Arc alive, but the executor exits early.

    You are right that increment_strong_count is being called, specifically when Sleep clones the waker from the context to keep it for itself. However, at that point you have three "handles" to the Arc: one in task, one in waker, and one in Sleep::state, but a strong reference count of only two! The waker and task are both dropped at the end of your loop body, which brings the count down to zero and destroys the Task, leaving the poor Sleep with a dangling pointer.
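
    The bookkeeping can also be replayed with raw strong-count calls alone. In this sketch a String is a hypothetical stand-in for Task, every handle is the same raw pointer, and a Weak is used only to observe whether the value survived. Passing false models the as_ptr waker (three handles, two counts); passing true models Arc::clone + Arc::into_raw:

    ```rust
    use std::sync::Arc;

    // Replays one executor iteration using only strong-count operations.
    // `waker_owns_a_count` picks how the executor's waker was created:
    // false = Arc::as_ptr (no count), true = Arc::clone + Arc::into_raw (one count).
    fn task_alive_after_iteration(waker_owns_a_count: bool) -> bool {
        let task = Arc::new(String::from("task")); // hypothetical stand-in for Task
        let weak = Arc::downgrade(&task); // only used to observe liveness
        // Handle 1: the executor's `task`, owning one strong count.
        let ptr: *const String = Arc::into_raw(task);

        // Handle 2: the executor's waker, pointing at the same data.
        if waker_owns_a_count {
            // SAFETY: ptr comes from Arc::into_raw and the count is at least 1.
            unsafe { Arc::increment_strong_count(ptr) };
        }

        // Handle 3: Sleep::poll clones the waker, which adds one count.
        // SAFETY: as above.
        unsafe { Arc::increment_strong_count(ptr) };

        // End of the loop body: the waker's drop and then `task`'s drop each give back
        // one count (in the as_ptr case the waker gives back a count it never took).
        // SAFETY: the count is at least 1 before each call.
        unsafe { Arc::decrement_strong_count(ptr) };
        unsafe { Arc::decrement_strong_count(ptr) };

        let alive = weak.strong_count() > 0;
        if alive {
            // Sleep's waker owns the last count; release it so nothing leaks.
            // SAFETY: exactly one count is left.
            unsafe { Arc::decrement_strong_count(ptr) };
        }
        alive
    }

    fn main() {
        println!("as_ptr waker: task alive = {}", task_alive_after_iteration(false));
        println!("into_raw waker: task alive = {}", task_alive_after_iteration(true));
    }
    ```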

    Why does wrapping RawWaker in std::mem::ManuallyDrop help, and why does using Arc::clone() + Arc::into_raw() instead of Arc::as_ptr() help as well?

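    Simple: ManuallyDrop keeps the inner waker from being dropped, so nothing happens when waker leaves scope. The borrowed waker doesn't increment the reference count on creation (Arc::as_ptr), but it also doesn't decrement it on drop, because drop is never called. Arc::clone() + Arc::into_raw() balances the count the other way round: the waker owns one strong count from the start, so when it is dropped at the end of the loop it gives back its own count instead of the one held by task.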
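
    For comparison, a sketch of the ManuallyDrop variant, similar in spirit to what futures::task::waker_ref does: the borrowed waker never owns a count, while clones made from it (like the one Sleep stores) each own one. Task is a hypothetical placeholder whose wake functions only release counts, and WakerRef is a name chosen for this sketch; its lifetime parameter keeps the borrowed waker from outliving the Arc it points into:

    ```rust
    use std::marker::PhantomData;
    use std::mem::ManuallyDrop;
    use std::ops::Deref;
    use std::sync::Arc;
    use std::task::{RawWaker, RawWakerVTable, Waker};

    // Hypothetical placeholder task; the sketch only tracks reference counts.
    struct Task {
        _id: u32,
    }

    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker);

    unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
        // Every clone owns a strong count, as in the question's clone.
        unsafe { Arc::increment_strong_count(ptr as *const Task) };
        RawWaker::new(ptr, &VTABLE)
    }

    unsafe fn wake(ptr: *const ()) {
        // A real executor would re-queue the task here; this sketch only releases the count.
        unsafe { drop_waker(ptr) };
    }

    unsafe fn wake_by_ref(_ptr: *const ()) {}

    unsafe fn drop_waker(ptr: *const ()) {
        drop(unsafe { Arc::from_raw(ptr as *const Task) });
    }

    // A borrowed waker: built from Arc::as_ptr (no count taken) and wrapped in
    // ManuallyDrop (so no count is given back).
    struct WakerRef<'a> {
        waker: ManuallyDrop<Waker>,
        _task: PhantomData<&'a Arc<Task>>,
    }

    impl Deref for WakerRef<'_> {
        type Target = Waker;
        fn deref(&self) -> &Waker {
            &self.waker
        }
    }

    fn waker_ref(task: &Arc<Task>) -> WakerRef<'_> {
        let ptr = Arc::as_ptr(task) as *const ();
        // SAFETY: `task` keeps the allocation alive for the borrow, and clones take their own counts.
        let waker = unsafe { Waker::from_raw(RawWaker::new(ptr, &VTABLE)) };
        WakerRef { waker: ManuallyDrop::new(waker), _task: PhantomData }
    }

    // Returns the strong count after a Sleep-style clone, after the borrowed waker
    // leaves scope, and after the stored clone is dropped.
    fn counts() -> (usize, usize, usize) {
        let task = Arc::new(Task { _id: 1 });
        let stored: Waker;
        let after_clone;
        {
            let waker = waker_ref(&task); // count stays 1
            stored = Waker::clone(&waker); // like Sleep::poll: count 2
            after_clone = Arc::strong_count(&task);
        } // `waker` leaves scope: ManuallyDrop means drop_waker is never called
        let after_scope = Arc::strong_count(&task);
        drop(stored); // the stored clone gives its own count back
        (after_clone, after_scope, Arc::strong_count(&task))
    }

    fn main() {
        println!("{:?}", counts());
    }
    ```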