apache-spark pyspark spark-streaming

Is there a limit on the number of Spark time windows?


I have the following PySpark Structured Streaming code:

    parsed_df = df.selectExpr("CAST(value AS STRING) as message", "timestamp") \
        .select(
            from_json(col("message"), json_schema).alias("data"),
            col("timestamp")
        ) \
        .select("data.*", "timestamp")

    complex_windowed_df = parsed_df \
        .filter(col("PROVINCE").isNotNull()) \
        .filter(col("AMOUNT").cast("long") > 0) \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window("timestamp", "5 minutes")
        ) \
        .agg(
            count("*").alias("event_count"),
            collect_set("CITY_NAME").alias("cities")
        )

    one_minute_df = parsed_df \
        .filter(col("PROVINCE").isNotNull()) \
        .filter(col("AMOUNT").cast("long") > 0) \
        .withWatermark("timestamp", "5 minutes") \
        .withColumn("processing_time",
                    # Normal distribution simulation using multiple random values
                    ((rand() + rand() + rand() + rand()) / 4) * 200 + 50) \
        .groupBy(
            window("timestamp", "1 minute"),
            col("PROVINCE"),
            col("TERMINAL_TYPE"),
            col("TRANSACTION_TYPE")
        ) \
        .agg(
            count("*").alias("event_count_1min"),
            avg(col("processing_time").cast("double")).alias("avg_processing_time"),
            spark_min(col("processing_time").cast("double")).alias("min_processing_time"),
            spark_max(col("processing_time").cast("double")).alias("max_processing_time"),
            avg(col("AMOUNT").cast("double")).alias("avg_amount_1min")
        )

    one_second_df = parsed_df \
        .filter(col("PROVINCE").isNotNull()) \
        .filter(col("AMOUNT").cast("long") > 0) \
        .withWatermark("timestamp", "30 seconds") \
        .groupBy(
            window("timestamp", "1 second"),
            col("PROVINCE"),
            col("TERMINAL_TYPE"),
            col("TRANSACTION_TYPE")
        ) \
        .agg(
            count("*").alias("event_count"),
            avg(col("AMOUNT").cast("double")).alias("avg_amount_1second"),
            count("RULE_ID").alias("rule_id_count")
        )

    query = complex_windowed_df.writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", "false") \
        .option("numRows", 100) \
        .trigger(processingTime="30 seconds") \
        .start()

    query_1min = one_minute_df.writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", "false") \
        .trigger(processingTime="5 seconds") \
        .start()

    query_1sec = one_second_df.writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", "false") \
        .trigger(processingTime="5 seconds") \
        .start()

    query.awaitTermination()
    query_1min.awaitTermination()
    query_1sec.awaitTermination()

The code computes several aggregated features. Since I need to aggregate them over different time windows, I define three windows. The problem is that the last query, the one for the 1-second window, never produces any output. If I instead move the 1-minute window code to the end, then it is the 1-minute window that produces no output, simply because it is last.

Does Spark have a limitation on defining more than two time windows? If not, why does the last time window produce no output in this code?
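
For reference, a quick way to see whether each query is actually running and making progress is to inspect the streaming query status APIs. The following is only a minimal diagnostic sketch (it assumes the three queries above have already been started):

    # Sketch: check each started query before blocking on awaitTermination().
    # isActive, status and lastProgress are standard StreamingQuery attributes.
    for q in [query, query_1min, query_1sec]:
        print(q.isActive)       # False means the query has stopped or failed
        print(q.status)         # current trigger / data-availability status
        print(q.lastProgress)   # None until the first micro-batch has completed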


Solution

  • Problem solved. The last time window prints no output because of resource limitations, not because Spark limits the number of time windows. I use the code below, which runs each query for a while and then stops it, freeing resources for the other queries. I have checked it, and all of the time windows produce output without a problem:

    import threading
    import time
    import uuid

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import (
        col, count, window, avg, sum as spark_sum, current_timestamp,
        countDistinct, collect_set, approx_count_distinct, rand, from_json,
        min as spark_min, max as spark_max,
    )
    from pyspark.sql.types import (
        StructType, StructField, StringType, TimestampType, LongType, DoubleType,
    )
    
    
    
    spark = SparkSession.builder \
        .appName("KafkaWindowedStream") \
        .master("local[2]") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
        .config("spark.driver.memory", "8g") \
        .config("spark.executor.memory", "8g") \
        .getOrCreate()
    
    spark.sparkContext.setLogLevel("WARN")
    
    unique_group = f"spark-consumer-{uuid.uuid4().hex[:8]}"
    print(f"Using consumer group: {unique_group}")
    
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:19092") \
        .option("subscribe", "chavosh") \
        .option("startingOffsets", "earliest") \
        .option("kafka.group.id", unique_group) \
        .option("failOnDataLoss", "false") \
        .option("maxOffsetsPerTrigger", "1000") \
        .load()
    
    # Define the schema for your JSON message
    json_schema = StructType([
        StructField("S_C", StringType(), True),
        StructField("TERM_TYPE", StringType(), True),
        StructField("O_ID", StringType(), True),
        StructField("TERM_ID", StringType(), True),
        StructField("TRAN_ID", StringType(), True),
        StructField("OP_BRAN_ID", StringType(), True),
        StructField("C_NUMBER", StringType(), True),
        StructField("DEV_ID", StringType(), True),
        StructField("ISF", StringType(), True),
        StructField("C_TYPE_ID", StringType(), True),
        StructField("AM", StringType(), True),
        StructField("PROVINCE_ID", StringType(), True),
        StructField("ID", StringType(), True),
        StructField("AC_ID", StringType(), True),
        StructField("TRN_DATE", StringType(), True),
        StructField("SRCCTYPE_ID", StringType(), True),
        StructField("GUID", StringType(), True),
        StructField("TRN_DATE_MILADY", StringType(), True),
        StructField("IS_TERM_R", StringType(), True),
        StructField("TRN_TYPE", StringType(), True),
        StructField("ORIG_RETURNED", StringType(), True),
        StructField("TRNFLAG", StringType(), True),
        StructField("NATURE", StringType(), True),
        StructField("RNUM", StringType(), True),
        StructField("CITY_ID", StringType(), True),
        StructField("C_TRANSFER", StringType(), True),
        StructField("CITY_NAME", StringType(), True),
        StructField("OP_PARENT_ID", StringType(), True),
        StructField("PROVINCE", StringType(), True),
        StructField("TRN_TIMESTAMP", StringType(), True),
        StructField("RULE_ID", StringType(), True),
        StructField("PROCESSING_TIME", StringType(), True)
    ])
    
    parsed_df = df.selectExpr("CAST(value AS STRING) as message", "timestamp") \
        .select(
            from_json(col("message"), json_schema).alias("data"),
            col("timestamp")
        ) \
        .select("data.*", "timestamp")
    
    complex_windowed_df = parsed_df \
        .filter(col("PROVINCE").isNotNull()) \
        .filter(col("AM").cast("long") > 0) \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window("timestamp", "5 minutes")
        ) \
        .agg(
            count("*").alias("event_count"),
            collect_set("CITY_NAME").alias("cities")
        )
    
    one_minute_df = parsed_df \
        .filter(col("PROVINCE").isNotNull()) \
        .filter(col("AM").cast("long") > 0) \
        .withWatermark("timestamp", "5 minutes") \
        .withColumn("processing_time",
                    # Normal distribution simulation using multiple random values
                    ((rand() + rand() + rand() + rand()) / 4) * 200 + 50)\
        .groupBy(
            window("timestamp", "1 minute"),
            col("PROVINCE"),
            col("TERM_TYPE"),
            col("TRN_TYPE")
        ) \
        .agg(
            count("*").alias("event_count_1min"),
            avg(col("processing_time").cast("double")).alias("avg_processing_time"),
            spark_min(col("processing_time").cast("double")).alias("min_processing_time"),
            spark_max(col("processing_time").cast("double")).alias("max_processing_time"),
            avg(col("AM").cast("double")).alias("avg_am_1min")
        )
    
    one_second_df = parsed_df \
        .filter(col("PROVINCE").isNotNull()) \
        .filter(col("AM").cast("long") > 0) \
        .withWatermark("timestamp", "30 seconds") \
        .groupBy(
            window("timestamp", "1 second"),
            col("PROVINCE"),
            col("TERM_TYPE"),
            col("TRN_TYPE")
        ) \
        .agg(
            count("*").alias("event_count"),
            avg(col("AM").cast("double")).alias("avg_am_1second"),
            count('RULE_ID').alias('rule_id_count')
        )
    
    def run_windowed_queries():
        while True:
            # Run 5-minute for 30 seconds
            print("Starting 5-minute window...")
            query = complex_windowed_df.writeStream \
                .outputMode("update") \
                .format("console") \
                .option("numRows", 5) \
                .start()
    
            time.sleep(30)
            query.stop()
    
            # Run 1-minute for 30 seconds
            print("Starting 1-minute window...")
            query_1min = one_minute_df.writeStream \
                .outputMode("update") \
                .format("console") \
                .option("numRows", 5) \
                .start()
    
            time.sleep(30)
            query_1min.stop()
    
            # Run 1-second for 30 seconds
            print("Starting 1-second window...")
            query_1sec = one_second_df.writeStream \
                .outputMode("update") \
                .format("console") \
                .option("numRows", 5) \
                .start()
    
            time.sleep(30)
            query_1sec.stop()
           
    
    
    # Start the rotation in background thread
    query_thread = threading.Thread(target=run_windowed_queries)
    query_thread.start()
    
    query_thread.join()
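
For context on the design choice: `master("local[2]")` gives Spark only two local cores, and three always-on console queries compete for them, which matches the resource limitation described above; running one query at a time sidesteps that contention. If more cores are available, an alternative (a hedged sketch, not part of the fix above) is to start all three queries concurrently and block on any of them via the streaming query manager:

    # Alternative sketch: assumes the SparkSession was created with enough cores,
    # e.g. .master("local[*]"), so the three queries do not starve each other.
    q_5min = complex_windowed_df.writeStream \
        .outputMode("update").format("console").start()
    q_1min = one_minute_df.writeStream \
        .outputMode("update").format("console").start()
    q_1sec = one_second_df.writeStream \
        .outputMode("update").format("console").start()

    # Block until any of the queries terminates, instead of awaiting them one by one.
    spark.streams.awaitAnyTermination()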