kotlin, spring-webflux, reactive-programming, project-reactor, reactor-netty

Reactor netty, using Flux::groupBy results in freezing forever


I have two lists containing different types of objects that share the same id attribute. For instance:

list 1 => [{ id: "123", x: "xxx" }] | list 2 => [{ id: "123", y: "yyy" }, { id: "456", y: "yyy" }]

I want to combine these two lists into a new list using groupBy. This is what I have tried:

  // both list are not sorted and every id in the list_1 exists in list_2
  // list_1 data class Object1(val id: String, val x: String)
  val flux1 = Flux.fromIterable(list_1)
                  .map { obj -> obj.id to obj }
  // list_2 data class Object2(val id: String, val y: String)
  val flux2 = Flux.fromIterable(list_2)
                  .map { obj -> obj.id to obj }

  Flux.merge(flux1, flux2)
      .groupBy { (id, obj) -> id }
      .flatMap { gFlux ->
        gFlux
          .map { (id, obj) -> obj }
          .collectList()
          .filter { it.size == 2 }
          .map { (obj1, obj2) -> Object3(obj1, obj2) }
      }
      .collectList()

But when list_2 is large, this freezes forever and I don't know why. I temporarily fixed it by filtering the merged flux before grouping, since I only need the ids that appear in list_1 anyway:

Flux.merge(flux1, flux2)
    .filter { (id, obj) -> id in list_1.map { it.id } }

So I wonder why it freezes and what the proper way to solve this is. Are there better solutions for grouping two lists based on the first one?


Solution

  • groupBy doesn't work well with a large number of groups (javadoc):

    Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).

    So instead of using groupBy, you should aggregate the results a different way. In your case it could be something like:

    Flux.merge(flux1, flux2)
      // accumulate every element into a map keyed by id
      .reduce(mutableMapOf<String, MutableList<Any>>()) { map, (id, obj) ->
          map.getOrPut(id) { mutableListOf() }.add(obj)
          map
      }
      // reduce returns a Mono, so flatMapMany is needed to get back to a Flux
      .flatMapMany { Flux.fromIterable(it.values) }
      .filter { it.size == 2 }
      .map { (obj1, obj2) -> Object3(obj1, obj2) }
      .collectList()
    

    It's not very efficient, though, because you keep all your data in memory until the last step, which is the same problem groupBy has. But this version does not depend on the groups being consumed downstream, so it doesn't hang. If you can drop some groups while reducing, such as groups with more than 2 elements, that would be more efficient.
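
Since every id in list_1 also exists in list_2, one way to drop unneeded elements early is to index list_1 by id and look up each element of list_2 directly, without grouping at all. The following is only a minimal plain-Kotlin sketch of that idea, not a drop-in replacement: `joinById` is a hypothetical helper name, the shape of `Object3` is assumed because the question never shows it, and ids are assumed to be unique within each list.

```kotlin
// Shapes taken from the comments in the question.
data class Object1(val id: String, val x: String)
data class Object2(val id: String, val y: String)

// Assumed shape: the question does not show how Object3 is declared.
data class Object3(val first: Object1, val second: Object2)

// Hypothetical helper: pairs each element of list2 with the list1 element that has the same id.
fun joinById(list1: List<Object1>, list2: List<Object2>): List<Object3> {
    // Build the lookup map once from the list that drives the join.
    val byId: Map<String, Object1> = list1.associateBy { it.id }
    // Elements of list2 without a match in list1 are dropped immediately,
    // so nothing is retained for ids that can never form a pair.
    return list2.mapNotNull { o2 -> byId[o2.id]?.let { o1 -> Object3(o1, o2) } }
}
```

With the sample data from the question, `joinById` would keep only the pair for id "123". If list_2 has to stay a Flux, the same lookup map could presumably be used inside a `filter` or `map` step on it, so that only the index of list_1 is held in memory.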