Tags: spring, redis, spring-webflux, server-sent-events, spring-data-redis-reactive

Strange behavior with SSE endpoint using Redis


I need to push some data to the client if it exists in Redis, but the client keeps reconnecting to the SSE endpoint every 5 seconds.

The backend code:

@RestController
@RequestMapping("/reactive-task")
public class TaskRedisController {

    private final TaskRedisRepository taskRedisRepository;

    TaskRedisController(TaskRedisRepository taskRedisRepository){
        this.taskRedisRepository = taskRedisRepository;
    }

    @CrossOrigin
    @GetMapping(value = "/get/{id}")
    public Flux<ServerSentEvent<Task>> getSseStream2(@PathVariable("id") String id) {
        return taskRedisRepository.findByTaskId(id)
            .map(task -> ServerSentEvent.<Task>builder().data(task).build());
    }
}

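@Repository
public class TaskRedisRepository {
    // Assumed declaration: the original post does not show where `template` comes from.
    private final ReactiveRedisTemplate<String, Task> template;

    TaskRedisRepository(ReactiveRedisTemplate<String, Task> template) {
        this.template = template;
    }

    public Flux<Task> findByTaskId(String id) {
        return template.keys("task:" + id).flatMap(template.opsForValue()::get);
    }
}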


@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@EqualsAndHashCode
@Entity
public class Task {
    @Id
    @GeneratedValue(strategy=GenerationType.IDENTITY)
    private Long id;
    @Column(length = 25)
    private String result;
}

The client consumes using JS:

var evt = new EventSource("http://localhost:8080/reactive-task/get/98");
evt.onmessage = function(event) {
  console.log(event);
};

Can anyone point me in the right direction?
Any advice would be appreciated.

Update: I need to store data for some time (5-10 mins) in Redis.

Update: I wrote similar code on MongoDB and it works fine.


Solution

  • In this case, taskRedisRepository.findByTaskId(id) is probably returning a finite Flux, meaning a few elements followed by a completion signal that ends the stream.

    Spring WebFlux interprets the onComplete signal as the end of the stream and closes the connection. By default, browser SSE clients (EventSource) reconnect automatically, after a short retry delay, whenever the connection is terminated, which is why your client keeps coming back to the endpoint.

    If you want to keep a persistent connection and only be notified when new elements are added, the datastore itself has to support that. Redis supports this mode through its pub/sub feature (see reference documentation).

    To summarize, I think you're seeing the expected behavior: in this case your datastore doesn't produce an infinite stream that notifies you of newly added elements, but a finite stream of the elements present in that collection at a given time.
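
The difference between the two kinds of stream can be sketched without Redis or Spring at all. The following is only a simplified model built on the JDK's java.util.concurrent.Flow interfaces: SnapshotSource, ChannelSource and RecordingSubscriber are hypothetical names invented for this illustration, delivery is synchronous, and backpressure is ignored. It is not how WebFlux or Spring Data Redis are implemented; it only shows which signals a subscriber sees in each case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;

// Simplified model (assumption): synchronous delivery, no backpressure, no Redis, no Spring.
class SseStreamDemo {

    // Subscription that ignores demand and cancellation; enough for this sketch.
    static final Flow.Subscription NO_OP = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    /** Records every signal it receives so the two sources can be compared. */
    static final class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> signals = new ArrayList<>();

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { signals.add("next:" + item); }
        @Override public void onError(Throwable error) { signals.add("error"); }
        @Override public void onComplete() { signals.add("complete"); }
    }

    /** Behaves like a key lookup: emits what is stored right now, then completes. */
    static final class SnapshotSource implements Flow.Publisher<String> {
        private final List<String> snapshot;

        SnapshotSource(List<String> snapshot) { this.snapshot = List.copyOf(snapshot); }

        @Override public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(NO_OP);
            for (String item : snapshot) {
                subscriber.onNext(item);
            }
            subscriber.onComplete(); // the signal that would end the SSE response
        }
    }

    /** Behaves like a pub/sub channel: stays open and forwards items as they are published. */
    static final class ChannelSource implements Flow.Publisher<String> {
        private final List<Flow.Subscriber<? super String>> subscribers = new CopyOnWriteArrayList<>();

        @Override public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(NO_OP);
            subscribers.add(subscriber); // no onComplete here: the stream stays open
        }

        void publish(String item) {
            for (Flow.Subscriber<? super String> subscriber : subscribers) {
                subscriber.onNext(item);
            }
        }
    }

    /** Signals seen by a subscriber of the finite, lookup-style source. */
    static List<String> signalsFromSnapshot(List<String> stored) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        new SnapshotSource(stored).subscribe(subscriber);
        return subscriber.signals;
    }

    /** Signals seen by a subscriber of the open-ended, channel-style source. */
    static List<String> signalsFromChannel(List<String> published) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        ChannelSource channel = new ChannelSource();
        channel.subscribe(subscriber);
        for (String item : published) {
            channel.publish(item);
        }
        return subscriber.signals;
    }

    public static void main(String[] args) {
        System.out.println(signalsFromSnapshot(List.of("result-A")));
        // prints [next:result-A, complete]
        System.out.println(signalsFromChannel(List.of("result-A", "result-B")));
        // prints [next:result-A, next:result-B]
    }
}
```

The first source ends with a completion signal, which corresponds to the response being closed and the browser reconnecting. The second never completes on its own. In Spring Data Redis, the open-ended source would presumably come from the pub/sub API mentioned above, for example ReactiveRedisTemplate.listenToChannel(...), whose Flux of messages is meant to stay subscribed until it is cancelled; the channel name and the code publishing to it would be up to your application.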