cassandra completable-future datastax-java-driver completion-stage

What can I use as a replacement for ResultSetFuture in Cassandra 4.x?


I have to upgrade Cassandra to 4.x. This is the code that was previously written for Cassandra 3.5:

    protected <T> Stream<T> getAll(Stream<Statement> statements, Mapper<T> mapper) {
        List<ResultSetFuture> futures = statements
                .peek(p -> cassandraReads.inc())
                .map(s -> session.session().executeAsync(s))
                .collect(Collectors.toList());

        return futures.stream()
                .map(ResultSetFuture::getUninterruptibly)
                .map(mapper::map)
                .flatMap(r -> StreamSupport.stream(r.spliterator(), false));
    }

I have made some changes:

    List<CompletionStage<AsyncResultSet>> futures = statements
            .peek(p -> cassandraReads.inc())
            .map(s -> session.getSession().executeAsync(s))
            .collect(Collectors.toList());

But what should I use in place of .map(ResultSetFuture::getUninterruptibly), since it has been removed? I am new to Cassandra and asynchronous programming, so any help would be appreciated.


Solution

  • Since you are already mixing async and blocking calls in your code, I'd suggest that you go fully blocking and use Session.execute instead of Session.executeAsync. It's going to make things much easier. Then you could rewrite your code as follows:

    protected <T> Stream<T> getAll(Stream<Statement<?>> statements, Mapper<T> mapper) {
      return statements
          .peek(p -> cassandraReads.inc())
          .map(s -> session.execute(s))
          .flatMap(rs -> StreamSupport.stream(rs.spliterator(), false))
          .map(mapper::map);
    }
    

    Note: I'm slightly changing your Mapper interface so that it maps a single Row. I assume it would then be:

    public interface Mapper<T> {
      T map(Row row);
    }
    

    (You could actually replace this Mapper interface with a plain Function<Row, T>.)

    Other than that, as Erick Ramirez said, please have a look at this page to understand how to write proper async code with driver 4.x.
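
If you would rather keep executeAsync and only need a stand-in for getUninterruptibly, one option is to block on each CompletionStage through the JDK's own API. The sketch below is not driver code: it uses plain CompletableFuture values in place of the stages that executeAsync returns, and the names BlockingOnStages and awaitAll are hypothetical. Calling toCompletableFuture().join() waits without throwing InterruptedException and rethrows failures wrapped in an unchecked CompletionException.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BlockingOnStages {

    // Hypothetical helper: blocks on each stage in list order and streams the results.
    // The stages are assumed to have been started already, so they run concurrently
    // while this method waits on them one by one.
    public static <T> Stream<T> awaitAll(List<CompletionStage<T>> stages) {
        return stages.stream()
                .map(stage -> stage.toCompletableFuture().join());
    }

    public static void main(String[] args) {
        // Stand-ins for the CompletionStage values returned by executeAsync.
        List<CompletionStage<String>> stages = List.of(
                CompletableFuture.supplyAsync(() -> "row-1"),
                CompletableFuture.supplyAsync(() -> "row-2"));

        List<String> results = awaitAll(stages).collect(Collectors.toList());
        System.out.println(results); // prints [row-1, row-2]
    }
}
```

With the real driver, each joined value would be an AsyncResultSet, which as far as I know exposes only the current page through currentPage(), with hasMorePages() and fetchNextPage() for the rest. A direct port would therefore also have to handle paging, which is one more reason the fully blocking Session.execute version above is simpler.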