
How to aggregate the elements in a Flink session window?


I'm using Flink session windows: when no elements are received for a certain period of time, i.e. when a gap of inactivity occurs, the window should emit an event.

I configured the gap as 10 seconds in the Flink job. I sent event1, then event2 five seconds later. Because 5 seconds is shorter than the 10-second gap, both events should belong to the first window, and the output should be an aggregate of the two. But I am only getting the first event.
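To make the expected grouping concrete, here is a simplified, pure-Kotlin model of event-time session windows. It is an illustration under assumptions, not Flink's implementation, though it roughly mirrors how EventTimeSessionWindows gives each element a window [timestamp, timestamp + gap) and merges windows that overlap. Session and assignSessions are hypothetical names.

```kotlin
// Hypothetical, simplified model of event-time session windows:
// every event opens a window [ts, ts + gap), and windows that overlap or touch are merged.
data class Session(val start: Long, val end: Long, val events: List<String>)

fun assignSessions(events: List<Pair<String, Long>>, gapMillis: Long): List<Session> {
    val sessions = mutableListOf<Session>()
    // Process events in event-time order so merging only needs to look at the last session.
    for ((name, ts) in events.sortedBy { it.second }) {
        val last = sessions.lastOrNull()
        if (last != null && ts <= last.end) {
            // The gap of inactivity was not exceeded: extend the current session.
            sessions[sessions.size - 1] = last.copy(
                end = maxOf(last.end, ts + gapMillis),
                events = last.events + name
            )
        } else {
            // No session yet, or the gap was exceeded: start a new session.
            sessions.add(Session(ts, ts + gapMillis, listOf(name)))
        }
    }
    return sessions
}
```

With the 10-second gap from the question, event1 at 0 ms and event2 at 5,000 ms end up in one session spanning [0, 15000), so both events should reach the window function together.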

Below is the code I tried:

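    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    fun setupJob(env: StreamExecutionEnvironment) {
        // Key by the first tuple field and group each key's events into 10-second-gap sessions
        val testStream = env.sampleStream()
            .keyBy { it.f0 }
            .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
            .process(MyProcessWindowFunction())

        testStream.map { it.toKafkaMessage() }
            .kafkaSink<SampleOutput>()
    }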

MyProcessWindowFunction looks like this:

    import java.util.UUID
    import org.apache.flink.api.common.state.ValueState
    import org.apache.flink.api.common.state.ValueStateDescriptor
    import org.apache.flink.api.java.tuple.Tuple2
    import org.apache.flink.api.java.tuple.Tuple4
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    class MyProcessWindowFunction : ProcessWindowFunction<Tuple4<String, InputA?, InputB?, InputC?>, Tuple2<String, SampleOutput?>,
            String, TimeWindow>() {

        private lateinit var sampleOutputState: ValueState<SampleOutputState>

        override fun open(parameters: Configuration) {
            val sampleOutputStateDescriptor = ValueStateDescriptor("sample-output-state", SampleOutputState::class.java)
            sampleOutputState = runtimeContext.getState(sampleOutputStateDescriptor)
        }

        override fun process(
            key: String,
            context: Context,
            elements: MutableIterable<Tuple4<String, InputA?, InputB?, InputC?>>,
            out: Collector<Tuple2<String, SampleOutput?>>
        ) {
            val current = sampleOutputState.value()

            // Takes the first element assigned to this window
            val value = elements.iterator().next()

            val latestState = when {
                value.f2 != null -> processInputB(value.f2!!, current)
                else -> return
            }
            sampleOutputState.update(latestState)
            out.collect(Tuple2(key, latestState))
        }

        // Updates the existing state with the new InputB, or creates a fresh state if none exists yet
        private fun processInputB(inputB: InputB, currentState: SampleOutputState?): SampleOutputState {
            return currentState?.copy(
                timestamp = System.currentTimeMillis(),
                eventTime = inputB.eventTime,
            ) ?: createInputBState(inputB)
        }

        private fun createInputBState(inputB: InputB): SampleOutputState = SampleOutputState(
            id = UUID.randomUUID().toString(),
            timestamp = System.currentTimeMillis(),
            eventTime = inputB.eventTime,
        )
    }

I'm getting only event1, but I want the aggregate of both events (event1 and event2).

How do I get the aggregate of all the events within a session?


Solution

  • All of the events assigned to the window are in the elements iterable passed to the process method of your ProcessWindowFunction. You are currently looking only at the first element:

    val value = elements.iterator().next()

    You need to iterate through all of the elements to produce an aggregated result.
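The iteration can be sketched in plain Kotlin so that it runs without Flink. InputB, SampleOutputState, the eventCount field and aggregateWindow are hypothetical stand-ins for the question's types; the point is the loop over every element of the window rather than a single elements.iterator().next() call. Keeping the newest event time via maxOf is also an assumption about what "aggregate" should mean here.

```kotlin
import java.util.UUID

// Hypothetical stand-ins for the question's types; the real classes may carry more fields.
data class InputB(val eventTime: Long)

data class SampleOutputState(
    val id: String,
    val timestamp: Long,
    val eventTime: Long,
    val eventCount: Int // hypothetical field, used here to make the aggregation visible
)

// Folds every InputB of one window into a single state, starting from `initial`
// (for example the value previously stored in ValueState, or null).
// Null entries, i.e. elements that carry an InputA or InputC instead, are skipped.
fun aggregateWindow(inputs: Iterable<InputB?>, initial: SampleOutputState?): SampleOutputState? {
    var state = initial
    for (input in inputs) {
        if (input == null) continue
        val current = state
        state = if (current == null) {
            // First InputB seen: create a fresh state, as createInputBState does in the question.
            SampleOutputState(
                id = UUID.randomUUID().toString(),
                timestamp = System.currentTimeMillis(),
                eventTime = input.eventTime,
                eventCount = 1
            )
        } else {
            // Later InputB: update the existing state and keep the newest event time seen.
            current.copy(
                timestamp = System.currentTimeMillis(),
                eventTime = maxOf(current.eventTime, input.eventTime),
                eventCount = current.eventCount + 1
            )
        }
    }
    return state
}
```

Inside process, this would replace the single-element lookup with something like val latestState = aggregateWindow(elements.map { it.f2 }, sampleOutputState.value()) ?: return, keeping the existing sampleOutputState.update and out.collect calls.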