I am writing a PyFlink program that performs anomaly detection on the number of logs coming into a Kafka topic, using an exponentially weighted moving average. The Kafka broker hosting the topic runs locally on my machine.
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
from pyflink.datastream.window import TumblingProcessingTimeWindows
from pyflink.datastream.functions import AggregateFunction, MapFunction
from pyflink.common.time import Time
from datetime import datetime, timedelta
class SimpleCountAggregateFunction(AggregateFunction):
    def create_accumulator(self):
        return (0, None)  # Include a placeholder for the timestamp

    def add(self, value, accumulator):
        # value is a tuple of (count, timestamp)
        count, timestamp = value
        if count == 0:
            timestamp = datetime.now()  # Update the timestamp for a zero-count window
        return (accumulator[0] + 1, timestamp)

    def get_result(self, accumulator):
        return accumulator

    def merge(self, a, b):
        return (a[0] + b[0], a[1])
class EMACalculatorWithPrediction(MapFunction):
    def __init__(self, period, start_time):
        self.period = period
        self.smoothing_factor = 2 / (1 + period)
        self.ema = None
        # Track the start time of the current window
        self.window_start_time = start_time

    def map(self, value):
        count, _ = value  # Unpack the count, ignore the incoming timestamp
        original_value = float(count)
        if self.ema is None:
            self.ema = original_value
        else:
            self.ema = (original_value * self.smoothing_factor) + (self.ema * (1 - self.smoothing_factor))
        current_time = self.window_start_time.strftime("%Y-%m-%d %H:%M:%S")
        # Advance start_time for the next window
        self.window_start_time += timedelta(seconds=60)
        # Flag an anomaly when the count exceeds the EMA by more than 30%
        if original_value > 0 and ((original_value - self.ema) / original_value) > 0.3:
            return (current_time, original_value, self.ema, "Anomaly Detected")
        else:
            return (current_time, original_value, self.ema, "No Anomaly")
def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    kafka_source = FlinkKafkaConsumer(
        topics="input-events",
        deserialization_schema=JsonRowDeserializationSchema.builder().type_info(
            type_info=Types.ROW_NAMED(["timestamp", "data"], [Types.INSTANT(), Types.STRING()])).build(),
        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'count-group'}
    )
    data_stream = env.add_source(kafka_source)

    # Convert each record to a tuple of (count, timestamp) before windowing
    mapped_stream = data_stream.map(lambda x: (1, x[0]), output_type=Types.TUPLE([Types.LONG(), Types.INSTANT()]))

    counted_stream = mapped_stream \
        .window_all(TumblingProcessingTimeWindows.of(Time.seconds(60))) \
        .aggregate(SimpleCountAggregateFunction(), output_type=Types.TUPLE([Types.LONG(), Types.INSTANT()])) \
        .map(lambda count_timestamp: (str(count_timestamp[0]), count_timestamp[1]),
             output_type=Types.TUPLE([Types.STRING(), Types.INSTANT()]))

    # Initialize EMACalculatorWithPrediction with the current system time
    # as the start time of the first window
    initial_window_start_time = datetime.now()
    ema_and_value_stream = counted_stream.map(
        EMACalculatorWithPrediction(period=10, start_time=initial_window_start_time),
        output_type=Types.TUPLE([Types.STRING(), Types.FLOAT(), Types.FLOAT(), Types.STRING()]))

    ema_and_value_stream.map(
        lambda x: f"Timestamp: {x[0]}, Original: {x[1]}, EWMA: {x[2]}, Status: {x[3]}",
        output_type=Types.STRING()).print()

    env.execute("Kafka JSON Count, EWMA Calculation, and Anomaly Detection")


if __name__ == "__main__":
    main()
The code counts the number of logs coming into a Kafka topic using a tumbling window. The EWMA (exponentially weighted moving average) and anomaly detection parts of the code work fine, but the issue is with the counting: the tumbling-window count works while logs are flowing into the topic, but nothing is emitted when the number of logs is zero. I want to modify the code so that it also produces a count when a window is empty.
The things that I have tried to solve this issue are:
• Writing an if/else condition comparing the count to 0
• Changing the way the map function is calculated
Any other suggestions to help me fix the logic would be greatly appreciated; I have been stuck on this for the past 6 hours.
When you use the DataStream API, windows don't exist until the first record is assigned to them, so an empty window can never produce results. The same is true of the Table/SQL API.
My suggestion would be to rewrite this using a KeyedProcessFunction rather than a window. I'm suggesting a KeyedProcessFunction rather than a ProcessFunction because you'll need to keep some state (the counter) and you'll need a timer, and timers are only available on keyed streams. Since it doesn't make sense to key-partition this stream, you can use a key selector that returns a constant, i.e., assign the same key to every event, as in the snippet below.
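For example, reusing the mapped_stream of (count, timestamp) tuples from your job (the constant key 0 is an arbitrary choice):

# Every record gets the same key, so a single parallel instance
# holds the shared counter state and the timer.
keyed_stream = mapped_stream.key_by(lambda record: 0, key_type=Types.INT())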
The keyed process function should be pretty straightforward:
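Here is a minimal sketch of what it could look like. The class name CountEveryWindow, the 60-second interval, and the (count, timestamp) output shape are my own illustrative choices; it uses processing-time timers to match your processing-time windows:

from pyflink.common import Types
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class CountEveryWindow(KeyedProcessFunction):
    """Counts events per fixed processing-time interval, emitting 0 for empty intervals."""

    def __init__(self, interval_ms):
        self.interval_ms = interval_ms
        self.count_state = None

    def open(self, runtime_context: RuntimeContext):
        # Keyed state holding the running count for the current interval
        self.count_state = runtime_context.get_state(
            ValueStateDescriptor("count", Types.LONG()))

    def process_element(self, value, ctx):
        count = self.count_state.value()
        if count is None:
            # First event for this key: align the first timer to the
            # next interval boundary and start counting from zero.
            count = 0
            now = ctx.timer_service().current_processing_time()
            first_fire = now - (now % self.interval_ms) + self.interval_ms
            ctx.timer_service().register_processing_time_timer(first_fire)
        self.count_state.update(count + 1)

    def on_timer(self, timestamp, ctx):
        # Emit the count for the interval that just ended (possibly 0),
        # reset it, and re-register the timer so every subsequent
        # interval, including empty ones, produces a result.
        count = self.count_state.value() or 0
        self.count_state.update(0)
        ctx.timer_service().register_processing_time_timer(timestamp + self.interval_ms)
        yield (count, timestamp)

You would then replace the window_all/aggregate pipeline with something like:

counted_stream = keyed_stream.process(
    CountEveryWindow(60 * 1000),
    output_type=Types.TUPLE([Types.LONG(), Types.LONG()]))

One caveat: timers can only be registered from process_element or on_timer, so nothing fires until the very first event arrives for the key. After that, the timer keeps re-registering itself and you'll get a zero count for every empty interval.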