uwptaskc++-cxppl

How can I serialize thread execution across a task continuation block?


In the below example, a call to HandleChangesAsync is made from within an asynchronous task, and via an event handler.

Question - Is there a way to ensure that only one thread can execute the HandleChangesAsync create_task + task continuation blocks at a time (even if the task continuation blocks invoke other async functions)?

Note that I can't just use a synchronization primitive because HandleChangesAsync can return before the async operations complete.

void MyClass::DoStuffAsync()
{    
    WeakReference weakThis(this);
    create_task(DoMoreStuffAsync())
        .then([weakThis]())
    {
        auto strongThis = weakThis.Resolve<HomePromotionTemplateViewModel>();
        if (strongThis)
        {
            strongThis->RegisterForChanges();
            strongThis->HandleChangesAsync();
        }
    });
}

void MyClass::RegisterForChanges()
{    
    // Attach event handler to &MyClass::OnSomethingChanged
}

void MyClass::OnSomethingChanged()
{
    HandleChangesAsync();
}

void MyClass::HandleChangesAsync()
{    
    WeakReference weakThis(this);
    create_task(DoMoreCoolStuffAsync(m_needsProtection))
        .then([weakThis]()
    {
        // do async stuff 
        // update m_needsProtection
    })
        .then([weakThis]()
    {
        // do more async stuff 
        // update m_needsProtection
    });
}

Solution

  • Assuming you just want to ignore overlapping requests (vs. queue them up) this is pretty easily accomplished with an atomic Boolean. The following can be pasted into a new XAML page and then just check the output window. There are two threads each racing to call DoStuff() but only one of them will ever execute at a time -- the value of value_ will always be exactly 0 or 16 at the completion of the work; never anything else. If multiple threads executed the work at the same time, you'd potentially get other numbers.

    There's a bunch of silly code to "do work" but the basic is in the compare_exchange_strong() call. It basically says "I expect the value of busy_ to be false, in which case update busy_ to be true and return true (in which case, I'll start doing work). But if the value of busy_ wasn't already false then return false (and I won't do any work)". Note the logical negation ! inside the if :-)

    I'm not an expert in memory ordering so it's possible there's a more efficient way to do this (ie, passing an explicit memory_order value) if you were running in a tight loop, but it should be correct and should suffice for normal UX work:

    #include <atomic>
    #include <ppltasks.h>
    #include <string>
    
    using namespace concurrency;
    
    class Test
    {
      std::atomic<bool> busy_;
      int value_;
    
      task<int> AddNumbersAsync(int x, int y)
      {
        return task<int>{[x, y]
        {
          Sleep(20);
          return x + y;
        }};
      }
    
      void CompleteStuff()
      {
        OutputDebugStringA("** Done with work; value is ");
        OutputDebugStringA(std::to_string(value_).c_str());
        OutputDebugStringA("\r\n");
        busy_ = false;
      }
    
    public:
      Test()
        : busy_{ false }, value_{ 0 }
      {}
    
      void DoStuff()
      {
        // ---
        // This is where the magic happens...
        // ---
        bool expected{ false };
        if (!busy_.compare_exchange_strong(expected, true))
        {
          OutputDebugStringA("Work already in progress; bailing.\r\n");
          return;
        }
    
        OutputDebugStringA("Doing work...\r\n");
        value_ = 2;
    
        try
        {
          AddNumbersAsync(value_, value_).then([this](int i)
          {
            value_ = i;
            return AddNumbersAsync(value_, value_);
          }).then([this](int i)
          {
            value_ = i;
            return AddNumbersAsync(value_, value_);
          }).then([this](task<int> i)
          {
            // ---
            // Handle any async exceptions
            // ---
            try
            {
              value_ = i.get();
            }
            catch (...)
            {
              OutputDebugStringA("Oops, an async exception! Resetting value_\r\n");
              value_ = 0;
            }
            CompleteStuff();
          });
        }
        // ---
        // Handle any sync exceptions
        // ---
        catch (...)
        {
          OutputDebugStringA("Oops, an exception! Resetting value_\r\n");
          value_ = 0;
          CompleteStuff();
        }
      }
    };
    
    Test t;
    
    task<void> TestTask1;
    task<void> TestTask2;
    
    MainPage::MainPage()
    {
      InitializeComponent();
      TestTask1 = task<void>([]
      {
        for (int i = 0; i < 100; i++)
        {
          t.DoStuff();
          Sleep(20);
        }
      });
    
      TestTask1 = task<void>([]
      {
        for (int i = 0; i < 100; i++)
        {
          t.DoStuff();
          Sleep(30);
        }
      });
    }
    

    Note that we need to catch both synchronous exceptions (those potentially caused by the first part of the task chain) and asynchronous exceptions (those caused by one of the tasks or their continuations). We catch asynchronous exceptions by using a continuation of type task<int> and then check for exceptions by explicitly calling get() on the task. In both cases we reset the value_ and the busy_ flag.

    You should see some output such as the following; you can tell that multiple threads are competing with each other since sometimes the OutputDebugString calls get interleaved:

    Doing work...
    Work already in progress; bailing.
    Work already in progress; bailing.
    Work already in progress; bailing.
    Work already in progress; bailing.
    ** Done with work; value is Work already in progress; bailing.
    16