scalawebsocketakkaakka-httpblockingqueue

Websocket with BlockingQueue subscription


I want to build a GraphQl-like subscription where a client via websocket can ask the Akka Http server to subscribe to updated results from a database and push them back via websocket to the client. My database emits transactions via a java BlockingQueue and when transacted data match a query, a new result should be pushed. I'm setting up the blocking queue in a separate thread that should hopefully send updates back via the Akka eventbus.

But I'm a Akka newbie and stuck with my setup where a Sink is not even receiving the incoming message with the query to match from the client. Am I on the right track or can you help me in the right direction?

This is what I have been able to put together so far (simplified) from reading the docs and browsing various websocket examples:

implicit val system          : ActorSystem              = ActorSystem()
implicit val executionContext: ExecutionContextExecutor = system.dispatcher

Http().newServerAt("localhost", 8080).bind(route)

lazy val route: Route = cors() {
  path("ws") {
    handleWebSocketMessages(
      Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
    )
  }
}

Receiving the query from the client. Apparently not the right way to make the Sink since incoming messages are not received.

val incomingMessages: Sink[Message, Future[Done]] = {
  Sink.foreach {
    case BinaryMessage.Strict(argsSerialized) =>
      println("Never gets here...")

      // Deserialize incoming query
      val query = deserialize(argsSerialized.asByteBuffer)

      // What to do when new query results emerge
      val callback: List[Any] => Unit = { (result: List[Any]) =>
        val resultBytes   = serialize(result)
        val binaryMessage = BinaryMessage(ByteString(resultBytes))

        // Publish serialized results to Akka eventbus
        system.eventStream.publish(binaryMessage)
      }

      // Subscribe to updated query results from queue in separate thread
      subscribe(query, callback)

    case other => throw new Exception("Unexpected incoming message: " + other)
  }
}

Sending back updated query results when changes match the query:

val outgoingMessages: Source[Message, Any] = {
  Source.lazySource { () =>
    val (actorRef, itemSource) = Source
      .actorRef[BinaryMessage](
        completionMatcher = PartialFunction.empty,
        failureMatcher = PartialFunction.empty,
        1000,
        OverflowStrategy.backpressure // ?
      )
      .preMaterialize()

    system.eventStream.subscribe(actorRef, classTag[BinaryMessage].runtimeClass)
    itemSource
  }
}

Separate thread looping forever and blocking between each new database transaction

def subscribe(query: String, callback: List[Data] => Unit): Unit = {
  object TxReportWatcher extends Runnable {
    override def run(): Unit = {
      while (true) {
        try {
          // `take` blocks until new data is transacted
          matchResult(query, javaBlockingQueue.take).foreach(machingResult =>
            // Publish new result to the Akka eventbus
            callback(machingResult)
          )
        } catch {
          case e: InterruptedException => e.printStackTrace()
        }
      }
    }
  }
  Executors.newSingleThreadExecutor().execute(TxReportWatcher)
}

Solution

  • Found a solution. Instead of using the Akka event bus to signal changes, an Akka Source.queue could be used to push changed results back to the client.

    New results are serialised and offered to the queue as a binary message. Every time this happens it triggers the source to push the message back to the client. This allows us to have the server push data to the client whenever we want while the websocket connection is kept alive.

    The callback function is passed to the separate thread that runs the subscription mechanism. This seems like a good separation of concerns since the callback caller doesn't need to know anything about websockets/actors/akka - it only executes the callback function with the changed result.

    def wsFlow: Flow[Message, Message, _] = {
      // Use queue with 1 message at a time
      val (queue, source) = Source.queue[Message](
        1, OverflowStrategy.backpressure
      ).preMaterialize()
    
      val sink = Sink.foreach[Message] {
        case BinaryMessage.Strict(querySerialized) =>
          // Deserialize query
          val query = deserialize(querySerialized.asByteBuffer)
    
          // What to do when new query results emerge
          val callback: List[Any] => Unit = { (result: List[Any]) =>
            val resultBytes  = serialize(result)
            val binaryResult = BinaryMessage(ByteString(resultBytes))
    
            // Offer result to queue
            // This makes `source` push serialized result to client
            queue.offer(binaryResult)
          }
    
          // Subscribe to updated query results from db queue in separate thread
          subscribe(query, callback)
    
        case other => throw new Exception("Unexpected incoming message: " + other)
      }
    
      Flow.fromSinkAndSource(sink, source)
    }
    
    lazy val route: Route = cors() {
      path("ws") {
        handleWebSocketMessages(wsFlow)
      }
    }