spring spring-boot kotlin server-sent-events

Broadcast SseEmitter to multiple clients


I'm currently implementing a notification system based on some examples I found while reading articles about Spring and SSE (in a non-reactive way).

I've succeeded in implementing a solution, and it works well when a single client consumes the events sent by the backend.

The problem appears when I open several browsers and try to fire the events to all the consumers: only the last client that subscribed to the SSE broadcaster endpoint receives the notification.

How can I fire events to multiple clients at the same time?

Or is it normal to have a single SSE connection when the clients are on the same network?

Here is my controller:

@RestController
@RequestMapping("/sse/servlet")
class EventController {
    private val executorService: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
    private val emitters: MutableMap<String, SseEmitter> = mutableMapOf()
    private val objectMapper: ObjectMapper = ObjectMapper()
    private val log = KotlinLogging.logger {}

    @GetMapping("/notifications")
    fun listenNotifications(@RequestParam eventId: String): SseEmitter? {
        if (emitters[eventId] != null) {
            log.info("SSE connection already exists")
            return emitters[eventId]
        }
        val emitter = SseEmitter(TimeUnit.MINUTES.toMillis(10))
        emitter.onCompletion {
            log.info("SSE connection closed")
            emitters.remove(eventId)
        }
        emitter.onTimeout {
            log.info("SSE connection timed out")
            emitter.complete()
        }
        emitter.onError { throwable: Throwable? ->
            log.error("Listen SSE exception", throwable)
        }
        emitters[eventId] = emitter
        return emitter
    }

    @PostMapping("/notifications")
    @ResponseStatus(ACCEPTED)
    fun fireNotification(
        @RequestParam eventId: String,
        @RequestBody notification: Notification
    ) {
        val sseEmitter = emitters[eventId]
        if (sseEmitter === null) {
            log.info("SSE connection does not exist")
            return
        }
        handleEmitter(notification, sseEmitter, eventId)
    }

    private fun handleEmitter(
        notification: Notification,
        sseEmitter: SseEmitter,
        eventId: String
    ) = try {
        val data = objectMapper.writeValueAsString(notification)
        val sseEventBuilder = event().data(data)
        sseEmitter.send(sseEventBuilder)
    } catch (ioException: IOException) {
        log.error("Send SSE exception", ioException)
        emitters.remove(eventId)
    }
}

the notification model

data class Notification(val message: String)

my pretty simple application.yaml properties file

server:
  servlet:
    context-path: /api

my gradle configuration

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.springframework.boot") version "3.0.4"
    id("io.spring.dependency-management") version "1.1.0"
    kotlin("jvm") version "1.7.22"
    kotlin("plugin.spring") version "1.7.22"
}

group = "com.ggardet"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_17

repositories {
    mavenCentral()
}

dependencies {
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.boot:spring-boot-starter-security")
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "17"
    }
}

and finally the requests used to create/consume the events (using httpie)

# to listen for events I open 2 or 3 terminals and launch the following request
http --stream -a user:user -v GET http://localhost:8080/api/sse/servlet/notifications\?eventId\=1
# to fire a new event I open another terminal instance and launch this single request
http -a user:user -v POST http://localhost:8080/api/sse/servlet/notifications\?eventId\=1 message=test

Thanks for any kind of help.

Note: I have the same issue if I remove the "emitters" map and use a single SseEmitter at the class level to send the events.


Solution

  • So I didn't realize that clients can't share the same SseEmitter.
    I had to create one SseEmitter per subscription to make it work.

    To answer the questions:

    I'm not sure this is the best way to do it, but I'm posting my solution here in case someone else misunderstands how SseEmitter works the same way I did:

    @RestController
    @RequestMapping("/sse/servlet")
    class EventController {
        private val executorService: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
        private val emitters = ConcurrentHashMap<String, MutableList<SseEmitter>>()
        private val objectMapper: ObjectMapper = ObjectMapper()
        private val log = KotlinLogging.logger {}
    
        @GetMapping("/notifications", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
        fun listenNotifications(@RequestParam eventId: String): SseEmitter? {
            val emitter = SseEmitter(Duration.ofMinutes(10).toMillis())
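            emitter.onCompletion {
                log.info("SSE connection closed")
                // Remove only this client's emitter, not every emitter registered for the eventId
                emitters[eventId]?.remove(emitter)
            }
            emitter.onTimeout {
                log.info("SSE connection timed out")
                emitters[eventId]?.remove(emitter)
                emitter.complete()
            }
            emitter.onError { throwable: Throwable? ->
                log.error("Listen SSE exception", throwable)
                emitters[eventId]?.remove(emitter)
            }
            // A copy-on-write list can be iterated by fireNotification while clients subscribe or leave
            emitters.computeIfAbsent(eventId) { java.util.concurrent.CopyOnWriteArrayList<SseEmitter>() }.add(emitter)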
            return emitter
        }
    
        @PostMapping("/notifications")
        @ResponseStatus(ACCEPTED)
        fun fireNotification(
            @RequestParam eventId: String,
            @RequestBody notification: Notification
        ) = emitters[eventId]?.forEach { handleEmitter(notification, it) }
    
        private fun handleEmitter(
            notification: Notification,
            sseEmitter: SseEmitter,
        ) = try {
            val data = objectMapper.writeValueAsString(notification)
            val sseEventBuilder = event().data(data)
            sseEmitter.send(sseEventBuilder)
        } catch (ioException: IOException) {
            log.error("Send SSE exception", ioException)
        }
    }
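
The "one emitter per subscription" idea can be tried out without Spring. The following is only a minimal sketch under stated assumptions: `Subscriber` and `SubscriberRegistry` are hypothetical names that do not appear in the code above, and `Subscriber` is a plain callback standing in for `SseEmitter`. It shows several subscriptions registered under the same eventId, a broadcast reaching all of them, and a failed send dropping only the subscription that failed.

```kotlin
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical stand-in for SseEmitter: anything that can receive a String payload.
fun interface Subscriber {
    @Throws(IOException::class)
    fun send(payload: String)
}

// Hypothetical registry: one entry per subscription, grouped by eventId.
class SubscriberRegistry {
    private val subscribers = ConcurrentHashMap<String, CopyOnWriteArrayList<Subscriber>>()

    fun subscribe(eventId: String, subscriber: Subscriber) {
        subscribers.computeIfAbsent(eventId) { CopyOnWriteArrayList<Subscriber>() }.add(subscriber)
    }

    // Removes a single subscription and leaves the others for the same eventId untouched.
    fun unsubscribe(eventId: String, subscriber: Subscriber) {
        subscribers[eventId]?.remove(subscriber)
    }

    // Sends the payload to every subscriber of eventId and returns how many sends succeeded.
    fun broadcast(eventId: String, payload: String): Int {
        var delivered = 0
        subscribers[eventId]?.forEach { subscriber ->
            try {
                subscriber.send(payload)
                delivered++
            } catch (e: IOException) {
                // A failed send usually means the client is gone, so drop only that subscription.
                unsubscribe(eventId, subscriber)
            }
        }
        return delivered
    }

    fun count(eventId: String): Int = subscribers[eventId]?.size ?: 0
}

fun main() {
    val registry = SubscriberRegistry()
    val received = mutableListOf<String>()

    registry.subscribe("1", Subscriber { received.add("first:$it") })
    registry.subscribe("1", Subscriber { received.add("second:$it") })
    registry.subscribe("1", Subscriber { throw IOException("client disconnected") })

    println(registry.broadcast("1", "test")) // prints 2
    println(received)                        // prints [first:test, second:test]
    println(registry.count("1"))             // prints 2
}
```

The copy-on-write list is a design choice for this sketch: it tolerates removals while a broadcast is iterating, at the cost of copying the list on every subscribe or unsubscribe, which seems acceptable when subscriptions change far less often than events are sent.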