kotlin, apache-kafka, apache-kafka-streams, kotlinx.serialization

KTable-KTable FK join: can't select foreign key serde


summary

I'm trying to do a KTable-KTable foreign-key join, but I get an error because Kafka Streams is trying to use a String serde for the foreign key.

I want it to use a Kotlinx Serialization serde. How can I specify this?

details

I want to join the data of two KTables together, using a FK selector and remapping the values into an aggregating object.

tilesGroupedByChunk
  .join<ChunkTilesAndProtos, SurfaceIndex, SurfacePrototypesData>(
    tilePrototypesTable, // join the prototypes KTable
    { cd: MapChunkData -> cd.chunkPosition.surfaceIndex }, // FK join on SurfaceIndex
    { chunkTiles: MapChunkData, protos: SurfacePrototypesData ->
      ChunkTilesAndProtos(chunkTiles, protos) // remap value 
    },
    namedAs("joining-chunks-tiles-prototypes"),
    materializedAs(
      "joined-chunked-tiles-with-prototypes",
      // `.serde()`- helper function to make a Serde from a Kotlinx Serialization JSON module 
      // see https://github.com/adamko-dev/kotka-streams/blob/38388e74b16f3626a2733df1faea2037b89dee7c/modules/kotka-streams-kotlinx-serialization/src/main/kotlin/dev/adamko/kotka/kxs/jsonSerdes.kt#L48
      jsonMapper.serde(),
      jsonMapper.serde(),
    ),
  )

However, I get an error because Kafka Streams is using Serdes.String() (my default Serde) for deserializing the foreign key. The foreign key is a JSON object, so I want Kafka Streams to use a Kotlinx Serialization serde for it.
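For reference, the FK join overload in the plain Kafka Streams API has no parameter for a foreign-key serde. As far as I can tell, the foreign key is serialized with the key serde of the right-hand table, and the left-hand table's value serde is also used internally (in the stack trace, the StringSerializer is being handed a MapChunkData value, not a key). The Materialized serdes only cover the join result. A minimal sketch, where the `Order` type, the topic names and the store name are hypothetical, and assuming a Kafka Streams version that still offers the `Named` overload:

```kotlin
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.KTable
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Named
import org.apache.kafka.streams.state.KeyValueStore

// Hypothetical left-hand value type; customerId is the foreign key.
data class Order(val customerId: Int, val item: String)

fun buildJoin(builder: StreamsBuilder, orderSerde: Serde<Order>): KTable<String, String> {
  // Left table: its value serde is set explicitly, so no default serde is needed.
  val orders: KTable<String, Order> =
    builder.table("orders", Consumed.with(Serdes.String(), orderSerde))

  // Right table: its key serde is the one applied to the foreign key.
  val customers: KTable<Int, String> =
    builder.table("customers", Consumed.with(Serdes.Integer(), Serdes.String()))

  return orders.join<String, Int, String>(
    customers,
    { order: Order -> order.customerId }, // foreign-key extractor
    { order: Order, name: String -> "${order.item} for $name" }, // value joiner
    Named.`as`("orders-with-customers"),
    // Serdes for the join result only.
    Materialized.`as`<String, String, KeyValueStore<Bytes, ByteArray>>("orders-with-customers-store")
      .withKeySerde(Serdes.String())
      .withValueSerde(Serdes.String()),
  )
}
```

If either input table is produced by an upstream operation without explicit serdes, that is the place to look first.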

org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. 
Do the Processor's input types match the deserialized types? 
Check the Serde setup and change the default Serdes in 
StreamConfig or provide correct Serdes via method 
parameters. Make sure the Processor can accept the 
deserialized input of type key: myproject.MyTopology$MapChunkDataPosition, and value: org.apache.kafka.streams.kstream.internals.Change.

Note that although incorrect Serdes are a common cause 
of error, the cast exception might have another cause 
(in user code, for example). For example, if a 
processor wires in a store, but casts the generics 
incorrectly, a class cast exception could be raised 
during processing, but the cause would not be wrong Serdes.

background

The data I'm working with is from a computer game. The game has a map, called a surface. Each surface is uniquely identified by a surface index. Each surface has tiles, on an x/y plane. The tiles have a 'prototype name', which is the ID of a TilePrototype. Each TilePrototype has information about what that tile does, or looks like. I need it for the colour.

topology

group tiles by chunk

First I group the tiles into chunks of 32x32, and then group those into a KTable.

/** Each chunk is identified by the surface, and an x/y coordinate */
@Serializable
data class MapChunkDataPosition(
  val position: MapChunkPosition,
  val surfaceIndex: SurfaceIndex,
)

/** Each chunk holds the tiles of a 32x32 area */
@Serializable
data class MapChunkData(
  val chunkPosition: MapChunkDataPosition,
  val tiles: Set<MapTile>,
)

// get all incoming tiles and group them by chunk,
// this works successfully
val tilesGroupedByChunk: KTable<MapChunkDataPosition, MapChunkData> =
  buildChunkedTilesTable(tilesTable)

group prototypes by surface index

Then I collect all prototypes by surface index, and aggregate them into a set.


/** Identifier for a surface (a simple wrapper, so I can use a Kotlinx Serialization serde everywhere)*/
@Serializable
data class SurfaceIndex(
  val surfaceIndex: Int
)

/** Each surface has some 'prototypes' - I want this because each tile has a colour */
@Serializable
data class SurfacePrototypesData(
  val surfaceIndex: SurfaceIndex,
  val mapTilePrototypes: Set<MapTilePrototype>,
)

// get all incoming prototypes and group them by surface index,
// this works successfully
val tilePrototypesTable: KTable<SurfaceIndex, SurfacePrototypesData> =
  tilePrototypesTable()

ktable-ktable fk join

This is the code that causes the error:

/** For each chunk, get all tiles in that chunk, and all prototypes */
@Serializable
data class ChunkTilesAndProtos(
  val chunkTiles: MapChunkData,
  val protos: SurfacePrototypesData
)

tilesGroupedByChunk
  .join<ChunkTilesAndProtos, SurfaceIndex, SurfacePrototypesData>(
    tilePrototypesTable, // join the prototypes
    { cd: MapChunkData -> cd.chunkPosition.surfaceIndex }, // FK join on SurfaceIndex
    { chunkTiles: MapChunkData, protos: SurfacePrototypesData ->
      ChunkTilesAndProtos(chunkTiles, protos) // remap value 
    },
    namedAs("joining-chunks-tiles-prototypes"),
    materializedAs(
      "joined-chunked-tiles-with-prototypes",
      // `.serde()`- helper function to make a Serde from a Kotlinx Serialization JSON module 
      // see https://github.com/adamko-dev/kotka-streams/blob/38388e74b16f3626a2733df1faea2037b89dee7c/modules/kotka-streams-kotlinx-serialization/src/main/kotlin/dev/adamko/kotka/kxs/jsonSerdes.kt#L48
      jsonMapper.serde(),
      jsonMapper.serde(),
    ),
  )
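For readers who don't want to follow the link, a helper like `.serde()` can be approximated as follows. This is only a rough sketch, not the kotka-streams implementation: the name `jsonSerde` is made up, and it assumes a Kafka client version where `Serializer` and `Deserializer` have a single abstract method (so Kotlin SAM conversion applies).

```kotlin
import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.Json
import kotlinx.serialization.serializer
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.serialization.Serializer

/** Hypothetical helper: builds a Kafka Serde that encodes T as UTF-8 JSON. */
inline fun <reified T : Any> Json.jsonSerde(): Serde<T> {
  val json = this
  // Look up the Kotlinx Serialization serializer for T once, up front.
  val kxs: KSerializer<T> = serializersModule.serializer()

  val serializer = Serializer<T> { _, data ->
    // Null values (tombstones) are passed through as null bytes.
    if (data == null) null else json.encodeToString(kxs, data).toByteArray(Charsets.UTF_8)
  }
  val deserializer = Deserializer<T> { _, bytes ->
    if (bytes == null) null else json.decodeFromString(kxs, bytes.decodeToString())
  }
  return Serdes.serdeFrom(serializer, deserializer)
}
```

A serde built this way is only used where it is passed in explicitly; any operation left without one falls back to the configured default serde.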

full stack trace

org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: MyProject.processor.Topology$MapChunkDataPosition, and value: org.apache.kafka.streams.kstream.internals.Change.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:150)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:131)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:105)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:186)
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:54)
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:29)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$1.apply(MeteredKeyValueStore.java:182)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$1.apply(MeteredKeyValueStore.java:179)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:107)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:87)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:136)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flushCache(CachingKeyValueStore.java:345)
at org.apache.kafka.streams.state.internals.WrappedStateStore.flushCache(WrappedStateStore.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flushCache(ProcessorStateManager.java:487)
at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:402)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(TaskManager.java:1043)
at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1016)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1017)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:786)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
Caused by: java.lang.ClassCastException: class MyProjectTopology$MapChunkData cannot be cast to class java.lang.String (MyProject.processor.MyProject$MapChunkData is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:29)
at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:99)
at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:69)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
... 30 common frames omitted

versions


Solution

  • Somewhat expectedly, I had made a mistake further up in the topology definition.

    In the final stage of creating one of the tables, I mapped the values to a new type but did not specify the serdes, so the resulting table fell back to the default serde (Serdes.String()).

        .mapValues { _, v ->
          ChunkTilesAndProtos(v.tiles, v.protos)
        }
    

    So I changed it to specify the serdes.

        .mapValues(
          "finalise-web-map-tile-chunk-aggregation",
          materializedAs("web-map-tile-chunks", jsonMapper.serde(), jsonMapper.serde())
        ) { _, v ->
          ChunkTilesAndProtos(v.tiles, v.protos)
        }
        // note: this uses extension functions from github.com/adamko-dev/kotka-streams
    

    It was not easy to find this. I found it by putting a breakpoint in the constructor of AbstractStream.java (among other constructors) to see when the keySerde and valueSerde fields were not being set.

    Sometimes a null serde is expected (I think some KTables/KStreams are 'virtual' and do not encode/decode to/from Kafka topics). However, I was able to find the operation that caused my problem and define serdes for it, which was necessary because that operation changes the value type.
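For anyone not using kotka-streams, the equivalent in the plain Kafka Streams API is, I believe, the `mapValues` overload that takes a `Named` and a `Materialized`. A minimal sketch, where the topic, processor and store names are hypothetical:

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.KTable
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Named
import org.apache.kafka.streams.kstream.ValueMapperWithKey
import org.apache.kafka.streams.state.KeyValueStore

fun wordLengths(builder: StreamsBuilder): KTable<String, Int> {
  val words: KTable<String, String> =
    builder.table("words", Consumed.with(Serdes.String(), Serdes.String()))

  // The value type changes from String to Int, so the new value serde is
  // given explicitly instead of relying on the default serde.
  return words.mapValues(
    ValueMapperWithKey<String, String, Int> { _, word -> word.length },
    Named.`as`("word-lengths"),
    Materialized.`as`<String, Int, KeyValueStore<Bytes, ByteArray>>("word-lengths-store")
      .withKeySerde(Serdes.String())
      .withValueSerde(Serdes.Integer()),
  )
}
```

Note that passing a named `Materialized` also creates a state store for the mapped table, which may or may not be wanted.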