apache-kafka streaming ksqldb

ksqlDB - How to explode rows for each day between 2 timestamp columns?


I have a topic that contains a startDate and an endDate attribute. When building the stream I would like to explode the single row into multiple rows, one for each day between start and end dates. For didactic reasons, I'm converting timestamps into string dates in the examples below.

Input data:

{ "id": 1, "startDate": "2023-03-01", "endDate": "2023-03-05" }
{ "id": 2, "startDate": "2023-03-02", "endDate": "2023-03-06" }

Some initial stream loading from the topic:

CREATE STREAM mystream (
   id INT
  ,startDate TIMESTAMP
  ,endDate TIMESTAMP
) WITH (
  kafka_topic='mytopic',
  value_format='json',
  partitions=1
);

The desired streaming result after transformations and explosions:

{ "id": 1, "date": "2023-03-01" }
{ "id": 1, "date": "2023-03-02" }
{ "id": 1, "date": "2023-03-03" }
{ "id": 1, "date": "2023-03-04" }
{ "id": 1, "date": "2023-03-05" }
{ "id": 2, "date": "2023-03-02" }
{ "id": 2, "date": "2023-03-03" }
{ "id": 2, "date": "2023-03-04" }
{ "id": 2, "date": "2023-03-05" }
{ "id": 2, "date": "2023-03-06" }

Is it possible to do something like this with ksqlDB?


Solution

  • I asked the question in the ksqlDB forum and got an answer there: https://forum.confluent.io/t/how-to-explode-rows-for-each-day-between-2-timestamp-columns/7323

    In short, I needed to create a user-defined table function (UDTF) following the steps here.

    The Java program to do the magic is here:

    package com.example;
    
    import io.confluent.ksql.function.udtf.Udtf;
    import io.confluent.ksql.function.udtf.UdtfDescription;
    import io.confluent.ksql.function.udf.UdfParameter;
    
    import java.sql.Timestamp;
    import java.time.LocalDate;
    import java.util.ArrayList;
    import java.util.List;
    
    @UdtfDescription(name = "date_explode",
             author = "Rafael",
             version = "1.0.0",
             description = "Given a start and end date, explode the period into a list of dates.")
    public class DateExplodeUdtf {
    
      @Udtf(description = "start and end date are days from epoch")
      public List<Timestamp> dateExplode(
        @UdfParameter(value = "startDate", description = "start date (days from epoch)") final Integer startDate,
        @UdfParameter(value = "endDate", description = "end date (days from epoch)") final Integer endDate
      ) {
        List<Timestamp> result = new ArrayList<>();
        LocalDate startDateComputed = LocalDate.ofEpochDay(startDate);
        LocalDate endDateComputed = LocalDate.ofEpochDay(endDate);
        for (LocalDate date = startDateComputed; !date.isAfter(endDateComputed); date = date.plusDays(1)) {
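          // Build each timestamp at UTC midnight so the result does not depend on the server's time zone.
          result.add(Timestamp.from(date.atStartOfDay(java.time.ZoneOffset.UTC).toInstant()));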
        }
        return result;
      }
    }
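
    To check the expansion logic outside ksqlDB, the loop can be exercised in plain Java. This is only a minimal sketch and not part of the forum answer: the class name DateExplodeSketch and the helper explode are hypothetical, the ksqlDB annotations are left out so it compiles without the ksqlDB UDF dependency, and it returns LocalDate values instead of Timestamp so the output does not depend on any time zone.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone harness; mirrors the loop in DateExplodeUdtf without ksqlDB annotations.
class DateExplodeSketch {

  // Returns every day from startDay to endDay inclusive; both are days since 1970-01-01.
  static List<LocalDate> explode(final int startDay, final int endDay) {
    final List<LocalDate> result = new ArrayList<>();
    final LocalDate end = LocalDate.ofEpochDay(endDay);
    for (LocalDate d = LocalDate.ofEpochDay(startDay); !d.isAfter(end); d = d.plusDays(1)) {
      result.add(d);
    }
    return result;
  }

  public static void main(final String[] args) {
    // Convert the sample row with id 1 from the question into days from epoch.
    final int start = (int) LocalDate.parse("2023-03-01").toEpochDay();
    final int end = (int) LocalDate.parse("2023-03-05").toEpochDay();
    // Prints one ISO date per line for the inclusive range.
    explode(start, end).forEach(System.out::println);
  }
}
```

    When the start day is after the end day the loop never runs and the list comes back empty, so the same logic inside the UDTF would produce no rows for that input.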
    

    And when running:

    SELECT
       id
      ,DATE_EXPLODE(startDate, endDate) AS date
    FROM mystream
    EMIT CHANGES;
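
    Note that the stream in the question declares startDate and endDate as TIMESTAMP, while the function above takes days from epoch as integers. One possible variant, sketched here as an assumption rather than something taken from the forum answer, accepts java.sql.Timestamp arguments (the Java type ksqlDB passes to functions for TIMESTAMP columns) and truncates them to UTC days. The class name TimestampExplodeSketch and its method are hypothetical, and the @Udtf / @UdfParameter annotations are omitted so the snippet compiles on its own.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical variant of the explode logic that works on timestamps instead of epoch days.
class TimestampExplodeSketch {

  // Returns one timestamp (UTC midnight) per day between start and end, inclusive.
  static List<Timestamp> explode(final Timestamp start, final Timestamp end) {
    final List<Timestamp> result = new ArrayList<>();
    if (start == null || end == null) {
      return result; // a null column yields no rows instead of a NullPointerException
    }
    // Drop the time-of-day part by converting each instant to a UTC calendar date.
    final LocalDate first = start.toInstant().atZone(ZoneOffset.UTC).toLocalDate();
    final LocalDate last = end.toInstant().atZone(ZoneOffset.UTC).toLocalDate();
    for (LocalDate d = first; !d.isAfter(last); d = d.plusDays(1)) {
      result.add(Timestamp.from(d.atStartOfDay(ZoneOffset.UTC).toInstant()));
    }
    return result;
  }

  public static void main(final String[] args) {
    // Sample values loosely based on the row with id 2, with arbitrary times of day added.
    final Timestamp start = Timestamp.from(Instant.parse("2023-03-02T10:15:00Z"));
    final Timestamp end = Timestamp.from(Instant.parse("2023-03-06T08:00:00Z"));
    for (final Timestamp t : explode(start, end)) {
      System.out.println(t.toInstant());
    }
  }
}
```

    Using UTC explicitly is a design choice for this sketch: it keeps the day boundaries the same regardless of the JVM's default time zone.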