
Want to create a continually running Spark streaming query that reads from a MemoryStream[String] and outputs to the console


The question I am asking will, once answered, help me write tests for a Spark Structured Streaming pipeline that reads from a streaming source and writes to S3/Parquet. I've simplified things so that the objective is simply to write to the console as new records are added to the MemoryStream. I almost have this working. The only issue with the code below is that, although the first items I add to the stream are output to the console, the items I add after a sleep (deliberately longer than the query's trigger interval) never appear on the console.

The output is basically this:

    Batch: 0
    -------------------------------------------
    +-----+
    |value|
    +-----+
    |Alice|
    |  Bob|
    +-----+

I would like to see a second batch with the value 'Mouse', but it never appears when I run the code below. Thanks in advance for any guidance!

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.execution.streaming.MemoryStream
    import org.apache.spark.sql.streaming.{OutputMode, Trigger}

    object StreamTableDemo {
      def main(args: Array[String]): Unit = {
        implicit val spark = SparkSession
          .builder
          .appName("StreamTableDemo")
          .master("local[*]")
          .getOrCreate()

        implicit val stringEncoder = ExpressionEncoder[String]

        // Create a MemoryStream of Strings and a DataFrame on top of it
        val stream = new MemoryStream[String](2, spark.sqlContext, Some(1))
        val streamDF = stream.toDF()

        // Create a temporary view from the stream DataFrame
        streamDF.createOrReplaceTempView("T")

        // Define a streaming query to output the records in T to the console
        val query = spark.table("T")
          .writeStream
          .outputMode(OutputMode.Append())
          .trigger(Trigger.ProcessingTime("1 seconds"))
          .format("console")
          .start()

        // Add data to the stream
        stream.addData("Alice")
        stream.addData("Bob")

        // Wait for the query to terminate
        query.awaitTermination()
        Thread.sleep(2 * 1000)
        stream.addData("Mouse")
      }
    }

Solution

  • Make query.awaitTermination() the last statement, i.e.:

    Thread.sleep(2 * 1000)
    stream.addData("Mouse")

    // Wait for the query to terminate
    query.awaitTermination()


    From the description of MemoryStream:

    "A Source that produces value stored in memory as they are added by the user. This Source is intended for use in unit tests as it can only replay data when the object is still available."

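    In other words, data can only be added while the driver code can still reach the stream object, and awaitTermination() prevents that: it blocks the calling thread until the query is stopped or fails. Because this query runs indefinitely, the Thread.sleep and stream.addData("Mouse") statements placed after it in the original code are never executed.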
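Since the end goal is testing, here is a minimal sketch of a test-friendlier variant, assuming Spark 3.x (where MemoryStream has the same (id, SQLContext, numPartitions) constructor used in the question; note that MemoryStream lives in an internal package and may change between versions). Instead of Thread.sleep, it calls StreamingQuery.processAllAvailable(), which blocks until all data available in the source has been committed to the sink, and it writes to the "memory" sink so the results can be read back as a table and asserted on. The object name StreamTableTestSketch, the helper collectAll, and the query name "out" are hypothetical, chosen for illustration only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical test helper; all names here are illustrative only.
object StreamTableTestSketch {

  // Feeds two rounds of data through a MemoryStream and returns everything
  // that reached the in-memory sink (row order is not guaranteed).
  def collectAll(spark: SparkSession): Seq[String] = {
    import spark.implicits._ // supplies the implicit Encoder[String]

    // Same constructor as in the question (Spark 3.x): id, SQLContext, numPartitions
    val stream = new MemoryStream[String](1, spark.sqlContext, Some(1))

    val query = stream.toDF()
      .writeStream
      .outputMode(OutputMode.Append())
      .format("memory")   // keep results in a temp table instead of printing them
      .queryName("out")   // name of that temp table
      .start()

    try {
      stream.addData("Alice", "Bob")
      query.processAllAvailable() // block until this data is committed to the sink

      stream.addData("Mouse")
      query.processAllAvailable() // the second batch is processed before we read

      spark.table("out").as[String].collect().toSeq
    } finally {
      query.stop() // otherwise the query would keep running
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("StreamTableTestSketch")
      .master("local[*]")
      .getOrCreate()
    try println(collectAll(spark).sorted.mkString(", "))
    finally spark.stop()
  }
}
```

The design choice here is to replace a timing-dependent sleep with an explicit "wait until processed" call, so the test does not depend on the trigger interval. In a real test suite, the assertion on the returned values would take the place of the println.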