java, spring, spring-mvc, eventsource

Should Spring SseEmitter.complete() trigger an EventSource reconnect - how to close connection server-side


I'm trying to set up a Spring SseEmitter to send a sequence of status updates for a running job. It seems to be working, but:

Whenever I call emitter.complete() in my Java server code, the JavaScript EventSource client calls the registered onerror function and then calls my Java endpoint again with a new connection. This happens in both Firefox and Chrome.

I can probably send an explicit "end-of-data" message from Java and then detect that and call eventSource.close() on the client, but is there a better way?

What is the purpose of emitter.complete() in that case?

Also, if I always have to terminate the connection on the client side, then I guess every connection on the server side will end with either a timeout or a write error. In that case, should I manually send a heartbeat of some kind every few seconds?

It feels like I'm missing something if I'm having to do all this.


Solution

  • I added the following to my Spring Boot application so that the client closes the SSE connection once the server has finished:

    Server Side:

    1. Create a simple controller which returns SseEmitter.
    2. Run the backend logic on an executor service so the request thread is not blocked (the code below uses a cached thread pool).
    3. Send your events to the SseEmitter.
    4. When processing finishes, send an event named COMPLETE via the SseEmitter, then call complete().

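      // SearchController.java
      import io.swagger.annotations.*;
      import javax.validation.Valid; // jakarta.validation.Valid on Spring Boot 3+
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.http.HttpStatus;
      import org.springframework.web.bind.annotation.*;
      import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

      @RestController
      public class SearchController {

          @Autowired
          private SearchDelegate searchDelegate;

          // Returns the emitter immediately; SearchDelegate writes the events asynchronously.
          @GetMapping(value = "/{customerId}/search")
          @ResponseStatus(HttpStatus.OK)
          @ApiOperation(value = "Search Sources", notes = "Search Sources")
          @ApiResponses(value = {
                  @ApiResponse(code = 200, message = "OK"),
                  @ApiResponse(code = 401, message = "Unauthorized")
          })
          public SseEmitter search(
                  @ApiParam(name = "searchCriteria", value = "searchCriteria", required = true)
                  @ModelAttribute @Valid final SearchCriteriaDto searchCriteriaDto) throws Exception {
              return searchDelegate.route(searchCriteriaDto);
          }
      }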
      
      
      
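      // SearchDelegate.java
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Service;
      import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

      @Service
      public class SearchDelegate {

          public static final String SEARCH_EVENT_NAME = "SEARCH";
          public static final String COMPLETE_EVENT_NAME = "COMPLETE";
          public static final String COMPLETE_EVENT_DATA = "{\"name\": \"COMPLETED_STREAM\"}";

          // `log` was used but not declared in the snippet; an SLF4J logger is assumed here.
          private static final Logger log = LoggerFactory.getLogger(SearchDelegate.class);

          @Autowired
          private SearchService searchService;

          private final ExecutorService executor = Executors.newCachedThreadPool();

          public SseEmitter route(SearchCriteriaDto searchCriteriaDto) throws Exception {
              SseEmitter emitter = new SseEmitter();
              // Run the search off the request thread so the emitter can be returned immediately.
              executor.execute(() -> {
                  try {
                      if (!searchCriteriaDto.getCustomerSources().isEmpty()) {
                          searchCriteriaDto.getCustomerSources().forEach(customerSource -> {
                              try {
                                  SearchResponse searchResponse = searchService.search(searchCriteriaDto);
                                  emitter.send(SseEmitter.event()
                                          .id(customerSource.getSourceId())
                                          .name(SEARCH_EVENT_NAME)
                                          .data(searchResponse));
                              } catch (Exception e) {
                                  // Log and continue so one failing source does not end the stream.
                                  log.error("Error while executing query for source {}, caused by {}",
                                          customerSource.getSourceId(), e.getMessage());
                              }
                          });
                      } else {
                          log.debug("No available customerSources for the specified customer");
                      }
                      // Tell the client the stream is finished so it can call close() instead of reconnecting.
                      emitter.send(SseEmitter.event()
                              .id(String.valueOf(System.currentTimeMillis()))
                              .name(COMPLETE_EVENT_NAME)
                              .data(COMPLETE_EVENT_DATA));
                      emitter.complete();
                  } catch (Exception ex) {
                      emitter.completeWithError(ex);
                  }
              });
              return emitter;
          }
      }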
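
    For reference, it may help to see roughly what the browser receives for these named events. The sketch below builds the text/event-stream framing by hand in plain Java: optional id and event fields, one data line per line of payload, and a blank line to end the event. The class SseWireFormat and its methods are made up for illustration and are not part of Spring; the exact bytes Spring writes may differ in whitespace. The comment() helper shows the colon-prefixed comment line that clients ignore, which is one common way to send the kind of heartbeat mentioned in the question.

```java
import java.util.Objects;

/** Illustrative helper for the text/event-stream framing; not part of Spring. */
final class SseWireFormat {

    private SseWireFormat() {}

    /** Formats one event: optional id and event name, then one "data:" line per payload line. */
    static String format(String id, String name, String data) {
        Objects.requireNonNull(data, "data");
        StringBuilder sb = new StringBuilder();
        if (id != null) {
            sb.append("id: ").append(id).append('\n');
        }
        if (name != null) {
            // Without an "event:" field the client dispatches the event to onmessage instead.
            sb.append("event: ").append(name).append('\n');
        }
        // Multi-line payloads are sent as several "data:" lines.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // A blank line terminates the event.
        return sb.append('\n').toString();
    }

    /** Formats a comment line (leading colon); clients ignore it, so it can act as a keep-alive. */
    static String comment(String text) {
        return ": " + text + "\n\n";
    }
}
```

    With this helper, SseWireFormat.format("42", "COMPLETE", "{}") yields an id line, an "event: COMPLETE" line, a "data: {}" line and a terminating blank line. The "event:" field is what routes the message to the "COMPLETE" listener on the client.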
      

    Client Side:

    1. Since we specified an event name on the SseEmitter, the browser dispatches each event to the listener registered for that name, so the client code should use addEventListener() to listen for named events. (Note: the onmessage handler is called only for messages that have no event name.)
    2. Call close() on the EventSource when the COMPLETE event arrives to release the client connection.

    https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events

    var sse = new EventSource('http://localhost:8080/federation/api/customers/5d96348feb061d13f46aa6ce/search?nativeQuery=true&queryString=*&size=10&customerSources=1,2,3&start=0');
    
    sse.addEventListener("SEARCH", function(evt) {
       var data = JSON.parse(evt.data);
       console.log(data);
    });
    
    sse.addEventListener("COMPLETE", function(evt) {
       console.log(evt);
       sse.close();
    });
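
    As for why complete() alone leads to a reconnect: as far as I understand the EventSource specification, a connection that simply closes is treated as something to recover from, so the browser reconnects unless the client calls close() or the server answers the reconnect with HTTP 204 No Content. Below is a minimal sketch of the 204 variant. It uses the JDK's built-in com.sun.net.httpserver.HttpServer rather than Spring so that it runs on its own. The class name SseNoContentDemo, the /events path and the single finished flag are assumptions for illustration; a real application would track completion per job or per client.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative only: streams one event, then answers later requests with 204. */
final class SseNoContentDemo {

    private SseNoContentDemo() {}

    /** Starts the demo server on the given port (0 picks any free port). */
    static HttpServer start(int port) throws IOException {
        AtomicBoolean finished = new AtomicBoolean(false);
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/events", exchange -> {
            if (finished.get()) {
                // The stream was already delivered: 204 tells EventSource not to reconnect.
                exchange.sendResponseHeaders(204, -1); // -1 = no response body
                exchange.close();
                return;
            }
            exchange.getResponseHeaders().set("Content-Type", "text/event-stream");
            exchange.sendResponseHeaders(200, 0); // 0 = chunked, length not known up front
            try (OutputStream out = exchange.getResponseBody()) {
                out.write("event: SEARCH\ndata: {\"hits\": 1}\n\n".getBytes(StandardCharsets.UTF_8));
                // Mark the stream as done before closing, so a fast reconnect already sees 204.
                finished.set(true);
            }
        });
        server.start();
        return server;
    }
}
```

    Calling SseNoContentDemo.start(8080) and pointing an EventSource at /events should deliver the SEARCH event once; the automatic reconnect then receives 204 and the client stops retrying. The explicit COMPLETE event shown above remains the simpler option when you control the client code.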