My task is simple, I need to query elasticsearch rolling index with reactive way.
As the @Document didn't support index name with Spring EL, like @Document(index = "indexName-#(new Date().format(yyyy-MM-dd))")
So I am trying to call elasticsearch with ReactiveElasticsearchTemplate which support me changing index name in runtime.
But as the data volume is larger than 10000, so I need to use scroll to repeat query until we get all data.
I have finished the first query and scroll query and it could return value.
But I need to combine all result and then return.
How can I do that? For now when the consumer still working, the empty result have been return to frontend. How can I ask the thread wait until the consumer finish the elasticsearch return all data? Thanks.
public Flux<ELKModel> getByTradeDateBetween(LocalDateTime from, LocalDateTime to)
throws Exception {
List<ELKModel> result = new ArrayList<ELKModel>();
List<Long> total = new ArrayList<>();
List<Long> currentSize = new ArrayList<>();
List<String> scrollId = new ArrayList<>();
NativeSearchQueryBuilder sourceBuilder = new NativeSearchQueryBuilder();
sourceBuilder.withQuery(
QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery(TRADE_DATE).gte(from).lte(to)));
sourceBuilder.withPageable(PageRequest.of(0, SINGLE_QUERY_SIZE));
NativeSearchQuery query = sourceBuilder.build();
elasticsearchSupport
.scrollStart(query, ELKModel.class)
.map(ELKModelWrapper::valueFrom).
subscribe(
wrapper -> {
total.add(wrapper.getTotal());l
currentSize.add(wrapper.getCurrentSize());
result.addAll(wrapper.getResults());
scrollId.add(wrapper.getScrollId());
}).dispose();
while (currentSize.size() == 1 && total.size() == 1 && currentSize.get(0) < total.get(0)) {
elasticsearchSupport
.scrollContinue(scrollId.get(0), ELKModel.class)
.map(ELKModelWrapper::valueFrom)
.subscribe(
wrapper -> {
currentSize.add(0, currentSize.get(0) + wrapper.getCurrentSize());
result.addAll(wrapper.getResults());
scrollId.add(0, wrapper.getScrollId());
}).dispose();
}
return Flux.fromIterable(result);
}
You must use a pretty outdated version of Spring Data Elasticsearch. SpEL support for index names in the @Document
annotation was introduced 4 years ago, you can see how this can be used in my post at https://www.sothawo.com/2020/07/how-to-provide-a-dynamic-index-name-in-spring-data-elasticsearch-using-spel/.
As for the reactive thing: you must never block a thread in reactive code. And the reactive code does this scrolling under the hood, I don't see why you want to do this by yourself.