Tags: java, apache-kafka, apache-kafka-streams

How to extract timestamp embedded in messages in Kafka Streams


I want to extract the timestamps embedded in each message and send them as a JSON payload to my database.

I want to get the following three timestamps.

Event-time: The point in time when an event or data record occurred, i.e. was originally created “by the source”.

Processing-time: The point in time when the event or data record happens to be processed by the stream processing application, i.e. when the record is being consumed.

Ingestion-time: The point in time when an event or data record is stored in a topic partition by a Kafka broker.

This is my streams application code:

Properties props = new Properties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_URL + ":9092"); // broker address, e.g. localhost:9092
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");

source_o365_user_activity.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String value) {
        System.out.println("========> o365_user_activity_by_date Log:     " + value);
        ArrayList<String> keywords = new ArrayList<String>();
        try {
            JSONObject send = new JSONObject();
            JSONObject received = new JSONObject(value);

            send.put("current_date", getCurrentDate().toString()); // UTC TIME
            send.put("activity_time", received.get("CreationTime")); // event-time from the payload
            send.put("user_id", received.get("UserId"));
            send.put("operation", received.get("Operation"));
            send.put("workload", received.get("Workload"));
            keywords.add(send.toString());

        } catch (Exception e) {
            // TODO: handle exception
            System.err.println("Unable to convert to json");
            e.printStackTrace();
        }

        return keywords;
    }
}).to("o365_user_activity_by_date");

In the code I am simply getting each record, doing some stream processing on it and sending it to a different topic.

Now with each record I want to send Event-time, Processing-time and Ingestion-time embedded in the payload.

I have looked at FailOnInvalidTimestamp and WallclockTimestampExtractor, but I am confused about how to use them.

Kindly guide me on how I can achieve this.


Solution

  • A timestamp extractor can only give you one timestamp, and this timestamp is used for time-based operations like windowed aggregations or joins. It seems that you don't do any time-based computation though, so from a computation point of view, it does not matter.

    Note that a record has only one metadata timestamp field. This timestamp field can be used to store an event-timestamp that can be set by the producer (i.e., the app code can set it explicitly, or the producer defaults to System.currentTimeMillis() otherwise). Alternatively, you can let the broker overwrite the producer-provided timestamp with the broker's ingestion time; this is a topic configuration (message.timestamp.type=LogAppendTime).
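    As a minimal sketch of the producer side (assuming kafka-clients is on the classpath; the topic name, key, and payload here are placeholders), the metadata timestamp can be set explicitly when building a ProducerRecord:

    ```java
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerTimestampExample {

        // Builds a record with an explicitly chosen metadata timestamp.
        // If null were passed instead, the producer would fill in
        // System.currentTimeMillis(); if the topic is configured with
        // message.timestamp.type=LogAppendTime, the broker overwrites
        // whatever the producer sent with its own ingestion time.
        public static ProducerRecord<String, String> buildRecord(long eventTimeMs) {
            return new ProducerRecord<>(
                    "o365_user_activity",                     // topic (placeholder)
                    null,                                     // partition: let the partitioner decide
                    eventTimeMs,                              // explicit event-time
                    "user-id",                                // key (placeholder)
                    "{\"Operation\":\"FileAccessed\"}");      // value (placeholder)
        }

        public static void main(String[] args) {
            ProducerRecord<String, String> record = buildRecord(1_600_000_000_000L);
            System.out.println(record.timestamp());
        }
    }
    ```

    This only illustrates where the metadata timestamp comes from; actually sending the record requires a configured KafkaProducer.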

    To access the record metadata timestamp (independent if it's event-time or ingestion-time), use the default timestamp extractor that will give you this timestamp.

    Newer version of Kafka (2.7+)

    If you want to access it in your application, you need to use the Processor API, i.e., in your case a .process() instead of the .flatMapValues() operator. Inside the Processor you get Record objects which allow you to access the record's timestamp.
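    A minimal sketch of such a Processor (the class name, the "record_timestamp" field, and the naive string splicing are illustrative, not from the original code):

    ```java
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;

    // Enriches each JSON payload with the record's metadata timestamp
    // (event-time or ingestion-time, depending on the topic's
    // message.timestamp.type) before forwarding it downstream.
    public class TimestampEnricher implements Processor<String, String, String, String> {

        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            // record.timestamp() is the metadata timestamp extracted by
            // the default timestamp extractor.
            String enriched = addTimestamp(record.value(), record.timestamp());
            context.forward(record.withValue(enriched));
        }

        // Pure helper, kept separate so it is testable without a broker:
        // naively splices a "record_timestamp" field into a JSON object string.
        static String addTimestamp(String json, long timestamp) {
            int brace = json.lastIndexOf('}');
            return json.substring(0, brace) + ",\"record_timestamp\":" + timestamp + "}";
        }
    }
    ```

    It would be wired in with source_o365_user_activity.process(TimestampEnricher::new); note that .process() only returns a KStream (so you can chain .to(...)) as of Kafka 3.3, in earlier versions it is a terminal operation.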

    Older version of Kafka (<= 2.6)

    If you want to access it in your application, you need to use the Processor API, i.e., in your case a .transform() instead of the .flatMapValues() operator. Your Transformer will be initialized with a context object that allows you to access the extracted timestamp.
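    A minimal sketch of such a Transformer for Kafka <= 2.6 (the class name and the appended-string format are illustrative):

    ```java
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;

    // Reads the extracted metadata timestamp from the ProcessorContext
    // the Transformer is initialized with.
    public class TimestampTransformer
            implements Transformer<String, String, KeyValue<String, String>> {

        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            // context.timestamp() is the metadata timestamp of the record
            // currently being processed (event-time or ingestion-time).
            return KeyValue.pair(key, stamp(value, context.timestamp()));
        }

        @Override
        public void close() { }

        // Pure helper so the payload logic is testable without a broker.
        static String stamp(String value, long timestamp) {
            return value + " record_timestamp=" + timestamp;
        }
    }
    ```

    Wired in as source_o365_user_activity.transform(TimestampTransformer::new).to("o365_user_activity_by_date").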

    Because a record can only store one metadata timestamp and because you want to use this for broker ingestion time, the upstream producer must put the event-timestamp into the payload directly.

    Newer version of Kafka (3.0+)

    For processing-time, you can use the ProcessorContext passed into the Processor's init() method and call currentSystemTimeMs() on it.
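    A sketch for Kafka 3.0+, using ContextualProcessor so the context is stored for you (the class name and appended-string format are illustrative):

    ```java
    import org.apache.kafka.streams.processor.api.ContextualProcessor;
    import org.apache.kafka.streams.processor.api.Record;

    // Tags each record with the processing-time reported by the
    // Streams runtime.
    public class ProcessingTimeTagger
            extends ContextualProcessor<String, String, String, String> {

        @Override
        public void process(Record<String, String> record) {
            // currentSystemTimeMs() returns the wall-clock time as tracked
            // by the Streams runtime, which can be mocked in tests (unlike
            // a raw System.currentTimeMillis() call).
            long processingTimeMs = context().currentSystemTimeMs();
            context().forward(record.withValue(tag(record.value(), processingTimeMs)));
        }

        // Pure helper so the payload logic is testable without a broker.
        static String tag(String value, long processingTimeMs) {
            return value + " processing_time=" + processingTimeMs;
        }
    }
    ```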

    Older version of Kafka (<= 2.8)

    For processing-time, just use the wall-clock time (System.currentTimeMillis()), as your code snippet already does with getCurrentDate().
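    A minimal sketch (the helper name mirrors what the question's getCurrentDate() presumably does; it is hypothetical, not from the original code):

    ```java
    // For Kafka <= 2.8, processing-time is simply the wall-clock time at
    // the moment the record is handled.
    public class WallClockExample {

        // Capture the current wall-clock time in epoch milliseconds.
        static long processingTimeMs() {
            return System.currentTimeMillis();
        }

        public static void main(String[] args) {
            // e.g. inside flatMapValues:
            // send.put("processing_time", processingTimeMs());
            System.out.println(processingTimeMs());
        }
    }
    ```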