I found some resources on how to test the producer, but I could not find anything that shows how to test the consumer.
For the producer, I create a dummy consumer and everything works fine, but I am struggling to test the consumer.
defmodule DataProducer do
  use GenStage

  def start_link([]) do
    GenStage.start_link(__MODULE__, 0, name: __MODULE__)
  end

  # The state is a counter holding the next integer to emit.
  def init(counter) do
    {:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_demand(demand, state) do
    # Emit exactly `demand` events, starting at the current counter.
    events = Enum.to_list(state..(state + demand - 1))
    # Logger.info "demand is: #{inspect(demand)}, state is #{inspect(state)}"
    {:noreply, events, state + demand}
  end
end
Producer Test:
defmodule DataProducerTest do
  use ExUnit.Case

  test "check the results" do
    {:ok, stage} = DataProducer.start_link([])
    {:ok, _cons} = TestConsumer.start_link(stage)
    # The dummy consumer forwards every batch it receives to the test process.
    assert_receive {:received, _events}
    GenStage.stop(stage)
  end
end
defmodule TestConsumer do
  use GenStage

  def start_link(producer) do
    # self() here is the calling (test) process, which becomes the owner.
    GenStage.start_link(__MODULE__, {producer, self()})
  end

  def init({producer, owner}) do
    {:consumer, owner, subscribe_to: [producer]}
  end

  def handle_events(events, _from, owner) do
    send(owner, {:received, events})
    {:noreply, [], owner}
  end
end
And consumer:
defmodule DataConsumer do
  use GenStage
  # Logger.info/1 is a macro, so Logger must be required before use.
  require Logger

  def start_link([]) do
    GenStage.start_link(__MODULE__, :any_state)
  end

  def init(state) do
    # Only events strictly between 50 and 100 are dispatched to this consumer.
    selector = fn n -> n > 50 && n < 100 end
    {:consumer, state, subscribe_to: [{DataProducer, selector: selector, max_demand: 10}]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      # :timer.sleep(250)
      Logger.info(inspect({self(), event, state}))
    end

    {:noreply, [], state}
  end
end
Thank you in advance.
In the test for the consumer:
test "should behave like consumer" do
  # Register the test process first so DummyProducer can always reach :test.
  Process.register(self(), :test)
  {:ok, producer} = DummyProducer.start_link(1)
  {:ok, _consumer} = Consumer.start_link(producer)
  assert_receive {:called_back, 10}
end
Now the DummyProducer:
defmodule DummyProducer do
  use GenStage

  # `counter` is the first integer the producer will emit.
  def start_link(counter) do
    GenStage.start_link(__MODULE__, counter)
  end

  def init(counter) do
    {:producer, counter}
  end

  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    # Shortly after serving the demand, report it to the test and stop.
    Process.send_after(self(), {:stop, demand}, 1)
    {:noreply, events, demand + counter}
  end

  def handle_info({:stop, demand}, state) do
    # :test is the name the test process registered itself under.
    send(:test, {:called_back, demand})
    {:stop, :normal, state}
  end
end
I think the point of testing the consumer is to check that it sends demand to the producer and that it stays within the max_demand set in its subscription.
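
One way to check that directly is to have a throwaway producer report every demand it receives back to the test process. This is only a minimal sketch: DemandProbeProducer, ForwardingConsumer and the @limit cap are hypothetical names made up for this example, and the script form assumes Elixir 1.12+ (for Mix.install and stepped ranges) with network access to fetch gen_stage.

```elixir
# Standalone script: fetches gen_stage and runs the test when the script exits.
Mix.install([{:gen_stage, "~> 1.0"}])
ExUnit.start()

defmodule DemandProbeProducer do
  # Hypothetical probe: reports each demand it receives to `owner`.
  use GenStage

  # Emit at most this many events in total so the stages go idle quickly.
  @limit 20

  def start_link(owner), do: GenStage.start_link(__MODULE__, owner)

  def init(owner), do: {:producer, {owner, 0}}

  def handle_demand(demand, {owner, counter}) when demand > 0 do
    send(owner, {:demand, demand})
    last = min(counter + demand, @limit)
    # The //1 step makes the range empty once the cap has been reached.
    events = Enum.to_list(counter..(last - 1)//1)
    {:noreply, events, {owner, last}}
  end
end

defmodule ForwardingConsumer do
  # Hypothetical stand-in for the consumer under test.
  use GenStage

  def start_link(producer, owner) do
    GenStage.start_link(__MODULE__, {producer, owner})
  end

  def init({producer, owner}) do
    {:consumer, owner, subscribe_to: [{producer, max_demand: 10}]}
  end

  def handle_events(events, _from, owner) do
    send(owner, {:consumed, events})
    {:noreply, [], owner}
  end
end

defmodule ConsumerDemandTest do
  use ExUnit.Case

  test "consumer asks for max_demand first and batches stay within it" do
    {:ok, producer} = DemandProbeProducer.start_link(self())
    {:ok, _consumer} = ForwardingConsumer.start_link(producer, self())

    # The first demand a consumer sends equals its max_demand.
    assert_receive {:demand, 10}
    # No batch handed to handle_events/3 should be larger than max_demand.
    assert_receive {:consumed, events}
    assert length(events) <= 10
  end
end
```

Saved as, say, consumer_demand_test.exs, it can be run with `elixir consumer_demand_test.exs`. To test a real consumer this way, it would need to accept the producer to subscribe to as an argument instead of hard-coding DataProducer.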