I am trying to stream aggregated data to Azure Cosmos DB using Spark Structured Streaming. The sample application reads input from a network socket (console), applies an aggregation to it, and then tries to write the resulting stream. Below is the code.
import org.apache.spark.sql.SparkSession

object SparkStreamCosmos extends App {

  val cfgMap = Map(
    "spark.cosmos.accountEndpoint" -> "https://xxx-cosmos.documents.azure.com:443/",
    "spark.cosmos.accountKey" -> "xxxxxxx==",
    "spark.cosmos.database" -> "buabookkeeping",
    "spark.cosmos.container" -> "wordscount",
    "spark.cosmos.write.strategy" -> "ItemOverwrite"
  )

  // Start the local network server with the command: nc -lk 9999
  val spark = SparkSession
    .builder
    .appName("StructuredNetworkWordCount")
    .master("local[2]")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._

  // Create a DataFrame representing the stream of input lines from the connection to localhost:9999
  val lines = spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()

  // Split the lines into words
  val words = lines.as[String].flatMap(_.split(" "))

  // Generate the running word count; rename "value" to "id" for Cosmos DB
  val wordCounts = words
    .groupBy("value")
    .count()
    .withColumnRenamed("value", "id")

  // Write to Cosmos DB
  wordCounts.writeStream
    .format("cosmos.oltp")
    .outputMode("update")
    .options(cfgMap)
    .option("checkpointLocation", "/Users/k0d03gd/project/code/krushna/spark-hello/checkpointLocation")
    .start()
    .awaitTermination(100000)
}
When I run this code I get the following error:
java.lang.IllegalArgumentException: requirement failed: com.azure.cosmos.spark.items.bua-cosmos.buabookkeeping.wordscount does not support Update mode.
My aim is to UPSERT the updated rows into Azure Cosmos DB. How can I do that?
Complete stack trace:
java.lang.IllegalArgumentException: requirement failed: com.azure.cosmos.spark.items.bua-cosmos.buabookkeeping.wordscount does not support Update mode.
at scala.Predef$.require(Predef.scala:281)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$.org$apache$spark$sql$execution$datasources$v2$V2Writes$$buildWriteForMicroBatch(V2Writes.scala:121)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:90)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:43)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2Writes$.apply(V2Writes.scala:39)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:82)
at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:656)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:646)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Cosmos DB is not in the list of supported sinks for Spark Structured Streaming; the available output sinks are listed here: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
One solution may be to use Kafka as the sink and then write the data from Kafka to Cosmos DB, as sketched below.
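For example, here is a minimal sketch of the Kafka route, reusing the wordCounts DataFrame from the question. The broker address localhost:9092, the topic wordscount, and the checkpoint path are placeholder assumptions, and it assumes the spark-sql-kafka-0-10 connector is on the classpath.

  wordCounts
    .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") // Kafka sink expects key/value columns
    .writeStream
    .format("kafka")
    .outputMode("update")                                  // the Kafka sink accepts Update mode
    .option("kafka.bootstrap.servers", "localhost:9092")   // placeholder broker address
    .option("topic", "wordscount")                         // placeholder topic name
    .option("checkpointLocation", "/tmp/kafka-checkpoint") // placeholder checkpoint path
    .start()
    .awaitTermination()

Because the Kafka sink supports Update output mode, the running aggregation can be written as-is; a downstream consumer (for example, a Kafka Connect sink connector for Cosmos DB) would then move the records into Cosmos DB.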