Lagom: 1.5.4
Consider having a ServiceCall (example):
def stream: ServiceCall[NotUsed, Source[Int, NotUsed]] = ServiceCall { _ =>
  Future.successful(
    Source(1.to(1000)).wireTap(msg => log.info(s"sending $msg"))
  )
}
When another service (example) consumes this ServiceCall, e.g.:
val ticker = Source.tick(1.second, 100.millis, true)
helloWorldStreamService.stream.invoke().flatMap(_.zip(ticker).map {
  case (msg, _) =>
    log.info(s"received $msg")
    msg
}.runWith(Sink.seq))
You would expect the artificially slow consumer to slow down the producer. Judging by the logs, this doesn't seem to be the case:
sending 1
sending 2
sending 3
[...]
sending 1000
[1 second pause]
received 1
[100ms pause]
received 2
[100ms pause]
received 3
[...]
Am I missing any hidden buffers?
Example code:
https://github.com/an-tex/lagom-backpressure
Run sbt runAll and then execute curl 127.0.0.1:[port of hello-world-stream-client service]/api/test to see the effect.
There are system buffers whose capacity exceeds the size of the test data. On Mac OS there seems to be a 128 KB (512 KB burst) buffer. Once those buffers are full, backpressure works like a charm. I've updated the GitHub repo with a bigger test size in case someone wants to play around.
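To get a feel for why 1000 elements never reach the point where backpressure kicks in, here is a rough back-of-the-envelope sketch in plain Scala. It is only an estimate under stated assumptions: each Int is assumed to travel as its decimal text, and frameOverheadBytes is a hypothetical per-message overhead I picked for illustration, not a measured value. The 128 KB figure is the buffer size mentioned above.

```scala
object BufferEstimate {
  // Hypothetical per-message framing overhead; the real value depends on the transport.
  val frameOverheadBytes: Int = 2

  // The 128 KB buffer size reported above for Mac OS.
  val bufferBytes: Long = 128L * 1024

  // Estimated bytes on the wire for the elements 1..n,
  // assuming each Int is sent as its decimal text plus the overhead.
  def wireBytes(n: Int): Long =
    (1 to n).map(i => i.toString.length.toLong + frameOverheadBytes).sum

  // Smallest n for which the elements 1..n no longer fit into `limit` bytes.
  // scanLeft emits the running total after 0, 1, 2, ... elements,
  // so the index of the first total above the limit is n itself.
  def elementsToExceed(limit: Long): Int =
    Iterator.from(1)
      .scanLeft(0L)((acc, i) => acc + i.toString.length + frameOverheadBytes)
      .indexWhere(_ > limit)

  def main(args: Array[String]): Unit = {
    println(s"1..1000 needs about ${wireBytes(1000)} bytes")
    println(s"buffer holds $bufferBytes bytes")
    println(s"elements needed to exceed the buffer: ${elementsToExceed(bufferBytes)}")
  }
}
```

Under these assumptions the whole 1..1000 stream is only a few kilobytes, so it fits into the buffer many times over and the producer can finish before the consumer has read anything. The test size has to be well past the number printed on the last line before the producer visibly stalls.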
Credit goes to TimMoore, who answered this question on Lightbend Discuss.