Tags: spring, spring-boot, kotlin, spring-cloud-stream, spring-cloud-stream-binder-kafka

Getting java.lang.ClassCastException on consuming an event payload of generic type


I have a Spring Boot 3.1.1 Kotlin application that uses Spring Cloud 2022.0.4 to consume Kafka events.

Consumer function:

    @Bean
    fun ipConsumer(): Consumer<Message<PayloadWithEmbeddedHeaders<InstructionalPlanCreatedEvent>>> {
        return Consumer {
            if (it.headers[PublisherKafka.EVENT_TYPE_HEADER_NAME] == InstructionalPlanCreatedEvent::class.java.simpleName) {
                process(ip = it.payload.body)
            }
        }
    }

Generic type class:

data class PayloadWithEmbeddedHeaders<T>(val headers: Map<String, Any> = emptyMap(), val body: T)

Event class:

data class InstructionalPlanCreatedEvent(
    val id: UUID,
    val organisationId: UUID,
    val organisationName: String? = null,
    val curriculumId: Long,
    val curriculumName: String? = null,
    val subjectId: Long,
    val subjectName: String? = null,
    val gradeId: Long,
    val gradeName: String? = null,
    val academicYearId: Long,
    val academicYear: String? = null
)

In my consumer function, accessing it.payload.body throws a ClassCastException because body arrives as a LinkedHashMap instead of an InstructionalPlanCreatedEvent.

Also, this does not fail in the binder test; it only fails when an actual Kafka event comes in.
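To illustrate why the failure shows up only when body is read, here is a minimal sketch in plain Kotlin with no Spring or Kafka involved. It assumes the message converter did not know the type argument T and fell back to a map for body; DemoEvent and bodyAccessFailure are hypothetical names introduced for the demo, not part of the application.

```kotlin
// Same generic wrapper as in the question.
data class PayloadWithEmbeddedHeaders<T>(val headers: Map<String, Any> = emptyMap(), val body: T)

// Hypothetical stand-in for InstructionalPlanCreatedEvent, kept small for the demo.
data class DemoEvent(val id: String)

// Returns the exception raised when body is read as DemoEvent, or null if none is raised.
fun bodyAccessFailure(): Throwable? {
    // Stand-in for a deserializer that did not know T and produced a map for body.
    val raw: Any = PayloadWithEmbeddedHeaders<Any>(body = linkedMapOf("id" to "42"))

    // Unchecked cast: T is erased at runtime, so this line does not fail.
    @Suppress("UNCHECKED_CAST")
    val typed = raw as PayloadWithEmbeddedHeaders<DemoEvent>

    // The cast to DemoEvent is only checked here, when body is actually used.
    return runCatching { typed.body.id }.exceptionOrNull()
}

fun main() {
    println(bodyAccessFailure() is ClassCastException)
}
```

On the JVM this should print true: the wrapper itself is accepted, and the exception appears only at the point where body is treated as the event type, which matches the behaviour described above.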


Solution

  • This has been addressed with this commit.

    Can you please explicitly add this as a dependency?

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-function-context</artifactId>
        <version>4.0.6-SNAPSHOT</version>
    </dependency>
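If the project is built with Gradle rather than Maven (an assumption, since the question does not say which build tool is used), the same dependency could be declared roughly as follows in build.gradle.kts. Snapshot versions are not published to Maven Central, so the sketch also adds the Spring snapshot repository.

```kotlin
// build.gradle.kts (sketch; assumes Gradle with the Kotlin DSL)
repositories {
    mavenCentral()
    // Snapshot builds are served from the Spring snapshot repository.
    maven { url = uri("https://repo.spring.io/snapshot") }
}

dependencies {
    // Same coordinates as the Maven snippet above.
    implementation("org.springframework.cloud:spring-cloud-function-context:4.0.6-SNAPSHOT")
}
```

A Maven build would likewise need a snapshot repository entry for the 4.0.6-SNAPSHOT version to resolve.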