elixirgenstage

elixir GenStage producer state


There is a producer, which is initialized by a list of values.

defmodule GenstageExample.Producer do
  use GenStage
  require Logger

  def start_link(initial \\ 0) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(el_list) do
    Logger.info "producer init #{inspect(el_list)}"
    {:producer, el_list}
  end

  def handle_demand(demand, [head|tail]) do
    Logger.info("producer handle_demand #{demand}, #{inspect(head)}, #{inspect(tail)}")
    {:noreply, [head], tail}
  end

end

There is producer-consumer, which receives one value from producer, performs some actions with this value.

defmodule GenstageExample.ProducerConsumer do
  use GenStage
  require Logger

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    Logger.info "producer_consumer init #{inspect(state)}"
    {:producer_consumer, state, subscribe_to: [GenstageExample.Producer]}
  end

  def handle_events(events, _from, state) do
    Logger.info "producer_consumer handle_events #{inspect(events)}, #{inspect(state)}"
    {:noreply, events, state}
  end

end

And there is a consumer that simply displays this value.

defmodule GenstageExample.Consumer do
  use GenStage
  require Logger

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    Logger.info "consumer init #{inspect(state)}"
    {:consumer, state, subscribe_to: [GenstageExample.ProducerConsumer]}
  end

  def handle_events(events, _from, state) do
    Logger.info "consumer handle_events #{inspect(events)}, #{inspect(state)}"
    for event <- events do
      IO.inspect {self(), event, state}
    end
    {:noreply, [], state}
  end

end

my application.ex:

defmodule GenstageExample.Application do

  @moduledoc false

  use Application

  def start(_type, _args) do
    import Supervisor.Spec, warn: false

      worker(GenstageExample.Producer, [["el0", "el1", "el2"]]),
      worker(GenstageExample.ProducerConsumer, []),
      worker(GenstageExample.Consumer, [])
    ]

    opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

After startup, I see such a log:

11:56:42.631 [info]  producer init ["el0", "el1", "el2"]
11:56:42.633 [info]  producer_consumer init :state_doesnt_matter
11:56:42.633 [info]  producer handle_demand 1000, "el0", ["el1", "el2"]
11:56:42.634 [info]  consumer init :state_doesnt_matter
11:56:42.634 [info]  producer_consumer handle_events ["el0"], :state_doesnt_matter
11:56:42.634 [info]  consumer handle_events ["el0"], :state_doesnt_matter
{#PID<0.189.0>, "el0", :state_doesnt_matter}

And that's it, nothing else happens.

As I understand, in {: noreply, [head], tail}, I specify a new state for the producer. Why producer-consumer does not request the next item from this list?


Solution

  • It was necessary to specify the maximum and minimum demand

      def init(state) do
        Logger.info "producer_consumer init #{inspect(state)}"
        {:producer_consumer, state, subscribe_to: [{GenstageExample.Producer, max_demand: 1, min_demand: 0}]}
      end
    

    and

      def init(state) do
        Logger.info "consumer init #{inspect(state)}"
        {:consumer, state, subscribe_to: [{GenstageExample.ProducerConsumer, max_demand: 1, min_demand: 0}]}
      end