I have a Flink application running on the AWS Kinesis Analytics service. I need to filter values on a data stream based on a threshold, which I pass in through the AWS Systems Manager Parameter Store service. So far I have this:
val threshold: Int = ssmParameter.getParameterRequest(ssmClient, "/kinesis/threshold").toInt
val kinesis_deserialization_schema = new KinesisDeserialization[ID]
val KinesisConsumer = new FlinkKinesisConsumer[ID](
  "Data-Stream",
  kinesis_deserialization_schema,
  consumerProps
)
val KinesisSource = env.addSource(KinesisConsumer).name(s"Kinesis Data")
val valid_data = KinesisSource
  .filter(new MyFilter[ID](threshold))
  .name("FilterData")
  .uid("FilterData")
import cl.mydata.InputData
import org.apache.flink.api.common.functions.FilterFunction

// Keeps only the records whose value is above the threshold fixed at construction time.
class MyFilter[ID <: InputData](
  threshold: Int
) extends FilterFunction[ID] {
  override def filter(value: ID): Boolean = {
    value.myvalue > threshold
  }
}
This works fine. The problem is that I need to refresh the threshold parameter every hour, because my client can change that value.
Perhaps you can implement the ProcessingTimeCallback interface in the MyFilter class, which supports timer operations, and update the threshold in the onProcessingTime function:
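// Sketch only: getProcessingTimeService() is assumed to be reachable from this class.
// In Flink it is provided by operator classes such as AbstractStreamOperator,
// not by the filter function itself.
public class MyFilter extends RichFilterFunction<...> implements ProcessingTimeCallback {
    private static final long ONE_HOUR_MS = 60 * 60 * 1000L;

    int threshold;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Load the initial value, then schedule the first refresh one hour from now.
        threshold = XXXX;
        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + ONE_HOUR_MS, this);
    }

    @Override
    public boolean filter(IN xxx) throws Exception {
        return xxx > threshold;
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        // Re-read the parameter, then re-register the timer so the refresh repeats hourly.
        threshold = XXXX;
        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + ONE_HOUR_MS, this);
    }
}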
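If getProcessingTimeService() is not reachable from your function, another option is a plain background scheduler that re-reads the value once an hour. Below is a minimal, Flink-independent sketch of that idea, not a definitive implementation. RefreshingThreshold and its loader argument are hypothetical names, and loader stands in for whatever call fetches the parameter (for example your ssmParameter.getParameterRequest lookup).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

/** Hypothetical helper: holds a threshold that is re-read from a loader at a fixed interval. */
final class RefreshingThreshold implements AutoCloseable {
    // volatile: written by the refresh thread, read by the thread that calls accepts()
    private volatile int threshold;
    private final IntSupplier loader;
    private final ScheduledExecutorService scheduler;

    RefreshingThreshold(IntSupplier loader, long period, TimeUnit unit) {
        this.loader = loader;
        // Initial load happens synchronously, so a broken source fails at start-up.
        this.threshold = loader.getAsInt();
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "threshold-refresh");
            t.setDaemon(true); // do not keep the JVM alive just for refreshes
            return t;
        });
        scheduler.scheduleAtFixedRate(this::refresh, period, period, unit);
    }

    /** Re-reads the threshold; on failure the previous value is kept. */
    void refresh() {
        try {
            threshold = loader.getAsInt();
        } catch (RuntimeException e) {
            // Swallowed on purpose: an exception escaping a scheduleAtFixedRate task
            // would cancel all later runs.
        }
    }

    /** Same predicate as the filter in the question: strictly greater than the threshold. */
    boolean accepts(int value) {
        return value > threshold;
    }

    int current() {
        return threshold;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

In a Flink job this would presumably be created in the open() method of a RichFilterFunction and shut down in close(), so that each parallel instance owns its own refresher and nothing non-serializable is shipped with the function. filter() would then just return accepts(value.myvalue). Note that each parallel instance refreshes independently, so they may pick up a new value at slightly different times.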