elixirphoenix-frameworkgen-servergenstage

GenStage: retry handle_demand when GenServer updates


If my GenStage's handle_demand/2 method looks like this:

def handle_demand(demand, _state) when demand > 0 do
  case Queue.dequeue do
    nil ->
      Logger.debug("Queue empty.")
      {:noreply, [], []}
    {job, updated_queue} -> {:noreply, job, updated_queue}
  end
end

How do I get it to "rerun" when my Queue (a GenServer) is changed/updated?

My queue module just looks like this:

defmodule Queue do
  use GenServer

  ### client

  def start_link(state \\ []) do
    GenServer.start_link(__MODULE__, state, name: __MODULE__)
  end

  def queue, do: GenServer.call(__MODULE__, :queue)

  def enqueue(value), do: GenServer.cast(__MODULE__, {:enqueue, value})

  def dequeue, do: GenServer.call(__MODULE__, :dequeue)

  ### server

  def init(state), do: {:ok, state}

  def handle_call(:dequeue, _from, [value | state]) do
    {:reply, value, state}
  end

  def handle_call(:dequeue, _from, []), do: {:reply, nil, []}

  def handle_call(:queue, _from, state), do: {:reply, state, state}

  def handle_cast({:enqueue, value}, state) do
    {:noreply, state ++ [value]}
  end
end

Solution

  • Why would you want to “rerun” it when Queue changes? This is a drastic misusing of GenStage. It was invented to allow fight a back pressure, that comes from Queue, not vice versa. In real life, you either don’t need a GenStage at all, or you don’t want to “rerun” demand when Queue gets updated because it will sooner or later kill it via timeouts/messagebox.

    You probably have kinda “consumer” to call handle_demand when it handles the previous load from the queue. GenStage’s repo has four incredibly clear examples using different patterns to work with GenStage. Besides that, there is a great intro to GenStage in Elixir blog.

    Just pick up the pattern you need and adopt it from the sources linked above.