java akka akka-stream

Using generic class in Akka Stream BroadcastHub


I am trying to create a BroadcastHub of type ConsumerRecord<String, String> from a Source of the same element type, but BroadcastHub.of takes a Class argument, and a generic type such as ConsumerRecord<String, String> is not allowed there:

RunnableGraph<Source<ConsumerRecord<String, String>, NotUsed>> graph =
                source.toMat(
                        BroadcastHub.of(ConsumerRecord.class, 256),
                        Keep.right()
                );

A similar question has been answered here, but there doesn't seem to be a similar approach for BroadcastHub.

Currently my only option is to wrap the generic type in a wrapper class:

RunnableGraph<Source<ConsumerRecordWrapper, NotUsed>> graph =
                source.map(ConsumerRecordWrapper::new)
                        .toMat(
                                BroadcastHub.of(ConsumerRecordWrapper.class, 256),
                                Keep.right()
                        );

Is there a better solution for this?


Solution

  • The root problem is type erasure. The ConsumerRecord.class argument exists only to guide type inference: at runtime the Class object is the same regardless of the type arguments (it effectively behaves as if it's always ConsumerRecord<Object, Object>), so it's safe to use an unchecked cast of the Class object to make the types work out.

    So something like this compiles for me:

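    // Double cast through Class<?>: a direct cast from the raw class literal
    // to Class<ConsumerRecord<String, String>> is rejected by the compiler.
    @SuppressWarnings("unchecked")
    final Class<ConsumerRecord<String, String>> consumerRecordStringToStringClass =
      (Class<ConsumerRecord<String, String>>)(Class<?>)ConsumerRecord.class;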
    
    RunnableGraph<Source<ConsumerRecord<String, String>, NotUsed>> graph =
      source.toMat(
        BroadcastHub.of(consumerRecordStringToStringClass, 256),
        Keep.right()
      );
    

    (This is a specialization of some of these answers)
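
    If the cast is needed in more than one place, it could be pulled into a small helper. The following is only a sketch: ClassTokens, generic and emptyListOf are hypothetical names, not part of Akka or the JDK. java.util.List stands in for ConsumerRecord, and emptyListOf stands in for an API such as BroadcastHub.of that takes a Class<T> only to fix T, so the example runs without Akka or Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ClassTokens {
    private ClassTokens() {}

    // Hypothetical helper: reinterprets a raw class literal as Class<T> for a
    // parameterized T. The cast is unchecked; it is only reasonable because
    // erasure makes the Class object identical for every parameterization.
    @SuppressWarnings("unchecked")
    public static <T> Class<T> generic(Class<?> rawClass) {
        return (Class<T>) rawClass;
    }

    // Hypothetical stand-in for an API that accepts a class token purely to
    // drive type inference and never inspects it at runtime.
    public static <T> List<T> emptyListOf(Class<T> elementType) {
        return new ArrayList<>();
    }

    public static void main(String[] args) {
        // T is inferred from the assignment target as List<String>.
        Class<List<String>> token = ClassTokens.generic(List.class);

        // The token is still the very same runtime object as List.class.
        System.out.println(token.equals(List.class));

        // The element type now carries its type arguments through the call.
        List<List<String>> lists = emptyListOf(token);
        lists.add(Arrays.asList("a", "b"));
        System.out.println(lists.get(0).get(1));
    }
}
```

    With such a helper, the hub call from above would presumably read BroadcastHub.of(ClassTokens.<ConsumerRecord<String, String>>generic(ConsumerRecord.class), 256); the explicit type argument is there because no assignment target is available for inference at that position.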