I need your help. There is a reactive chain in a Spring WebFlux application that uses R2dbcRepository:
entityRepository //0
    .findById(entityId) //1 Mono<Entity>
    .doOnNext(e -> e.setValue("0")) //2
    .flatMap(entity -> //3
        entityRepository //4
            .save(entity) //5 Mono<Entity>
            .flatMap(this::process) //6 Mono<Result>
            .flatMap(result -> { //7
                entity.setValue("1"); //8
                return entityRepository //9
                    .save(entity) //10 Mono<Entity>
                    .thenReturn(result); //11 Mono<Result>
            }) //12
            .flatMap(result -> /*REST request using WebFlux*/) //13 Mono<Body>
            .flatMap(body -> { //14
                entity.setValue("2"); //15
                return entityRepository //16
                    .save(entity) //17 Mono<Entity>
                    .thenReturn(body); //18 Mono<Body>
            }) //19
    ); //20 Mono<Body>
On line 3, an "entity" object is introduced that is later modified on lines 8 and 15 (probably on different threads). Is this an unsafe publication of the "entity" object between threads in a reactive stream? Is this approach to writing a reactive chain incorrect?
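At first glance, there are no concurrent (simultaneous) changes to the entity: every step runs sequentially, one after another, so it should work safely. Within the chain itself, the Reactive Streams specification requires signals to a subscriber to be delivered serially (rule 1.3), which on the JVM implies a happens-before relationship between successive steps even when they run on different threads. The volatile keyword on the value field only becomes relevant if the field is also read or written by threads outside this chain.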
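To make the "sequential, but on different threads" point concrete, here is a minimal sketch that uses only the JDK. It is an analogue, not Reactor code: CompletableFuture stands in for the Mono chain, and MutableEntity and SequentialHandOff are hypothetical names invented for the illustration. Each stage starts only after the previous one has completed, so the writes to the plain (non-volatile) field never overlap.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the question's entity: a plain, non-volatile field.
class MutableEntity {
    private String value;

    String getValue() { return value; }

    void setValue(String value) { this.value = value; }
}

class SequentialHandOff {
    // Runs three dependent stages; each may execute on a different pool thread,
    // but a stage is only scheduled once the previous stage has completed.
    static String run() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            MutableEntity entity = new MutableEntity();
            return CompletableFuture
                .supplyAsync(() -> { entity.setValue("0"); return entity; }, pool)
                .thenApplyAsync(e -> { e.setValue(e.getValue() + "1"); return e; }, pool)
                .thenApplyAsync(e -> { e.setValue(e.getValue() + "2"); return e; }, pool)
                .thenApply(MutableEntity::getValue)
                .join(); // wait for the whole chain to finish
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling SequentialHandOff.run() returns the values appended in stage order. The same reasoning applies to the flatMap chain in the question: the mutation on line 8 cannot start before the save on line 5 has emitted, and line 15 cannot start before the REST call on line 13 has emitted.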
However, I strongly recommend making the entity immutable, as it is a more dependable and less error-prone approach, especially if the instance is used in a wide scope (across various classes or modules of your application). A widely shared mutable entity can lead to hard-to-predict consistency problems as the application grows, so it's best to keep mutable entities in limited, highly encapsulated scopes.
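As a rough sketch of what an immutable entity could look like, assuming the entity has only an id and a value (ImmutableEntity, its fields, and withValue are hypothetical; the question does not show the real class):

```java
import java.util.Objects;

// Hypothetical immutable variant of the question's entity; field names are assumptions.
final class ImmutableEntity {
    private final Long id;
    private final String value;

    ImmutableEntity(Long id, String value) {
        this.id = id;
        this.value = Objects.requireNonNull(value, "value");
    }

    Long getId() { return id; }

    String getValue() { return value; }

    // Returns a modified copy instead of mutating this instance.
    ImmutableEntity withValue(String newValue) {
        return new ImmutableEntity(id, newValue);
    }
}
```

With a class like this, the steps on lines 8 and 15 would become something like entityRepository.save(entity.withValue("1")) and entityRepository.save(entity.withValue("2")), so no step ever modifies an object that another step has already seen. Whether your persistence mapping accepts such a class without further annotations depends on your Spring Data setup, so treat this as a starting point.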