apache-sparkspark-structured-streamingapache-spark-2.0

How to pivot streaming dataset?


I am trying to pivot a Spark streaming dataset (structured streaming) but I get an AnalysisException (excerpt below).

Could someone confirm pivoting is indeed not supported in structured streams (Spark 2.0), perhaps suggest alternative approaches?

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; kafka at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)


Solution

  • tl;dr pivot aggregation is not directly supported by Spark Structured Streaming up to and including 2.4.4.

    As a workaround, use DataStreamWriter.foreachBatch or more generic DataStreamWriter.foreach.


    I use the latest version of Spark 2.4.4 as of now.

    scala> spark.version
    res0: String = 2.4.4
    

    UnsupportedOperationChecker (that you can find in the stack trace) checks whether (the logical plan of) a streaming query uses supported operations only.

    When you execute pivot you had to groupBy first as that's the only interface to give you pivot available.

    There are two issues with pivot:

    1. pivot wants to know how many columns to generate values for and hence does collect which is not possible with streaming Datasets.

    2. pivot is actually another aggregation (beside groupBy) that Spark Structured Streaming does not support

    Let's look at the issue 1 with no columns to pivot on defined.

    val sq = spark
      .readStream
      .format("rate")
      .load
      .groupBy("value")
      .pivot("timestamp") // <-- pivot with no values
      .count
      .writeStream
      .format("console")
    scala> sq.start
    org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
    rate
      at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389)
      at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1(UnsupportedOperationChecker.scala:38)
      at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1$adapted(UnsupportedOperationChecker.scala:36)
      at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
      at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
      at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
      at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
      at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
      at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
      at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
      at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
      at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
      at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
      at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
      at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
      at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
      at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:384)
      ... 49 elided
    

    The last two lines show the issue, i.e. pivot does collect under the covers and hence the issue.

    The other issue is that even though you'd specify the values for columns to pivot on you'd then get the other issue due to multiple aggregations (and you can see that it's actually a check for streaming not batch as has happened with the first case).

    val sq = spark
      .readStream
      .format("rate")
      .load
      .groupBy("value")
      .pivot("timestamp", Seq(1)) // <-- pivot with explicit values
      .count
      .writeStream
      .format("console")
    scala> sq.start
    org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;;
    Project [value#128L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#141[0] AS 1#142L]
    +- Aggregate [value#128L], [value#128L, pivotfirst(timestamp#127, count(1) AS `count`#137L, 1000000, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#141]
       +- Aggregate [value#128L, timestamp#127], [value#128L, timestamp#127, count(1) AS count(1) AS `count`#137L]
          +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dd63368,rate,List(),None,List(),None,Map(),None), rate, [timestamp#127, value#128L]
    
      at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389)
      at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:93)
      at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:250)
      at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:326)
      at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
      ... 49 elided