In my setting I want to forward certain status changes via an SSE channel (Server sent events). The status changes are initiated by calling a REST endpoint. So, I need to forward the incoming status change to the SSE stream.
What is the best/simplest way to accomplish this in Quarkus.
One solution I can think of is to use an EventBus (https://quarkus.io/guides/reactive-messaging). The SSE endpoint would subscribe to the status changes and push it through the SSE channel. The status change endpoint publishes appropriate events.
Is this a viable solution? Are there other (simpler) solutions? Do I need to use the reactive stuff in any case to accomplish this?
Any help is very appreciated!
Dmytro, thanks for pointing me in the right direction. I have opted for Mutiny in connection with Kotlin. My code now looks like this:
data class DeviceStatus(var status: Status = Status.OFFLINE) {
enum class Status {OFFLINE, CONNECTED, ANALYZING, MAINTENANCE}
}
@ApplicationScoped
class DeviceStatusService {
var deviceStatusProcessor: PublishProcessor<DeviceStatus> = PublishProcessor.create()
var deviceStatusQueue: Flowable<DeviceStatus> = Flowable.fromPublisher(deviceStatusProcessor)
fun pushDeviceStatus(deviceStatus: DeviceStatus) {
deviceStatusProcessor.onNext(deviceStatus)
}
fun getStream(): Multi<DeviceStatus> {
return Multi.createFrom().publisher(deviceStatusQueue)
}
}
@Path("/deviceStatus")
class DeviceStatusResource {
private val LOGGER: Logger = Logger.getLogger("DeviceStatusResource")
@Inject
@field: Default
lateinit var deviceStatusService: DeviceStatusService
@POST
@Consumes(MediaType.APPLICATION_JSON)
fun status(status: DeviceStatus): Response {
LOGGER.info("POST /deviceStatus " + status.status);
deviceStatusService.pushDeviceStatus(status)
return Response.ok().build();
}
@GET
@Path("/eventStream")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
fun stream(): Multi<DeviceStatus>? {
return deviceStatusService.getStream()
}
}
As minimal setup the service could directly use the deviceStatusProcessor as publisher. However, the Flowable adds buffering. Comments on the implementation are welcome.