apache-spark, pyspark, spark-streaming

Is there a limit on the number of Spark time windows?


I have the following PySpark Structured Streaming code:

    parsed_df = df.selectExpr("CAST(value AS STRING) as message", "timestamp") \
        .select(
            from_json(col("message"), json_schema).alias("data"),
            col("timestamp")
        ) \
        .select("data.*", "timestamp")

    complex_windowed_df = parsed_df \
        .filter(col("PROVINCE").isNotNull()) \
        .filter(col("AMOUNT").cast("long") > 0) \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window("timestamp", "5 minutes")
        ) \
        .agg(
            count("*").alias("event_count"),
            collect_set("CITY_NAME").alias("cities")
        )

    one_minute_df = parsed_df \
        .filter(col("PROVINCE").isNotNull()) \
        .filter(col("AMOUNT").cast("long") > 0) \
        .withWatermark("timestamp", "5 minutes") \
        .withColumn("processing_time",
                    # Normal distribution simulation using multiple random values
                    ((rand() + rand() + rand() + rand()) / 4) * 200 + 50) \
        .groupBy(
            window("timestamp", "1 minute"),
            col("PROVINCE"),
            col("TERMINAL_TYPE"),
            col("TRANSACTION_TYPE")
        ) \
        .agg(
            count("*").alias("event_count_1min"),
            avg(col("processing_time").cast("double")).alias("avg_processing_time"),
            spark_min(col("processing_time").cast("double")).alias("min_processing_time"),
            spark_max(col("processing_time").cast("double")).alias("max_processing_time"),
            avg(col("AMOUNT").cast("double")).alias("avg_amount_1min")
        )

    one_second_df = parsed_df \
        .filter(col("PROVINCE").isNotNull()) \
        .filter(col("AMOUNT").cast("long") > 0) \
        .withWatermark("timestamp", "30 seconds") \
        .groupBy(
            window("timestamp", "1 second"),
            col("PROVINCE"),
            col("TERMINAL_TYPE"),
            col("TRANSACTION_TYPE")
        ) \
        .agg(
            count("*").alias("event_count"),
            avg(col("AMOUNT").cast("double")).alias("avg_amount_1second"),
            count('RULE_ID').alias('rule_id_count')
        )

    query = complex_windowed_df.writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", "false") \
        .option("numRows", 100) \
        .trigger(processingTime="30 seconds") \
        .start()

    query_1min = one_minute_df.writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", "false") \
        .trigger(processingTime="5 seconds") \
        .start()

    query_1sec = one_second_df.writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", "false") \
        .trigger(processingTime="5 seconds") \
        .start()

    query.awaitTermination()
    query_1min.awaitTermination()
    query_1sec.awaitTermination()

The code computes some features. Since I need to aggregate features over different time windows, I define three of them. The problem is that the last query, the one with the 1-second window, never produces any output. If I reorder the code so that the 1-minute window comes last, then it is the 1-minute window that produces no output. Could anyone tell me whether Spark has a limitation on defining more than two time windows? If not, why does the last time window in the code produce no output?
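
One way to see whether the last query is actually triggering at all (a minimal diagnostic sketch that only uses the standard StreamingQuery status APIs, nothing specific to this job) is:

    # Inspect every active streaming query on the current SparkSession.
    for q in spark.streams.active:
        print(q.id, q.status)        # shows whether a trigger is running / waiting for data
        print(q.lastProgress)        # metrics of the last micro-batch, e.g. numInputRows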

Any help is really appreciated.


Solution

  • Problem solved. The last time window was not printing any output because of resource limitations, so Spark does not have any limitation on the number of time windows. I now use the code below, which stops each query after it has run for a while, freeing resources for the other active queries. I have checked it, and it prints the output of all time windows without any problem:

    import threading
    import time
    import uuid

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import (
        col, count, window, avg, sum as spark_sum, current_timestamp, countDistinct,
        collect_set, approx_count_distinct, rand, from_json,
        min as spark_min, max as spark_max
    )
    from pyspark.sql.types import (
        StructType, StructField, StringType, TimestampType, LongType, DoubleType
    )
    
    
    
    spark = SparkSession.builder \
        .appName("KafkaWindowedStream") \
        .master("local[2]") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
        .config("spark.driver.memory", "8g") \
        .config("spark.executor.memory", "8g") \
        .getOrCreate()
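    # Note: master("local[2]") gives the job only two local cores, which all
    # concurrently running streaming queries have to share.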
    
    spark.sparkContext.setLogLevel("WARN")
    
    unique_group = f"spark-consumer-{uuid.uuid4().hex[:8]}"
    print(f"Using consumer group: {unique_group}")
    
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:19092") \
        .option("subscribe", "chavosh") \
        .option("startingOffsets", "earliest") \
        .option("kafka.group.id", unique_group) \
        .option("failOnDataLoss", "false") \
        .option("maxOffsetsPerTrigger", "1000") \
        .load()
    
    # Define the schema for your JSON message
    json_schema = StructType([
        StructField("S_C", StringType(), True),
        StructField("TERM_TYPE", StringType(), True),
        StructField("O_ID", StringType(), True),
        StructField("TERM_ID", StringType(), True),
        StructField("TRAN_ID", StringType(), True),
        StructField("OP_BRAN_ID", StringType(), True),
        StructField("C_NUMBER", StringType(), True),
        StructField("DEV_ID", StringType(), True),
        StructField("ISF", StringType(), True),
        StructField("C_TYPE_ID", StringType(), True),
        StructField("AM", StringType(), True),
        StructField("PROVINCE_ID", StringType(), True),
        StructField("ID", StringType(), True),
        StructField("AC_ID", StringType(), True),
        StructField("TRN_DATE", StringType(), True),
        StructField("SRCCTYPE_ID", StringType(), True),
        StructField("GUID", StringType(), True),
        StructField("TRN_DATE_MILADY", StringType(), True),
        StructField("IS_TERM_R", StringType(), True),
        StructField("TRN_TYPE", StringType(), True),
        StructField("ORIG_RETURNED", StringType(), True),
        StructField("TRNFLAG", StringType(), True),
        StructField("NATURE", StringType(), True),
        StructField("RNUM", StringType(), True),
        StructField("CITY_ID", StringType(), True),
        StructField("C_TRANSFER", StringType(), True),
        StructField("CITY_NAME", StringType(), True),
        StructField("OP_PARENT_ID", StringType(), True),
        StructField("PROVINCE", StringType(), True),
        StructField("TRN_TIMESTAMP", StringType(), True),
        StructField("RULE_ID", StringType(), True),
        StructField("PROCESSING_TIME", StringType(), True)
    ])
    
    parsed_df = df.selectExpr("CAST(value AS STRING) as message", "timestamp") \
        .select(
            from_json(col("message"), json_schema).alias("data"),
            col("timestamp")
        ) \
        .select("data.*", "timestamp")
    
    complex_windowed_df = parsed_df \
        .filter(col("PROVINCE").isNotNull()) \
        .filter(col("AM").cast("long") > 0) \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window("timestamp", "5 minutes")
        ) \
        .agg(
            count("*").alias("event_count"),
            collect_set("CITY_NAME").alias("cities")
        )
    
    one_minute_df = parsed_df \
        .filter(col("PROVINCE").isNotNull()) \
        .filter(col("AM").cast("long") > 0) \
        .withWatermark("timestamp", "5 minutes") \
        .withColumn("processing_time",
                    # Normal distribution simulation using multiple random values
                    ((rand() + rand() + rand() + rand()) / 4) * 200 + 50)\
        .groupBy(
            window("timestamp", "1 minute"),
            col("PROVINCE"),
            col("TERM_TYPE"),
            col("TRN_TYPE")
        ) \
        .agg(
            count("*").alias("event_count_1min"),
            avg(col("processing_time").cast("double")).alias("avg_processing_time"),
            spark_min(col("processing_time").cast("double")).alias("min_processing_time"),
            spark_max(col("processing_time").cast("double")).alias("max_processing_time"),
            avg(col("AM").cast("double")).alias("avg_am_1min")
        )
    
    one_second_df = parsed_df \
        .filter(col("PROVINCE").isNotNull()) \
        .filter(col("AM").cast("long") > 0) \
        .withWatermark("timestamp", "30 seconds") \
        .groupBy(
            window("timestamp", "1 second"),
            col("PROVINCE"),
            col("TERM_TYPE"),
            col("TRN_TYPE")
        ) \
        .agg(
            count("*").alias("event_count"),
            avg(col("AM").cast("double")).alias("avg_am_1second"),
            count('RULE_ID').alias('rule_id_count')
        )
    
    def run_windowed_queries():
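        # Run each console sink for 30 seconds and then stop it, so that only one
        # streaming query holds the limited local resources at any given time.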
        while True:
            # Run 5-minute for 30 seconds
            print("Starting 5-minute window...")
            query = complex_windowed_df.writeStream \
                .outputMode("update") \
                .format("console") \
                .option("numRows", 5) \
                .start()
    
            time.sleep(30)
            query.stop()
    
            # Run 1-minute for 30 seconds
            print("Starting 1-minute window...")
            query_1min = one_minute_df.writeStream \
                .outputMode("update") \
                .format("console") \
                .option("numRows", 5) \
                .start()
    
            time.sleep(30)
            query_1min.stop()
    
            # Run 1-second for 30 seconds
            print("Starting 1-second window...")
            query_1sec = one_second_df.writeStream \
                .outputMode("update") \
                .format("console") \
                .option("numRows", 5) \
                .start()
    
            time.sleep(30)
            query_1sec.stop()
           
    
    
    # Start the rotation in background thread
    query_thread = threading.Thread(target=run_windowed_queries)
    query_thread.start()
    
    query_thread.join()
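
If the machine has enough free cores (the session above only uses local[2]), the three queries should also be able to run concurrently; in that case, blocking on all of them with a single spark.streams.awaitAnyTermination() call is the usual pattern instead of chaining awaitTermination() calls. A minimal sketch, assuming the same three DataFrames as above and sufficient resources (not verified on this setup):

    # Start all three sinks at once and block until any of them terminates.
    q_5min = complex_windowed_df.writeStream.outputMode("update").format("console").start()
    q_1min = one_minute_df.writeStream.outputMode("update").format("console").start()
    q_1sec = one_second_df.writeStream.outputMode("update").format("console").start()

    spark.streams.awaitAnyTermination()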