
Elixir: GenServer.call not initiating handle_call


I am implementing the Gossip Algorithm, in which multiple actors spread a gossip simultaneously, in parallel. The system stops when every actor has heard the gossip 10 times.

Now I have a scenario in which I check the listen count of the recipient actor before sending the gossip to it. If the listen count is already 10, the gossip is not sent to that actor. I do this with a synchronous call that fetches the listen count:

def get_message(server, msg) do
    GenServer.call(server, {:get_message, msg})
end

def handle_call({:get_message, msg}, _from, state) do
    listen_count = hd(state) 
    {:reply, listen_count, state}
end

The program runs well at first, but after some time GenServer.call fails with a timeout error like the one below. After some debugging, I realized that GenServer.call hangs and the corresponding handle_call callback is never invoked. Is this behavior expected when using synchronous calls? Since all actors are independent, shouldn't the GenServer.call invocations run independently, without waiting for each other's responses?

02:28:05.634 [error] GenServer #PID<0.81.0> terminating
    ** (stop) exited in: GenServer.call(#PID<0.79.0>, {:get_message, []}, 5000)
    ** (EXIT) time out
    (elixir) lib/gen_server.ex:774: GenServer.call/3

Edit: The following code reproduces the error when run in an iex shell.

defmodule RumourActor do
  use GenServer

  def start_link(opts) do
    {:ok, pid} = GenServer.start_link(__MODULE__, opts)
    {pid}
  end

  def set_message(server, msg, recipient) do
    GenServer.cast(server, {:set_message, msg, server, recipient})
  end

  def get_message(server, _msg) do
    GenServer.call(server, :get_message)
  end

  def init(opts) do
    state = opts
    {:ok, state}
  end

  # Runs inside `server`: after sleeping, it synchronously calls `recipient`.
  def handle_cast({:set_message, _msg, _server, recipient}, state) do
    :timer.sleep(5000)
    c = RumourActor.get_message(recipient, [])
    IO.inspect(c)
    {:noreply, state}
  end

  def handle_call(:get_message, _from, state) do
    count = tl(state)
    {:reply, count, state}
  end
end

Open an iex shell and load the module above. Start two processes:

a = RumourActor.start_link(["", 3])
b = RumourActor.start_link(["", 5])

Trigger the deadlock condition that Dogbert mentioned in the comments by running the following two lines in quick succession:

cb = RumourActor.set_message(elem(a,0), [], elem(b,0))
ca = RumourActor.set_message(elem(b,0), [], elem(a,0))

Wait about 10 seconds (the 5-second sleep plus the default 5-second call timeout) and the error will appear.


Solution

  • A gossip protocol is a way of dealing with asynchronous, unknown, unconfigured (random) networks that may be suffering intermittent outages and partitions and where no leader or default structure is present. (Note that this situation is somewhat unusual in the real world and out-of-band control is always imposed on systems in some way.)

    With that in mind, let's change this to be an asynchronous system (using cast) so that we are following the spirit of the concept of chatty gossip style communication.

    We need a digest of messages that counts how many times each message has been received, a set of messages that have already passed the magic number (so we don't re-send one that arrives very late), and a list of the processes enrolled in our system so we know to whom we are broadcasting:

    (The following example is in Erlang because I've been tripping over Elixir syntax ever since I stopped using it...)

    -module(rumor).
    
    -record(s,
            {peers  = []         :: [pid()],
             digest = #{}        :: #{message_id() => non_neg_integer()},
             dead   = sets:new() :: sets:set(message_id())}).
    
    -type message_id() :: zuuid:uuid().
    

    Here I am using a UUID, but it could be anything. An Erlang reference would be fine for a test case, but since gossip isn't useful within an Erlang cluster and references are unsafe outside the originating system, I'm just assuming this is for a networked system.

    We will need an interface function that allows us to tell a process to inject a new message into the system. We will also need an interface function that sends a message between two processes once it is already in the system. Then we will need an inner function that broadcasts messages to all the known (subscribed) peers. Ah, that means we need a greeting interface so that peer processes can notify each other of their presence.

    We will also want a way for a process to tell itself to keep broadcasting over time. Choosing the retransmission interval is not actually a simple decision -- it has everything to do with network topology, latency, variability, etc. (In practice you would probably ping peers occasionally and develop a heuristic based on the latency, drop peers that seem unresponsive, and so on -- but we're not going to get into that madness here.) Here I'm just going to set it to 1 second because that is an easy-to-interpret interval for humans observing the system.

    Note that everything below is asynchronous.

    Interfaces...

    insert(Pid, Message) ->
        gen_server:cast(Pid, {insert, Message}).
    
    relay(Pid, ID, Message) ->
        gen_server:cast(Pid, {relay, ID, Message}).
    
    greet(Pid) ->
        gen_server:cast(Pid, {greet, self()}).
    
    make_introduction(Pid, PeerPid) ->
        gen_server:cast(Pid, {make_introduction, PeerPid}).
    

    That last function is going to be our way as testers of the system to cause one of the processes to call greet/1 on some target Pid so they start to build a peer network. In the real world something slightly different usually goes on.

    Inside our gen_server callback for receiving a cast we will get:

    handle_cast({insert, Message}, State) ->
        NewState = do_insert(Message, State),
        {noreply, NewState};
    handle_cast({relay, ID, Message}, State) ->
        NewState = do_relay(ID, Message, State),
        {noreply, NewState};
    handle_cast({greet, Peer}, State) ->
        NewState = do_greet(Peer, State),
        {noreply, NewState};
    handle_cast({make_introduction, Peer}, State) ->
        NewState = do_make_introduction(Peer, State),
        {noreply, NewState}.
    

    Pretty simple stuff.

    Above I mentioned that we would need a way for this thing to tell itself to resend after a delay. To do that, we are going to send ourselves a naked {redo_relay, ID, Message} message after a delay using erlang:send_after/3. Since that is a plain message rather than a cast or call, we need a handle_info/2 clause to deal with it:

    handle_info({redo_relay, ID, Message}, State) ->
        NewState = do_relay(ID, Message, State),
        {noreply, NewState}.
    

    Implementation of the message bits is the fun part, but none of this is terribly tricky. Forgive the do_relay/3 below -- it could be more concise, but I'm writing this in a browser off the top of my head, so...

    do_insert(Message, State = #s{peers = Peers, digest = Digest}) ->
        MessageID = zuuid:v1(),
        NewDigest = maps:put(MessageID, 1, Digest),
        ok = broadcast(MessageID, Message, Peers),
        ok = schedule_resend(MessageID, Message),
        State#s{digest = NewDigest}.
    
    do_relay(ID,
             Message,
             State = #s{peers = Peers, digest = Digest, dead = Dead}) ->
        case maps:find(ID, Digest) of
            {ok, Count} when Count >= 10 ->
                NewDigest = maps:remove(ID, Digest),
                NewDead = sets:add_element(ID, Dead),
                ok = broadcast(ID, Message, Peers),
                State#s{digest = NewDigest, dead = NewDead};
            {ok, Count} ->
                NewDigest = maps:put(ID, Count + 1, Digest),
                ok = broadcast(ID, Message, Peers),
                ok = schedule_resend(ID, Message),
                State#s{digest = NewDigest};
            error ->
                case sets:is_element(ID, Dead) of
                    true ->
                        State;
                    false ->
                        NewDigest = maps:put(ID, 1, Digest),
                        ok = broadcast(ID, Message, Peers),
                        ok = schedule_resend(ID, Message),
                        State#s{digest = NewDigest}
                end
        end.
    
    broadcast(ID, Message, Peers) ->
        Forward = fun(P) -> relay(P, ID, Message) end,
        lists:foreach(Forward, Peers).
    
    schedule_resend(ID, Message) ->
        _ = erlang:send_after(1000, self(), {redo_relay, ID, Message}),
        ok.
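    For readers following along in Elixir, the counting rule in do_relay/3 can be pulled out into a pure function and tested without any processes. This is a hypothetical sketch, not part of the answer's module: the name RumorDigest and the action atoms :broadcast and :schedule_resend are made up, and it assumes the same limit of 10 used above. It returns the actions a node should take instead of performing them.

    ```elixir
    defmodule RumorDigest do
      # Hypothetical pure version of the do_relay/3 decision logic.
      # Limit assumed to match the answer's "Count >= 10" check.
      @limit 10

      # digest: message id => times seen; dead: ids already retired.
      defstruct digest: %{}, dead: MapSet.new()

      @doc "Decide what to do when message `id` arrives (or is resent to ourselves)."
      def on_relay(%__MODULE__{digest: digest, dead: dead} = state, id) do
        case Map.fetch(digest, id) do
          {:ok, count} when count >= @limit ->
            # Limit reached: drop the counter, remember the id as dead, broadcast once more.
            {[:broadcast],
             %{state | digest: Map.delete(digest, id), dead: MapSet.put(dead, id)}}

          {:ok, count} ->
            # Still spreading: bump the counter, relay, and schedule a resend.
            {[:broadcast, :schedule_resend], %{state | digest: Map.put(digest, id, count + 1)}}

          :error ->
            if MapSet.member?(dead, id) do
              # Late copy of a message we already retired: ignore it.
              {[], state}
            else
              # First time we see this id.
              {[:broadcast, :schedule_resend], %{state | digest: Map.put(digest, id, 1)}}
            end
        end
      end
    end
    ```

    Keeping the decision pure means the process only has to execute the returned actions, which makes the "when do we stop relaying" rule easy to check in isolation.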
    

    And now we need the social bits...

    do_greet(Peer, State = #s{peers = Peers}) ->
        case lists:member(Peer, Peers) of
            false -> State#s{peers = [Peer | Peers]};
            true  -> State
        end.
    
    do_make_introduction(Peer, State = #s{peers = Peers}) ->
        ok = greet(Peer),
        do_greet(Peer, State).
    

    So what did all of the horribly untypespecced stuff up there do?

    It avoided any possibility of a deadlock. The reason deadlocks are so, well, deadly in peer systems is that anytime you have two identical processes (or actors, or whatever) communicating synchronously, you have created a textbook case of a potential deadlock.

    Any time A has a synchronous message headed toward B and B has a synchronous message headed toward A at the same time, you have a deadlock. There is no way to create two identical processes that call each other synchronously without creating a potential deadlock. In massively concurrent systems anything that might happen almost certainly will eventually, so you're going to run into this sooner or later.
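    A minimal Elixir sketch of exactly this situation, under stated assumptions: MutualCaller is a hypothetical module, and the 100 ms sleep and 500 ms call timeout are arbitrary values chosen so the demo finishes quickly (the question's code uses a 5-second sleep and the default 5-second timeout). Each process, while inside handle_cast/2, calls the other; a GenServer handles one message at a time, so neither can run handle_call/3 while it is blocked in its own call, and both calls time out. After its own call times out, each process stays busy for another 200 ms so it does not answer the peer's stale :ping before the peer's call has also timed out; without that pause the outcome would depend on timing.

    ```elixir
    defmodule MutualCaller do
      # Hypothetical demo of two identical GenServers calling each other synchronously.
      use GenServer

      def start_link, do: GenServer.start_link(__MODULE__, :ok)

      # Ask `pid` to synchronously call `peer`; the outcome is sent to `report_to`.
      def poke(pid, peer, report_to), do: GenServer.cast(pid, {:poke, peer, report_to})

      @impl true
      def init(:ok), do: {:ok, nil}

      @impl true
      def handle_cast({:poke, peer, report_to}, state) do
        # Give the peer time to enter its own handle_cast before we call it.
        Process.sleep(100)

        result =
          try do
            GenServer.call(peer, :ping, 500)
          catch
            # The peer is itself blocked in a call to us, so it never replies.
            :exit, {:timeout, _} -> :timeout
          end

        send(report_to, {:result, self(), result})

        # Stay busy briefly so the peer's call also times out instead of being served late.
        Process.sleep(200)
        {:noreply, state}
      end

      @impl true
      def handle_call(:ping, _from, state), do: {:reply, :pong, state}
    end
    ```

    Catching the timeout here only makes the deadlock visible; it does not fix the design. Any pair of peers that call each other from inside their callbacks can end up in this state.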

    Gossip is intended to be asynchronous for a reason: it is a sloppy, unreliable, inefficient way to deal with a sloppy, unreliable, inefficient network topology. Trying to make calls instead of casts not only defeats the purpose of gossip-style message relay, it also pushes you into impossible deadlock territory, simply because you changed the nature of the protocol from asynchronous to synchronous.
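    Applied to the question's RumourActor, one possible asynchronous restructuring (a sketch, not the answer author's code) is to stop asking the recipient for its listen count and let the recipient enforce its own limit. AsyncRumourActor, hear/3 and the :peer_done message are hypothetical names. The toy topology simply gossips back to whoever sent the rumour, where a real implementation would pick random peers.

    ```elixir
    defmodule AsyncRumourActor do
      # Hypothetical cast-only version of the question's actor.
      use GenServer

      # Listen limit from the question.
      @limit 10

      def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

      # Fire-and-forget: the caller never blocks, so two actors cannot deadlock.
      # `from` is passed explicitly because a cast carries no sender.
      def hear(pid, rumour, from), do: GenServer.cast(pid, {:hear, rumour, from})

      @impl true
      def init(_opts), do: {:ok, %{count: 0, done_peers: MapSet.new()}}

      @impl true
      def handle_cast({:hear, _rumour, from}, %{count: count} = state) when count >= @limit do
        # Already saturated: tell the sender (also by cast) to stop targeting us.
        GenServer.cast(from, {:peer_done, self()})
        {:noreply, state}
      end

      def handle_cast({:hear, rumour, from}, state) do
        state = %{state | count: state.count + 1}

        # Toy topology: pass the rumour back to the sender unless it reported it is done.
        if not MapSet.member?(state.done_peers, from), do: hear(from, rumour, self())

        {:noreply, state}
      end

      def handle_cast({:peer_done, peer}, state) do
        {:noreply, %{state | done_peers: MapSet.put(state.done_peers, peer)}}
      end
    end
    ```

    For example, after `{:ok, a} = AsyncRumourActor.start_link()`, `{:ok, b} = AsyncRumourActor.start_link()` and `AsyncRumourActor.hear(a, :rumour, b)`, the rumour bounces between the two actors until both have heard it 10 times. Because neither actor ever waits for the other, calling hear/3 in both directions at once (the scenario that deadlocked the original code) just produces two interleaved chains of casts.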