c++ c++-actor-framework

Best practice for forwarding messages between typed actors in the C++ Actors Framework?


I'm trying to hand off some work from one typed actor to another. The CAF user manual indicates that this can be done using the forward_to method. That method looks like it is only available to actors that are explicitly of the event_based_actor type. However, forward_to appears to be a thin wrapper over the forward_current_message method, which is defined for all actors of the local_actor type. Therefore, I assume it's okay to call forward_current_message directly?

Also, in order to get message forwarding working with typed actors, I still had to return a response from the intermediate actor. That actor's response seems to be ignored which is good, but am I doing something wrong? Or is it really necessary to pay the (normally minimal) cost of constructing a response that won't be used?

Here's some working sample code that demonstrates my attempt at message forwarding with typed actors:

#include <iostream>
#include "caf/all.hpp"

using namespace caf;
using namespace std;

using a_type = typed_actor<replies_to<int>::with<bool>>;
using b_type = typed_actor<replies_to<int>::with<bool>>;

actor worker()
{
    return spawn(
        [](event_based_actor *self) -> behavior
        {
            return
            {
                [self](int index)
                {
                    aout(self) << "Worker: " << index << endl;
                    return index;
                }
            };
        });
}

b_type::behavior_type bBehavior(b_type::pointer self)
{
    return
    {
        [self](int value)
        {
            // Create blocking actor
            scoped_actor blockingActor;

            // Spawn pool workers and send each a message
            auto pool = actor_pool::make(value, worker, actor_pool::round_robin());
            for(int i = 0; i < value; ++i)
            {
                blockingActor->send(pool, i);
            }

            // Wait for completion
            vector<int> results;
            int i = 0;
            blockingActor->receive_for(i, value) (
                [&results](int value)
                {
                    results.push_back(value);
                });

            blockingActor->send_exit(pool, exit_reason::user_shutdown);
            self->quit();
            return static_cast<size_t>(value) == results.size();
        }
    };
}

class A : public a_type::base
{
protected:
    behavior_type make_behavior() override
    {
        return
        {
            [this](int value) -> bool
            {
                aout(this) << "Number of tasks: " << value << endl;
                b_type forwardDestination = spawn(bBehavior);
                auto castDestination = actor_cast<actor>(forwardDestination);
                this->forward_current_message(castDestination);
                this->quit();
                return false;
            }
        };
    }
};


void tester()
{
    a_type testeeActor = spawn<A>();
    scoped_actor self;
    self->sync_send(testeeActor, 5).await(
        [testeeActor, &self](bool success)
        {
            aout(self) << "All workers completed? " << (success ? "Yes!" : "No :(") << endl;
        });
}

int main()
{
    tester();
    await_all_actors_done();
    shutdown();
    cout << "Press Enter to continue" << endl;
    cin.get();
}

Solution

  • Therefore, I assume it's okay to call forward_current_message directly?

    No, forward_current_message is not part of the public API in CAF (and is thus not listed in Doxygen). This means the member function could be renamed, removed, or made protected/private at any time.

    The best practice to forward messages to typed actors is delegate. This is a new feature (introduced with 0.14.1) and unfortunately is not mentioned in the manual yet. The best "documentation" currently available is its use in the unit test for typed actors.

    The short version is: delegate is an alternative to send that hands off the responsibility for a request. In a typed actor, you can return delegated<T> instead of T from a message handler to indicate that another actor will respond with a T to the original sender.

    In your case, class A would be implemented like this:

    class A : public a_type::base
    {
    protected:
        behavior_type make_behavior() override {
            return {
                [this](int value) {
                    aout(this) << "Number of tasks: " << value << endl;
                    auto forwardDestination = spawn(bBehavior);
                    this->quit();
                    return delegate(forwardDestination, value);
                }
            };
        }
    };
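
To make the idea of "handing off the responsibility for a request" concrete, here is a minimal sketch in plain standard C++. It is an analogy only and does not use CAF: the names reply_fn, stage_a, stage_b, and request are hypothetical, and a std::function callback stands in for the reply channel to the original sender. The point it illustrates is that the intermediate stage never constructs a response of its own; it only passes the reply channel on, so the requester receives exactly one answer, produced by the final stage.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical reply channel back to the original requester (not a CAF type).
using reply_fn = std::function<void(bool)>;

// Stand-in for actor B: performs the work and is the only stage that replies.
void stage_b(int value, const reply_fn& reply) {
    std::vector<int> results;
    for (int i = 0; i < value; ++i) {
        results.push_back(i);  // pretend each task produced one result
    }
    reply(static_cast<int>(results.size()) == value);
}

// Stand-in for actor A: produces no response of its own and instead
// hands the reply channel to the next stage.
void stage_a(int value, const reply_fn& reply) {
    stage_b(value, reply);
}

// Stand-in for the requester: sends one request and expects one reply.
bool request(int value) {
    bool answer = false;
    int replies = 0;
    stage_a(value, [&](bool ok) {
        answer = ok;
        ++replies;
    });
    assert(replies == 1);  // exactly one stage answered the request
    return answer;
}
```

Calling request(5) goes through stage_a, but the boolean that comes back is the one computed in stage_b. This mirrors the shape of the delegate-based handler above, where A returns a delegated result rather than a dummy bool.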