apache-kafka apache-flink

Flink and Kafka configuration for aggregated count by topic


We want to track all visits by country. Our click tracker will send a payload containing the country to its corresponding topic (one country maps to one topic), where one visit, no matter the page, as long as it's in our domain, creates one payload. Traffic averages millions to low tens of millions of visits per minute and can peak at hundreds of millions per country per minute for our top countries. We want to get:

We're new to Flink and want to ask about best practices for our traffic volume:

  1. Do we need to partition each country topic? If yes, what should the partition key be?
  2. If we do need partitioning, that would have no effect on the Flink job count per topic, correct? I don't see any need for a separate job for each partition of the same topic.
  3. Is the best practice to provision one job per topic (country), i.e. one Flink job per country?
  4. Window size is 24 hours and the sliding interval is 5 minutes, correct?
  5. What's the better solution for the total visit count across all countries: union all topics (countries) in one Flink job, or create another topic, global, have the click tracker always send a duplicate payload (one to global and one to the country topic), and have another Flink job consume from the global topic?
  6. At most, we'll have 24 hours = 1440 minutes divided by the 5-minute interval = 288 active windows per job, and we only need to store the visit count (a long, 8 bytes). So the total is 288 x 8 bytes ≈ 2.3 KB. This means Flink can just use the JVM heap for state management (no need for RocksDB or the file system) and pair it with S3 for long-term persistence, correct?
  7. Should Flink write to another Kafka topic containing the aggregated count per topic (country) as the sink, or to a DB? We want to show both global and by-country visit counts on our user page regularly (5-minute refresh rate).

Thanks


Solution

  • In general, Flink workflows perform well when you don't over-engineer them. So you want to start with the simplest approach that might work, and then figure out whether you need to make things more complicated.

    To that end, here's the configuration I'd recommend starting with for your use case (the actual numbers for partition count, Flink cluster capacity, etc. depend on your data volumes):

    1. Have one Kafka topic, with 128 partitions. Data gets loaded into random partitions (round-robin), not per-country.
    2. Have a single Flink workflow, with parallelism = 64.
    3. The workflow is simple, with a .keyBy(r -> r.getCountry()), then a .window(SlidingEventTimeWindows) with the 24 hour size and 5 minute slide you described.
    4. The window is followed by a .reduce() or an .aggregate(), depending on what result you need.
    5. This stream is both written to your per-country output sink, and is the input stream for a .windowAll(TumblingEventTimeWindows) with a .reduce() or an .aggregate(). A tumbling window size equal to the 5 minute slide works here: since the event time for each result of the first per-country aggregation is set to the window end, the subsequent .windowAll() will also put counts into the correct windows. And because the input to the .windowAll() has already been rolled up per-country, the fact that the .windowAll() has an implicit parallelism of 1 won't result in a performance bottleneck.

    The above assumes that you're using a WatermarkStrategy that's correctly setting the event time for incoming events, of course.
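    To make the two-stage aggregation in steps 3 to 5 concrete, here is a minimal plain-Java model of it. This is not Flink code and uses no Flink APIs: SlidingCountModel and its methods are hypothetical names, the window-assignment arithmetic is my own sketch of how a 24 hour window with a 5 minute slide covers an event, and timestamps are assumed to be epoch milliseconds with no window offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free model of: keyBy(country) -> sliding window count -> global roll-up.
class SlidingCountModel {
    static final long SIZE_MS = 24L * 60 * 60 * 1000; // 24-hour window size
    static final long SLIDE_MS = 5L * 60 * 1000;      // 5-minute slide

    /** Start times of every sliding window [start, start + SIZE_MS) containing the timestamp. */
    static List<Long> windowStarts(long timestampMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the event, aligned to the slide.
        long lastStart = timestampMs - Math.floorMod(timestampMs, SLIDE_MS);
        for (long start = lastStart; start > timestampMs - SIZE_MS; start -= SLIDE_MS) {
            starts.add(start);
        }
        return starts;
    }

    /** Stage 1: window end -> country -> visit count (one running count per key and window). */
    static Map<Long, Map<String, Long>> countPerCountry(List<String> countries, List<Long> timestamps) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (int i = 0; i < countries.size(); i++) {
            for (long start : windowStarts(timestamps.get(i))) {
                result.computeIfAbsent(start + SIZE_MS, k -> new TreeMap<>())
                      .merge(countries.get(i), 1L, Long::sum);
            }
        }
        return result;
    }

    /** Stage 2: sum the already rolled-up per-country counts that share a window end. */
    static Map<Long, Long> countGlobal(Map<Long, Map<String, Long>> perCountry) {
        Map<Long, Long> totals = new TreeMap<>();
        perCountry.forEach((end, counts) ->
            totals.put(end, counts.values().stream().mapToLong(Long::longValue).sum()));
        return totals;
    }

    public static void main(String[] args) {
        // Four made-up visits: three in the first 5 minutes, one in the next 5 minutes.
        List<String> countries = List.of("US", "US", "ID", "US");
        List<Long> timestamps = List.of(0L, 60_000L, 120_000L, 400_000L);
        Map<Long, Map<String, Long>> perCountry = countPerCountry(countries, timestamps);
        Map<Long, Long> global = countGlobal(perCountry);
        System.out.println(perCountry.get(300_000L) + " total=" + global.get(300_000L));
    }
}
```

    With the four sample visits, the window ending at 300000 ms holds {ID=1, US=2} and a global total of 3. Each event lands in 288 windows, which is why the second stage only ever sees one small record per country per 5 minutes rather than the raw click stream.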

    In answer to your question about the size of state, you'll have roughly 288 x <# of countries> entries. The actual amount of memory for each entry depends on which state backend you're using, but yes, you should be able to use the HashMapStateBackend. But note that you STILL want to set up checkpointing to a distributed file system (S3 or HDFS, typically) to save this state in a durable system separate from the Flink cluster.
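    As a rough sanity check on that state estimate, the arithmetic can be sketched like this. StateSizeEstimate is a hypothetical helper, the figure of 200 countries is an assumption for illustration only, and the byte count covers just the raw 8-byte counters, not the per-entry overhead of whichever state backend you pick.

```java
// Hypothetical back-of-the-envelope helper; not part of Flink.
class StateSizeEstimate {
    /** Number of windows open at once for one key: window size divided by slide. */
    static long activeWindowsPerKey(long sizeMinutes, long slideMinutes) {
        return sizeMinutes / slideMinutes;
    }

    /** Total window-state entries: one running count per (country, active window) pair. */
    static long stateEntries(long countries, long sizeMinutes, long slideMinutes) {
        return countries * activeWindowsPerKey(sizeMinutes, slideMinutes);
    }

    /** Raw payload only (8-byte long per entry); state backend overhead is NOT included. */
    static long rawBytes(long entries) {
        return entries * Long.BYTES;
    }

    public static void main(String[] args) {
        long countries = 200; // assumption for illustration; substitute your real number
        long entries = stateEntries(countries, 24 * 60, 5);
        System.out.println(entries + " entries, " + rawBytes(entries) + " raw bytes");
    }
}
```

    Under the assumed 200 countries this comes to 57,600 entries and 460,800 raw bytes, so even with generous per-entry overhead the state stays comfortably within heap.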

    And finally, to answer your question about where to save the results...that depends on your dashboard system. But given the low data rate for results (roughly <# of countries + 1> x 12 entries/hour), a regular DB using the JDBC sink should be fine.
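    The result rate quoted above can be checked the same way. ResultRateEstimate is a hypothetical helper, and the 200 countries are again an assumption for illustration: one row per country plus one global row, emitted once per 5 minute slide.

```java
// Hypothetical helper for estimating sink write volume; not part of Flink.
class ResultRateEstimate {
    /** Rows written per hour: (countries + 1 global row) x emissions per hour. */
    static long rowsPerHour(long countries, long slideMinutes) {
        long emissionsPerHour = 60 / slideMinutes; // 12 when the slide is 5 minutes
        return (countries + 1) * emissionsPerHour;
    }

    public static void main(String[] args) {
        long countries = 200; // assumption for illustration; substitute your real number
        long perHour = rowsPerHour(countries, 5);
        System.out.println(perHour + " rows/hour, " + (perHour * 24) + " rows/day");
    }
}
```

    For the assumed 200 countries that is 2,412 rows per hour, well under one write per second on average, which is why an ordinary database is a reasonable first choice for the sink.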