htmx

How to politely close an SSE event stream using htmx?


I'm trying to set up a short-lived event stream handler using htmx. Imagine, e.g., streaming a ChatGPT response or similar. The stream will last for 10-30 seconds, after which I expect it to be exhausted.

Is there a way to have htmx listen for an SSE event type and close the SSE source in response?

I have a clunky workaround: an htmx listener for the stream termination event type makes a request to an endpoint that returns just an empty div, and that div is swapped in for the original listener so that it stops endlessly trying to reconnect to the SSE source.

But I feel like I must be missing something because this doesn't seem very elegant.

Here's what I have now, below. It works (mostly), but is there something better? Thanks!

<div
  id="chat-sse-listener"
  hx-ext="sse"
  sse-connect="{% url 'stream_test' %}"
>

  {# This listens for the next token in the stream and appends it to the chat. #}
  <div
    sse-swap="message"
    hx-target="#chat-message"
    hx-swap="beforeend"
  ></div>

  {# This listens for my custom EndOfStream SSE event and awkwardly replaces the entire SSE listener with an empty div. #}
  <div
    hx-trigger="sse:EndOfStream"
    hx-target="#chat-sse-listener"
    hx-swap="outerHTML"
    hx-get="{% url 'empty_div_response' %}"
  ></div>

</div>

<div id="chat-message"></div>

Update:

After playing around with it for a bit, I have a slightly less awkward (but still not great) solution. Instead of having a second listener and a second event type, I end the stream by having the server pass down a div with htmx's out-of-band swap API. That div is then used to replace (and thus remove) the sse listener.

Code:

    async def __anext__(self):
        await asyncio.sleep(0.25)

        if self.story_tokens_queue.empty():
            if self.sent_close_token:
                raise StopAsyncIteration
            else:
                self.sent_close_token = True
                # this div will be swapped, out-of-band, for the existing #chat-sse-listener, effectively removing it
                # and closing the SSE source on the client side
                return 'data: <div id="chat-sse-listener" hx-swap-oob="true"></div>\n\n'
        else:
            token = self.story_tokens_queue.get().replace("\n", "<br/>")
            return f"data: {token}\n\n"

Fewer moving parts, but not as crisp as explicitly requesting that the client close the source. At this point, though, I'm convinced that doing so would require additional JavaScript.

Final Follow-up:

Here's what I'm going with in production for my side project. It works for now!

My backend code puts this in as the very last token sent by the SSE stream:

            # second, we close the SSE listener on the client side
            elif not self.sent_close_token:
                self.sent_close_token = True
                # this div will be swapped, out-of-band, for the existing #chat-sse-listener, effectively removing
                # it and closing the SSE source on the client side
                return 'data: <div id="chat-sse-listener" hx-swap-oob="true"></div>\n\n'

My HTML looks like this:

<div
  id="chat-sse-listener"
  hx-ext="sse"
  sse-connect="{% url 'chat_request_answer' id=chat_thread.id %}"
>

  {# This listens for the next token in the stream and appends it to the chat. #}
  <div
    sse-swap="message"
    hx-target="#sage-answer-{{ new_answer_id }}"
    hx-swap="beforeend"
  ></div>

</div>

I'm using htmx 1.9.6.

It seems to work for me with that setup.


Solution

  • htmx 2.0 will have an updated version of the SSE extension, which has an sse-close attribute for closing the event source when a specific event is received. This feature has been included in the extension since version 2.1.0.

    https://www.npmjs.com/package/htmx-ext-sse
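
With that extension version, the idea would be to put sse-close="EndOfStream" on the same element that carries sse-connect, and have the server send an event with that name as its final message. Below is a minimal server-side sketch of the framing, not a definitive implementation. The names format_sse and stream_tokens are hypothetical helpers made up for illustration, the EndOfStream event name is borrowed from the question, and wiring this into a Django streaming response is left out.

```python
import asyncio
from typing import AsyncIterator, Iterable, Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Frame one Server-Sent Events message (hypothetical helper).

    Every line of `data` gets its own `data:` field, and a blank line
    terminates the message.
    """
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # "".split("\n") yields [""], so an empty payload still produces one
    # `data:` field; a message without any data field is not dispatched.
    for line in data.split("\n"):
        lines.append(f"data: {line}")
    return "\n".join(lines) + "\n\n"


async def stream_tokens(
    tokens: Iterable[str], close_event: str = "EndOfStream"
) -> AsyncIterator[str]:
    """Yield each token as a default `message` event, then a named closing event."""
    for token in tokens:
        # Same newline handling as in the question: render newlines as <br/>.
        yield format_sse(token.replace("\n", "<br/>"))
    # Assumption: the client element has sse-close set to this event name.
    yield format_sse("", event=close_event)


async def collect(tokens: Iterable[str]) -> list:
    """Gather the framed messages into a list, for inspection only."""
    return [message async for message in stream_tokens(tokens)]


if __name__ == "__main__":
    for message in asyncio.run(collect(["Hello", " world\n"])):
        print(repr(message))
```

Messages without an event field arrive as the default message type, so the existing sse-swap="message" listener keeps working unchanged. Only the last message carries a name, and it no longer needs the out-of-band div.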