
I am implementing the Gossip Algorithm, in which multiple actors spread a gossip at the same time, in parallel. The system stops when each actor has heard the gossip 10 times.

Now, I have a scenario in which I check the listen count of the recipient actor before sending the gossip to it. If the listen count is already 10, the gossip is not sent to that actor. I do this with a synchronous call that fetches the listen count:

```elixir
def get_message(server, msg) do
  GenServer.call(server, {:get_message, msg})
end

def handle_call({:get_message, _msg}, _from, state) do
  listen_count = hd(state)
  {:reply, listen_count, state}
end
```

The program runs well at first, but after some time GenServer.call fails with a timeout error like the one below. After some debugging, I realized that the GenServer.call hangs and the corresponding handle_call is never invoked. Is this behavior expected when using synchronous calls? Since all actors are independent, shouldn't the GenServer.call invocations run independently, without waiting for each other's responses?

```
02:28:05.634 [error] GenServer #PID<0.81.0> terminating
    ** (stop) exited in: GenServer.call(#PID<0.79.0>, {:get_message, []}, 5000)
    ** (EXIT) time out
    (elixir) lib/gen_server.ex:774: GenServer.call/3
```

Edit: The following code reproduces the error when run in the iex shell.

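```elixir
defmodule RumourActor do
  use GenServer

  def start_link(opts) do
    {:ok, pid} = GenServer.start_link(__MODULE__, opts)
    # Returns a 1-tuple; the iex session below unpacks it with elem/2.
    {pid}
  end

  def set_message(server, msg, recipient) do
    GenServer.cast(server, {:set_message, msg, server, recipient})
  end

  def get_message(server, _msg) do
    GenServer.call(server, :get_message)
  end

  def init(opts) do
    state = opts
    {:ok, state}
  end

  def handle_cast({:set_message, _msg, _server, recipient}, state) do
    # The sleep widens the window so both casts are being handled at the same time.
    :timer.sleep(5000)
    # Synchronous call to the peer: if the peer is doing the same to us,
    # neither can answer and the call times out after the default 5000 ms.
    c = RumourActor.get_message(recipient, [])
    IO.inspect(c)
    {:noreply, state}
  end

  def handle_call(:get_message, _from, state) do
    count = tl(state)
    {:reply, count, state}
  end
end
```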

Open an iex shell and load the module above. Start two processes using:

```elixir
a = RumourActor.start_link(["", 3])
b = RumourActor.start_link(["", 5])
```

Trigger the deadlock condition that Dogbert mentioned in the comments by running the following two lines in quick succession:

```elixir
cb = RumourActor.set_message(elem(a, 0), [], elem(b, 0))
ca = RumourActor.set_message(elem(b, 0), [], elem(a, 0))
```

Wait about 10 seconds (the 5-second sleep plus the 5-second default call timeout) and the error will appear.

Prince Bhatti
    Looks like a [Deadlock](https://en.wikipedia.org/wiki/Deadlock). Can you post the complete code (or enough code so we can reproduce this bug)? An [MCVE](https://stackoverflow.com/help/mcve) would be best. – Dogbert Oct 01 '17 at 07:27
  • @Dogbert My question may be unclear. This [question](https://stackoverflow.com/questions/45526514/elixir-genserver-parallel-handle-call?rq=1) may be a duplicate. Can you please explain this issue? My code is fairly big, so I will post an MCVE, as you suggested, to reproduce the error. – Prince Bhatti Oct 01 '17 at 07:37
    You probably have two processes, A and B, where A's handle_call calls B and B's handle_call calls A. This creates a deadlock because B's call won't return before A returns, and A cannot return before B returns. – Dogbert Oct 01 '17 at 07:39
  • @Dogbert I have added the code to reproduce the error. Thanks for pointing out the deadlock; it's happening exactly as you described. Can you please suggest a way to avoid this issue? – Prince Bhatti Oct 01 '17 at 08:28
    I got on a wild kick today for some reason and way over answered your follow-on question... Hopefully my example educates more than it confuses. – zxq9 Oct 02 '17 at 07:09

2 Answers


A gossip protocol is a way of dealing with asynchronous, unknown, unconfigured (random) networks that may be suffering intermittent outages and partitions and where no leader or default structure is present. (Note that this situation is somewhat unusual in the real world and out-of-band control is always imposed on systems in some way.)

With that in mind, let's change this to be an asynchronous system (using cast) so that we are following the spirit of the concept of chatty gossip style communication.

We need a digest of messages that counts how many times a given message has been received, a record of messages that have been received and are already over the magic number (so we don't re-send one if it arrives way late), and a list of processes enrolled in our system so we know to whom we are broadcasting:

(The following example is in Erlang because I just trip over Elixir syntax ever since I stopped using it...)

```erlang
-module(rumor).

-record(s,
        {peers  = []         :: [pid()],
         digest = #{}        :: #{message_id() => non_neg_integer()},
         dead   = sets:new() :: sets:set(message_id())}).

-type message_id() :: zuuid:uuid().
```
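Since the question is in Elixir, here is a sketch of how the same state might be held in an Elixir struct. The module name `RumorState` is hypothetical, and `message_id` is left as `term()` rather than tied to zuuid; the `dead` set becomes a `MapSet`.

```elixir
defmodule RumorState do
  # Hypothetical Elixir counterpart of the #s{} record above.
  # Message IDs may be any term here; the answer uses zuuid UUIDs.
  defstruct peers: [], digest: %{}, dead: MapSet.new()

  @type message_id :: term()

  @type t :: %__MODULE__{
          # processes we broadcast to
          peers: [pid()],
          # message ID => number of times seen
          digest: %{optional(message_id()) => non_neg_integer()},
          # IDs already past the limit, never re-sent
          dead: MapSet.t(message_id())
        }
end
```

A fresh server would start from `%RumorState{}` and update it with `%{state | digest: ...}`.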

Here I am using a UUID, but it could be anything. An Erlang reference would be fine for a test case, but since gossip isn't useful within an Erlang cluster, and references are unsafe outside the originating system, I'm just jumping to the assumption that this is for a networked system.

We will need an interface function that allows us to tell a process to inject a new message into the system. We will also need an interface function that sends a message between two processes once it is already in the system. Then we will need an inner function that broadcasts messages to all the known (subscribed) peers. Ah, that means we need a greeting interface so that peer processes can notify each other of their presence.

We will also want a way to have a process tell itself to keep broadcasting over time. How long to set the retransmission interval is not actually a simple decision -- it has everything to do with network topology, latency, variability, etc. (you would probably occasionally ping peers and develop some heuristic based on the latency, drop peers that seem unresponsive, and so on -- but we're not going to get into that madness here). Here I'm just going to set it to 1 second because that is an easy-to-interpret interval for humans observing the system.

Note that everything below is asynchronous.

Interfaces...

```erlang
insert(Pid, Message) ->
    gen_server:cast(Pid, {insert, Message}).

relay(Pid, ID, Message) ->
    gen_server:cast(Pid, {relay, ID, Message}).

greet(Pid) ->
    gen_server:cast(Pid, {greet, self()}).

make_introduction(Pid, PeerPid) ->
    gen_server:cast(Pid, {make_introduction, PeerPid}).
```

That last function is going to be our way as testers of the system to cause one of the processes to call greet/1 on some target Pid so they start to build a peer network. In the real world something slightly different usually goes on.

Inside our gen_server callback for receiving a cast we will get:

```erlang
handle_cast({insert, Message}, State) ->
    NewState = do_insert(Message, State),
    {noreply, NewState};
handle_cast({relay, ID, Message}, State) ->
    NewState = do_relay(ID, Message, State),
    {noreply, NewState};
handle_cast({greet, Peer}, State) ->
    NewState = do_greet(Peer, State),
    {noreply, NewState};
handle_cast({make_introduction, Peer}, State) ->
    NewState = do_make_introduction(Peer, State),
    {noreply, NewState}.
```

Pretty simple stuff.

Above I mentioned that we would need a way for this thing to tell itself to resend after a delay. To do that we are going to send ourselves a naked message to "redo_relay" after a delay using erlang:send_after/3 so we are going to need a handle_info/2 to deal with it:

```erlang
handle_info({redo_relay, ID, Message}, State) ->
    NewState = do_relay(ID, Message, State),
    {noreply, NewState}.
```
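In Elixir, the counterpart of `erlang:send_after/3` is `Process.send_after/3`, and the self-addressed message arrives in `handle_info/2` in the same way. A minimal sketch: the `ResendDemo` module, its options (`:notify`, `:interval`, `:max`, `:message`) and the retry budget are hypothetical; the 1-second default mirrors the interval chosen above.

```elixir
defmodule ResendDemo do
  use GenServer

  # Hypothetical demo process: re-relays one message every `:interval` ms,
  # at most `:max` times, reporting each attempt to the `:notify` pid.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = %{
      notify: Keyword.fetch!(opts, :notify),
      interval: Keyword.get(opts, :interval, 1000),
      max: Keyword.get(opts, :max, 3),
      sent: 0
    }

    schedule_resend(Keyword.get(opts, :message, :hello), state.interval)
    {:ok, state}
  end

  @impl true
  def handle_info({:redo_relay, msg}, %{sent: sent, max: max} = state) when sent < max do
    # A real node would broadcast `msg` to its peers here.
    send(state.notify, {:relayed, msg, sent + 1})
    schedule_resend(msg, state.interval)
    {:noreply, %{state | sent: sent + 1}}
  end

  def handle_info({:redo_relay, _msg}, state) do
    # Retransmission budget used up: stop rescheduling.
    {:noreply, state}
  end

  # Delivers a plain message to self() after `interval` ms; nothing blocks meanwhile.
  defp schedule_resend(msg, interval) do
    _ = Process.send_after(self(), {:redo_relay, msg}, interval)
    :ok
  end
end
```

Because the server never waits for the timer, it stays free to handle relays from peers between retransmissions.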

Implementation of the message bits is the fun part, but none of this is terribly tricky. Forgive the do_relay/3 below -- it could be more concise, but I'm writing this in a browser off the top of my head, so...

```erlang
do_insert(Message, State = #s{peers = Peers, digest = Digest}) ->
    MessageID = zuuid:v1(),
    NewDigest = maps:put(MessageID, 1, Digest),
    ok = broadcast(MessageID, Message, Peers),
    ok = schedule_resend(MessageID, Message),
    State#s{digest = NewDigest}.

do_relay(ID,
         Message,
         State = #s{peers = Peers, digest = Digest, dead = Dead}) ->
    case maps:find(ID, Digest) of
        {ok, Count} when Count >= 10 ->
            % Limit reached: one last broadcast, then retire the message.
            NewDigest = maps:remove(ID, Digest),
            NewDead = sets:add_element(ID, Dead),
            ok = broadcast(ID, Message, Peers),
            State#s{digest = NewDigest, dead = NewDead};
        {ok, Count} ->
            NewDigest = maps:put(ID, Count + 1, Digest),
            ok = broadcast(ID, Message, Peers),
            ok = schedule_resend(ID, Message),
            State#s{digest = NewDigest};
        error ->
            % Unknown ID: either already retired (ignore) or brand new.
            case sets:is_element(ID, Dead) of
                true ->
                    State;
                false ->
                    NewDigest = maps:put(ID, 1, Digest),
                    ok = broadcast(ID, Message, Peers),
                    ok = schedule_resend(ID, Message),
                    State#s{digest = NewDigest}
            end
    end.

broadcast(ID, Message, Peers) ->
    Forward = fun(P) -> relay(P, ID, Message) end,
    lists:foreach(Forward, Peers).

schedule_resend(ID, Message) ->
    _ = erlang:send_after(1000, self(), {redo_relay, ID, Message}),
    ok.
```
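The decision part of do_relay/3 can be pulled out into a pure function, which makes it testable without any processes. A minimal Elixir sketch: the `RelayLogic` module and its return tags (`:broadcast`, `:broadcast_and_resend`, `:drop`) are hypothetical; the caller would perform the actual broadcast and `Process.send_after/3` based on the tag.

```elixir
defmodule RelayLogic do
  # Same magic number as the Erlang code above.
  @limit 10

  # Returns {action, new_digest, new_dead}; performs no side effects itself.
  def relay(id, digest, dead) do
    case Map.fetch(digest, id) do
      {:ok, count} when count >= @limit ->
        # Final broadcast, then retire the ID so late copies are ignored.
        {:broadcast, Map.delete(digest, id), MapSet.put(dead, id)}

      {:ok, count} ->
        {:broadcast_and_resend, Map.put(digest, id, count + 1), dead}

      :error ->
        if MapSet.member?(dead, id) do
          {:drop, digest, dead}
        else
          {:broadcast_and_resend, Map.put(digest, id, 1), dead}
        end
    end
  end
end
```

For example, `RelayLogic.relay(:m, %{}, MapSet.new())` returns `{:broadcast_and_resend, %{m: 1}, MapSet.new()}`.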

And now we need the social bits...

```erlang
do_greet(Peer, State = #s{peers = Peers}) ->
    case lists:member(Peer, Peers) of
        false -> State#s{peers = [Peer | Peers]};
        true  -> State
    end.

do_make_introduction(Peer, State) ->
    ok = greet(Peer),
    do_greet(Peer, State).
```
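In Elixir, the social part might look like the sketch below. The `Peer` module, its `peers/1` helper (which uses `:sys.get_state/1`, intended for debugging), and keeping peers as a plain list are assumptions; message relaying is left out to keep it short.

```elixir
defmodule Peer do
  use GenServer

  # Hypothetical port of greet/1, make_introduction/2 and do_greet/2 (peers only).
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  # Called from inside a server, so self() is that server's pid.
  def greet(pid), do: GenServer.cast(pid, {:greet, self()})
  def make_introduction(pid, peer), do: GenServer.cast(pid, {:make_introduction, peer})

  # Debugging helper: reads the peer list without a custom call.
  def peers(pid), do: :sys.get_state(pid)

  @impl true
  def init(_opts), do: {:ok, []}

  @impl true
  def handle_cast({:greet, peer}, peers), do: {:noreply, add_peer(peers, peer)}

  def handle_cast({:make_introduction, peer}, peers) do
    :ok = greet(peer)
    {:noreply, add_peer(peers, peer)}
  end

  defp add_peer(peers, peer) do
    if peer in peers, do: peers, else: [peer | peers]
  end
end
```

After `Peer.make_introduction(a, b)`, each of `a` and `b` lists the other as a peer, and no process ever waited on another.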

So what did all of the horribly untypespecced stuff up there do?

It avoided any possibility of a deadlock. The reason deadlocks are so, well, deadly in peer systems is that anytime you have two identical processes (or actors, or whatever) communicating synchronously, you have created a textbook case of a potential deadlock.

Any time A has a synchronous message headed toward B and B has a synchronous message headed toward A at the same time, you have a deadlock. There is no way to create two identical processes that call each other synchronously without creating a potential deadlock. In massively concurrent systems anything that can happen almost certainly will eventually, so you're going to run into this sooner or later.
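The deadlock is easy to reproduce in isolation. In this hypothetical Elixir demo (module `Mutual`; the 50 ms delay and 200 ms call timeout are made up to keep it short), two identical servers each call their partner synchronously when poked. A lone poke succeeds, while simultaneous pokes leave both blocked until a timeout breaks the cycle.

```elixir
defmodule Mutual do
  use GenServer

  # Hypothetical demo: two identical servers that each make a synchronous
  # call to their partner when poked.
  def start_link, do: GenServer.start_link(__MODULE__, nil)
  def set_partner(pid, partner), do: GenServer.call(pid, {:set_partner, partner})
  def poke(pid, reply_to), do: GenServer.cast(pid, {:poke, reply_to})

  @impl true
  def init(nil), do: {:ok, nil}

  @impl true
  def handle_call({:set_partner, partner}, _from, _state), do: {:reply, :ok, partner}
  def handle_call(:count, _from, partner), do: {:reply, 42, partner}

  @impl true
  def handle_cast({:poke, reply_to}, partner) do
    # Small delay so two pokes overlap, like :timer.sleep/1 in the question.
    Process.sleep(50)

    result =
      try do
        # 200 ms instead of the 5000 ms default, to keep the demo short.
        {:ok, GenServer.call(partner, :count, 200)}
      catch
        :exit, {:timeout, _} -> :timeout
      end

    send(reply_to, {self(), result})
    {:noreply, partner}
  end
end
```

Whichever server calls first always times out. The second may then get a late answer, because the first server is free again once its own call gives up.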

Gossip is intended to be asynchronous for a reason: it is a sloppy, unreliable, inefficient way to deal with a sloppy, unreliable, inefficient network topology. Trying to make calls instead of casts not only defeats the purpose of gossip-style message relay, it also pushes you into impossible deadlock territory incident to changing the nature of the protocol from asynch to synch.

zxq9
  • Thanks for such a comprehensive answer. I am convinced that making a synchronous call is a bad idea in this case. However, can you describe a scenario where a synchronous call is useful but cannot lead to a deadlock? – Prince Bhatti Oct 02 '17 at 17:30
    @COSTA Synchronous calls should typically go one way. If A might call B, that's fine, but in that case B should never call A, or vice-versa. Generally you will have a handful of "registry" processes that control ongoing state, and many workers. The workers *cast* results to the registry processes and may call them for data, but registries should not be calling workers (spawning them, sure, but not calling). If two identical processes need to interact I usually spawn a third process to represent the transaction itself. So if two chars in a game trade, I spawn a "broker" process to manage it. – zxq9 Oct 02 '17 at 22:09
    @COSTA In the case of a broker, if A and B try to establish a trade at the same time, A spawns a broker while B also spawns one. Then A's broker contacts B to initiate a trade state while B's does the same to A. The worst that can happen is that A and B respond that they are already in a trading state and both trades fail (human players typically try again). The point here is that *both* A and B are *free to respond* because neither is blocking. This idea of using brokered exchanges combined with special service states extends well beyond game characters trading, of course. – zxq9 Oct 02 '17 at 22:17
    As a design guideline you should: **Always try to do everything asynch to start with, and then only move to synch communication when you have proven that you really need to.** About 1% of the time you'll need a call but can't manage it for some reason and will wind up with tagged queues of calls, but that is actually pretty rare. – zxq9 Oct 02 '17 at 22:27

GenServer.call has a default timeout of 5000 milliseconds. So what is probably happening is that the actor's message queue is filled with millions of messages, and by the time the actor reaches the call, the calling actor has timed out.

You can handle the timeout using try/catch:

```elixir
try do
  c = RumourActor.get_message(recipient, [])
  # use c here
catch
  :exit, _reason ->
    # handle timeout
    :timeout
end
```

Eventually, the called actor will get to the call message and respond, and that reply will arrive as an unexpected message in the first process. You'll need to handle it in handle_info. So one way is to ignore the error in the catch block and send the rumor from handle_info.

Also, this will significantly degrade performance if many processes are each waiting 5 seconds to time out before moving on. One could deliberately reduce the timeout and handle the reply in handle_info, but at that point it effectively reduces to using cast and handling the reply from the other process.

Your blocking call needs to be broken into two non-blocking calls. So if A is making a blocking call to B, instead of waiting for a reply, A can ask B to send its state to a given address (A's address) and move on. A then handles that message separately and replies if necessary.

```
A.fun1():
  body of A before blocking call
  result = blockingcall()
  do things based on result
```

needs to be divided into:

```
A.send():
  body of A before blocking call
  nonblockingcall(A.receive)  # A.receive is where B should send results
  do other things

A.receive(result):
  do things based on result
```
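Applied to the listen-count check from the question, the split might look like the following Elixir sketch. The `AsyncActor` module, its message tags, and representing the state as just the listen count are assumptions. Every interaction between peers is a cast, so two actors querying each other at the same time cannot deadlock.

```elixir
defmodule AsyncActor do
  use GenServer

  # Hypothetical sketch: state is just this actor's listen count.
  @limit 10

  def start_link(count), do: GenServer.start_link(__MODULE__, count)

  # "A.send": ask this server to query `peer` and then move on.
  def ask_count(server, peer), do: GenServer.cast(server, {:ask_count, peer})

  @impl true
  def init(count), do: {:ok, count}

  @impl true
  def handle_cast({:ask_count, peer}, count) do
    # Pass our own address so the peer knows where to send the answer.
    GenServer.cast(peer, {:get_count, self()})
    {:noreply, count}
  end

  # B's side: answer with a cast instead of replying to a call.
  def handle_cast({:get_count, reply_to}, count) do
    GenServer.cast(reply_to, {:count_reply, self(), count})
    {:noreply, count}
  end

  # "A.receive": act on the result whenever it arrives.
  def handle_cast({:count_reply, peer, peer_count}, count) do
    if peer_count < @limit do
      GenServer.cast(peer, :rumor)
    end

    {:noreply, count}
  end

  # Hearing the rumor increments the listen count.
  def handle_cast(:rumor, count), do: {:noreply, count + 1}
end
```

With actors at counts 3 and 5 asking each other simultaneously, both end up one higher, and an actor already at 10 is left alone.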
Anurag Peshne
  • Consider the example of two deadlocked actors, A and B. Which actor will eventually receive the call message? – Prince Bhatti Oct 02 '17 at 21:17
    If A is sending a message to B using `call`, and B doesn't respond within 5 seconds, then A will time out. Some time later, when B finally gets to the `call` message, it will reply, and that reply arrives at A as an unexpected message. – Anurag Peshne Oct 02 '17 at 21:20
    B will receive the call message. – Anurag Peshne Oct 02 '17 at 21:23
  • Thanks, Anurag. As it will reduce performance significantly, I guess it's better not to use synchronous calls in this case. zxq9 suggested an alternative approach in Erlang. Is there any way for actor B to send an acknowledgment back to actor A without using a synchronous call? – Prince Bhatti Oct 02 '17 at 21:31
  • Yes, I fully agree with @zxq9, making a blocking call is not a good idea in this case. I'm updating my answer to answer this. – Anurag Peshne Oct 02 '17 at 21:33