Tags: scala, akka, akka-stream, akka-testkit

How should I test akka-streams RestartSource usage?


I'm working on an application that runs a couple of long-running streams, each of which subscribes to data about a certain entity and processes that data. These streams should be up 24/7, so we need to handle failures (network issues, etc.).

For that purpose, we've wrapped our sources in RestartSource.

I'm now trying to verify this behaviour, and while it looks like it functions, I'm struggling to create a test where I push in some data, verify that it processes correctly, then send an error, and verify that it reconnects after that and continues processing.

I've boiled that down to this minimal case:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RestartSource, Sink, Source}
import akka.stream.testkit.TestPublisher
import org.scalatest.concurrent.Eventually
import org.scalatest.{FlatSpec, Matchers}

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext

class MinimalSpec extends FlatSpec with Matchers with Eventually {

  "restarting a failed source" should "be testable" in {
    implicit val sys: ActorSystem = ActorSystem("akka-grpc-measurements-for-test")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    implicit val ec: ExecutionContext = sys.dispatcher

    val probe = TestPublisher.probe[Int]()
    val restartingSource = RestartSource
      .onFailuresWithBackoff(1.second, 1.minute, 0d) { () => Source.fromPublisher(probe) }

    var last: Int = 0
    val sink = Sink.foreach { l: Int => last = l }

    restartingSource.runWith(sink)

    probe.sendNext(1)

    eventually {
      last shouldBe 1
    }

    probe.sendNext(2)

    eventually {
      last shouldBe 2
    }

    probe.sendError(new RuntimeException("boom"))

    probe.expectSubscription()

    probe.sendNext(3)

    eventually {
      last shouldBe 3
    }

  }
}

This test consistently fails on the last eventually block with the message "Last failure message: 2 was not equal to 3". What am I missing here?

Edit: akka version is 2.5.31


Solution

  • I figured it out after looking at the TestPublisher code. Its subscription is a lazy val, so it is resolved once and then cached. When RestartSource detects the error and executes the factory method () => Source.fromPublisher(probe) again, it gets a new Source, but the probe's subscription still points to the old, failed stream, so sendNext(3) never reaches the restarted one. Creating a new TestPublisher together with the new Source inside the factory works.
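
A minimal sketch of that fix, assuming Akka 2.5.x and ScalaTest 3.0.x as in the question. The names FreshProbeSpec, probes and nextProbe are made up for illustration. Each call of the factory creates its own TestPublisher and hands it to the test through a blocking queue, so the test always talks to the probe of the current materialization. The patience of eventually is raised because the restart only happens after the 1 second backoff.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RestartSource, Sink, Source}
import akka.stream.testkit.TestPublisher
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{FlatSpec, Matchers}

import scala.concurrent.duration._

// Hypothetical spec name; the structure mirrors MinimalSpec from the question.
class FreshProbeSpec extends FlatSpec with Matchers with Eventually {

  // The default patience (150 ms) is shorter than the 1 second restart backoff.
  implicit override val patienceConfig: PatienceConfig =
    PatienceConfig(timeout = Span(5, Seconds), interval = Span(50, Millis))

  "restarting a failed source" should "be testable with a fresh probe per restart" in {
    implicit val sys: ActorSystem = ActorSystem("restart-source-test")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    try {
      // Every (re)materialization publishes its own probe here.
      val probes = new LinkedBlockingQueue[TestPublisher.Probe[Int]]()
      val restartingSource =
        RestartSource.onFailuresWithBackoff(1.second, 1.minute, 0d) { () =>
          val probe = TestPublisher.probe[Int]()
          probes.put(probe)
          Source.fromPublisher(probe)
        }

      // Waits for the probe of the next materialization, or fails the test.
      def nextProbe(): TestPublisher.Probe[Int] =
        Option(probes.poll(5, SECONDS))
          .getOrElse(fail("source was not (re)started in time"))

      // Written by the stream, read by the test thread.
      val last = new AtomicInteger(0)
      restartingSource.runWith(Sink.foreach[Int](i => last.set(i)))

      val first = nextProbe()
      first.sendNext(1)
      eventually { last.get shouldBe 1 }
      first.sendNext(2)
      eventually { last.get shouldBe 2 }

      // Fail the first materialization; RestartSource calls the factory again.
      first.sendError(new RuntimeException("boom"))

      val second = nextProbe()
      second.sendNext(3)
      eventually { last.get shouldBe 3 }
    } finally {
      sys.terminate()
    }
  }
}
```

The queue is one way to get the new probe out of the factory; any thread-safe handoff would do. An AtomicInteger replaces the plain var because the sink runs on a different thread than the test.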