scalaapache-sparkazure-cosmosdbspark-streaming

Spark stream to Azure cosmos DB


I am trying to stream aggregated data to Azure Cosmos DB using spark stream. The sample spark stream application take input from n/w console and then apply aggregation to it and trying write the stream. Below is the code.

import org.apache.spark.sql.SparkSession

object SparkStreamCosmos extends App{

  val cfgMap = Map("spark.cosmos.accountEndpoint" -> "https://xxx-cosmos.documents.azure.com:443/",
    "spark.cosmos.accountKey" -> "xxxxxxx==",
    "spark.cosmos.database" -> "buabookkeeping",
    "spark.cosmos.container" -> "wordscount",
    "spark.cosmos.write.strategy" -> "ItemOverwrite"
  )

  // start the local host n/w server by using the command : nc -lk 9999
  val spark = SparkSession
    .builder
    .appName("StructuredNetworkWordCount")
    .master("local[2]")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._

  // Create DataFrame representing the stream of input lines from connection to localhost: 9999
  val lines = spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()

  // Split the lines into words
  val words = lines.as[String].flatMap(_.split(" "))

  // Generate running word count
  var wordCounts = words.groupBy("value").count()

  wordCounts = wordCounts.withColumnRenamed("value", "id")
  // write to cosmos DB
  
  wordCounts.writeStream.
  format("cosmos.oltp").
    outputMode("update")
    .options(cfgMap)
    .option("checkpointLocation", "/Users/k0d03gd/project/code/krushna/spark-hello/checkpointLocation")
    .start()
    .awaitTermination(100000)
}

When I run this code I am getting the error like

java.lang.IllegalArgumentException: requirement failed: com.azure.cosmos.spark.items.bua-cosmos.buabookkeeping.wordscount does not support Update mode.

My aim here UPSERT the updated row into azure cosmos DB. How to do that.

Complete stack trace

java.lang.IllegalArgumentException: requirement failed: com.azure.cosmos.spark.items.bua-cosmos.buabookkeeping.wordscount does not support Update mode.
    at scala.Predef$.require(Predef.scala:281)
    at org.apache.spark.sql.execution.datasources.v2.V2Writes$.org$apache$spark$sql$execution$datasources$v2$V2Writes$$buildWriteForMicroBatch(V2Writes.scala:121)
    at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:90)
    at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
    at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
    at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:82)
    at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:656)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:646)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)

Solution

  • The cosmos DB is not a supported sink for spark stream, here is the list of sinks available for spark streaming https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks

    The solutions may be to use a Kafka as sink and then write the data from kafka to cosmos DB.