Tags: quarkus, apache-kafka-streams

Set lz4 topic compression for materialized state stores


Assume a Quarkus Kafka Streams application doing some kind of re-keying and aggregation with materialization like this:

final KTable<Long, GroupedCommunications> groupedCommunications = communications //
        .groupBy( //
                (key, value) -> KeyValue.pair(value.personId(), value), //
                Grouped.with( //
                        "communications-grouped", //
                        longKeySerde, //
                        communicationSerde)) //
        .aggregate( //
                GroupedCommunications::new, //
                (key, value, aggregate) -> aggregate.add(value), //
                (key, value, aggregate) -> aggregate.remove(value), //
                Named.as("communications-grouped-aggregated"), //
                Materialized //
                        .<Long, GroupedCommunications, KeyValueStore<Bytes, byte[]>>as("communications-grouped-aggregated-materialized") //
                        .withKeySerde(longKeySerde) //
                        .withValueSerde(groupedCommunicationSerde));

This implicitly creates two Kafka topics, communications-grouped-repartition and communications-grouped-aggregated-materialized-changelog, both with compression type producer.

How do I change this to lz4? Setting kafka-streams.producer.compression-type=lz4 has no effect.


Solution

  • What do you mean by "has no effect"? Is the data not written using lz4 by the producer? Or are you only asking about the topic config itself?
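
    One way to check what the topic-level config currently is, using the standard Kafka CLI (the bootstrap address is a placeholder, and internal topic names are prefixed with your application.id):

    ```shell
    kafka-configs.sh --bootstrap-server localhost:9092 \
        --entity-type topics \
        --entity-name <application.id>-communications-grouped-repartition \
        --describe
    ```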

    Compression can be configured as a topic-level config and as a producer-level config, and the two configs interact. The topic-level compression type "producer" tells the broker to write the data into the topic in whatever format the producer picked -- if the producer is configured with lz4, the data is written in lz4.

    If you change the topic-level compression config, the broker might need to re-compress the data to match the required format whenever the producer is configured with a different compression format; thus, keeping the topic-level compression at "producer" is usually best, as it avoids re-compression overhead. For example, with the topic set to lz4: if producer compression is turned off, the broker would compress the data using lz4 before writing it; if producer compression is set to snappy, the broker would first decompress the data and then recompress it into lz4 before writing it.
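
    For an already existing internal topic, the topic-level compression can also be changed directly with the same CLI (again, topic name and bootstrap address are placeholders):

    ```shell
    kafka-configs.sh --bootstrap-server localhost:9092 \
        --entity-type topics \
        --entity-name <application.id>-communications-grouped-aggregated-materialized-changelog \
        --alter --add-config compression.type=lz4
    ```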

    Using the producer. prefix changes only the producer config (not the topic config), i.e., how the producer compresses the data before sending it to the broker. Are you saying the producer is not using lz4 as configured?

    If you really want to, you can add topic-specific configuration for internal topics to your Kafka Streams configuration using the config-name prefix topic., e.g., topic.compression-type=lz4. This changes the topic-level config; however, if the producer is not configured with compression (the default is "none" IIRC), compression would now happen only broker-side, as explained above -- most likely not what you want.
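
    Putting both together, a minimal application.properties sketch (assuming the Quarkus kafka-streams extension passes producer.- and topic.-prefixed keys through to Kafka Streams unchanged, as it does for the producer config in the question):

    ```properties
    # producer-side compression: data is sent to the broker already lz4-compressed
    kafka-streams.producer.compression-type=lz4
    # topic-level config applied to internal topics when Kafka Streams creates them
    kafka-streams.topic.compression-type=lz4
    ```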