I'm using Flink session windows: when no elements arrive for a certain period of time (that is, when a gap of inactivity occurs), the window should emit an event.
I configured the gap as 10 seconds in the Flink job. I sent event1 and then sent event2 5 seconds later. These two events should belong to the first window, so the output should be an aggregate of the two events. But I am getting only the first event.
Below is the code I tried:
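import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

// sampleStream(), toKafkaMessage() and kafkaSink() are project helpers not shown here
fun setupJob(env: StreamExecutionEnvironment) {
    val testStream = env.sampleStream()
        .keyBy { it.f0 }
        // A session closes after 10 seconds of event-time inactivity for the key
        .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
        .process(MyProcessWindowFunction())
    testStream.map { it.toKafkaMessage() }
        .kafkaSink<SampleOutput>()
}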
MyProcessWindowFunction looks like this:
import java.util.UUID
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.api.java.tuple.Tuple4
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class MyProcessWindowFunction : ProcessWindowFunction<Tuple4<String, InputA?, InputB?, InputC?>, Tuple2<String, SampleOutput?>,
    String, TimeWindow>() {
    // Keyed state holding the latest output for the current key
    private lateinit var sampleOutputState: ValueState<SampleOutputState>

    override fun open(parameters: Configuration) {
        val sampleOutputStateDescriptor = ValueStateDescriptor("sample-output-state", SampleOutputState::class.java)
        sampleOutputState = runtimeContext.getState(sampleOutputStateDescriptor)
    }

    override fun process(key: String, context: Context, elements: MutableIterable<Tuple4<String, InputA?, InputB?, InputC?>>, out: Collector<Tuple2<String, SampleOutput?>>) {
        val current = sampleOutputState.value()
        // Only the first element of the window is read here
        val value = elements.iterator().next()
        val latestState = when {
            value.f2 != null -> processInputB(value.f2!!, current)
            else -> return
        }
        sampleOutputState.update(latestState)
        out.collect(Tuple2(key, latestState))
    }

    // Updates the existing state from inputB, or creates a new state if none exists
    private fun processInputB(inputB: InputB, currentState: SampleOutputState?): SampleOutputState {
        return currentState?.copy(
            timestamp = System.currentTimeMillis(),
            eventTime = inputB.eventTime,
        ) ?: createInputBState(inputB)
    }

    private fun createInputBState(inputB: InputB): SampleOutputState = SampleOutputState(
        id = UUID.randomUUID().toString(),
        timestamp = System.currentTimeMillis(),
        eventTime = inputB.eventTime,
    )
}
I'm getting only event1, but I want the aggregate of both events that I sent (event1 and event2).
How do we get the aggregate of the events that fall within a session?
All of the events assigned to the window will be in the iterable passed to the process method of your ProcessWindowFunction. You are currently only looking at the first element with
val value = elements.iterator().next()
You need to iterate through elements in order to produce an aggregated result.
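As a rough illustration of iterating over the whole window, here is a minimal sketch that runs without Flink. It assumes the aggregate you want is a count plus the earliest and latest event time. InputB here is a simplified stand-in with only an eventTime field, and SessionSummary and aggregateSession are hypothetical names that do not come from your code.

```kotlin
// Hypothetical stand-in for the question's InputB; only the field used here.
data class InputB(val eventTime: Long)

// Hypothetical aggregate; the real SampleOutputState has different fields.
data class SessionSummary(val count: Int, val firstEventTime: Long, val lastEventTime: Long)

// Folds every non-null element of a session window into one summary.
// Returns null when the window holds no usable element.
fun aggregateSession(elements: Iterable<InputB?>): SessionSummary? {
    var summary: SessionSummary? = null
    for (event in elements) {
        if (event == null) continue
        val current = summary
        summary = if (current == null) {
            SessionSummary(1, event.eventTime, event.eventTime)
        } else {
            current.copy(
                count = current.count + 1,
                firstEventTime = minOf(current.firstEventTime, event.eventTime),
                lastEventTime = maxOf(current.lastEventTime, event.eventTime),
            )
        }
    }
    return summary
}

fun main() {
    // Two events 5 seconds apart, with one element that carries no InputB
    val window = listOf(InputB(1_000L), null, InputB(6_000L))
    println(aggregateSession(window))
    // prints SessionSummary(count=2, firstEventTime=1000, lastEventTime=6000)
}
```

Inside process you could presumably call something like this with elements.map { it.f2 } and then call out.collect once with the combined result, instead of emitting a value built from the first element only.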