apache-kafka, apache-kafka-streams, materialized-views, ktable

KTable not updating (immediately) when new messages are put on input stream


I have a Kafka topic of Strings with arbitrary keys. From it I want to create a topic of character : value pairs, one pair for each character in the value, e.g.:

input("key","value") -> outputs (["v","value"],["a","value"],...)

To keep it simple, my input topic has a single partition, so the KTable code should receive all messages on a single instance.

I have created the following sandbox code, which builds the new table just fine, but doesn't update it when a new item is put onto the original topic:

import java.util.LinkedHashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class Sandbox 
{
    private final static String kafkaBootstrapServers = "192.168.1.254:9092";
    private final static String kafkaGlobalTablesDirectory = "C:\\Kafka\\tmp\\kafka-streams-global-tables\\";
    private final static String topic = "sandbox";
    private static KafkaStreams streams;
    public static void main(String[] args) 
    {
        // 1. set up the test data
        Properties producerProperties = new Properties();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        producerProperties.put(ProducerConfig.CLIENT_ID_CONFIG, Sandbox.class.getName() + "_testProducer");
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        Producer<String, String> sandboxProducer = new KafkaProducer<>(producerProperties);

        sandboxProducer.send(new ProducerRecord<String, String>(topic,"uvw","uvw"));
        
        // 2. read the test data and check it's working
        ReadOnlyKeyValueStore<String, String> store = getStore();
        
        printStore(store.all());
        System.out.println("-------------ADDING NEW VALUE----------------");
        sandboxProducer.send(new ProducerRecord<String, String>(topic,"xyz","xyz"));
        System.out.println("-------------ADDED NEW VALUE----------------");
        printStore(store.all());
        
        sandboxProducer.close();
        streams.close();
    }
    
    private static void printStore(KeyValueIterator<String, String> i)
    {
        System.out.println("-------------PRINT START----------------");
        while (i.hasNext())
        {
            KeyValue<String, String> n = i.next();
            System.out.println(n.key + ":" +  String.join(",", n.value));
        }
        System.out.println("-------------PRINT END----------------");
    }
    
    private static ReadOnlyKeyValueStore<String, String> getStore()
    {
        ReadOnlyKeyValueStore<String, String> store = null;
        String storeString = "sandbox_store";
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(topic
                , Consumed.with(Serdes.String(),Serdes.String()))
            .filter((k,v)->v!=null)
            .flatMap((k,v)->{
                Set<KeyValue<String, String>> results = new LinkedHashSet<>();
                if (v != null)
                {
                    for (char subChar : v.toCharArray())
                    {
                        results.add(KeyValue.<String, String>pair(new String(new char[] {subChar}), v));
                    }
                }
                return results;
            })
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) 
            .aggregate(()->new String()
                    , (key, value, agg) -> {
                        agg = agg + value;
                        return agg;
                    }
                    ,Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeString)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String()));

        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "sandbox");
        streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, Sandbox.class.getName());
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, kafkaGlobalTablesDirectory + "Sandbox");
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streams = new KafkaStreams(builder.build(), streamsConfiguration);
        
        streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
              System.out.println("Exception on thread " + thread.getName() + ":" + throwable.getLocalizedMessage());
            });
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        streams.cleanUp(); // clear any old streams data - forces a rebuild of the local caches.
        streams.start(); // hangs until the global table is built
        
        StoreQueryParameters<ReadOnlyKeyValueStore<String, String>> storeSqp
                = StoreQueryParameters.fromNameAndType(storeString,
                        QueryableStoreTypes.<String, String>keyValueStore());
        
        // this while loop gives time for Kafka Streams to start up properly before creating the store
        while (store == null)
        {
            try {
                TimeUnit.SECONDS.sleep(1);
                store = streams.store(storeSqp);
                System.out.println("Store " + storeString + " Created successfully.");
            } catch (InterruptedException e) {
            }
            catch (Exception e) {
                System.out.println("Exception creating store " + storeString + ". Will try again in 1 second. Message: " + e.getLocalizedMessage());
            }
        }
        return store;
    }
}

The output I am getting is as follows:

Store sandbox_store Created successfully.
-------------PRINT START----------------
u:uvw
v:uvw
w:uvw
-------------PRINT END----------------
-------------ADDING NEW VALUE----------------
-------------ADDED NEW VALUE----------------
-------------PRINT START----------------
u:uvw
v:uvw
w:uvw
-------------PRINT END----------------

Note that the xyz I added has gone missing!

(p.s. I know I could use reduce instead of aggregate, but in practice the new value would be a different type, not a String, so it wouldn't work for my actual use case.)

Now, if I add a 10-second pause before printing the second time, or if I restart the Sandbox class without clearing the topic, the xyz does show up. So clearly there is a time delay somewhere in the system. In practice I'm dealing with 300 MB+ of messages all arriving on the input topic at once, once an hour, so the delay is even longer than a few seconds.

How can I speed things up?


Solution

  • The problem is that the processing is asynchronous: the record still has to travel through the producer, the Streams topology and the state store before it becomes visible to a query. You can adjust the streamsConfiguration (e.g. COMMIT_INTERVAL_MS_CONFIG and the consumer's FETCH_MIN_BYTES_CONFIG) to speed this up, but ultimately it is going to be a timing thing. If you care about a specific message (e.g. in a JUnit test scenario), use callbacks or poll the store to check that the message has arrived before closing.
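
As an illustration, here is a minimal sketch (assuming the class layout above) of the kind of configuration changes involved. The values are arbitrary examples rather than tuned recommendations, and CACHE_MAX_BYTES_BUFFERING_CONFIG is an extra setting, not mentioned above, that also reduces how long aggregation results sit in the Streams cache:

// added to streamsConfiguration in getStore(); values are illustrative only
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);      // commit (and flush the store cache) more often than the 30-second default
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); // disable record caching so each aggregate update is forwarded immediately
streamsConfiguration.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);          // don't let the consumer wait to fill fetch batches (1 is already the default)

And a sketch of waiting for the message instead of printing straight away. The helper name waitForKey and the 30-second timeout are made up for this example; the method could sit in the Sandbox class next to the existing ones (it needs java.time.Duration in addition to the imports already there):

// Poll the store until the expected key appears or the timeout expires.
private static boolean waitForKey(ReadOnlyKeyValueStore<String, String> store,
                                  String key, Duration timeout)
{
    long deadline = System.currentTimeMillis() + timeout.toMillis();
    while (System.currentTimeMillis() < deadline)
    {
        if (store.get(key) != null) // the aggregate for this character has arrived
        {
            return true;
        }
        try
        {
            TimeUnit.MILLISECONDS.sleep(100);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            return false;
        }
    }
    return false;
}

// in main(), after sending the new record:
sandboxProducer.send(new ProducerRecord<String, String>(topic, "xyz", "xyz"));
sandboxProducer.flush(); // make sure the record has actually left the producer
if (waitForKey(store, "x", Duration.ofSeconds(30)))
{
    printStore(store.all());
}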