Tags: databricks, azure-cosmosdb, azure-databricks, azure-cosmosdb-sqlapi

UPSERT / INSERT / UPDATE from Databricks to Cosmos DB


Currently we are using Azure Databricks as the transformation layer, and the transformed data is loaded to Cosmos DB through the connector.

Scenario:

We have 2 files as source files.

The 1st file contains: name, age

The 2nd file contains: name, state, country

In Cosmos, I have created a collection using id as the partition key.

In Databricks, I am loading these 2 files as DataFrames and creating temp tables to query the content.

I am querying the content from the first file [ select name as id, name, age from file ] and loading it into the Cosmos collection.

From the second file, I am using [ select name as id, state, country ] and loading into the same collection, expecting the content from the second file to be added to the same document based on the id field.
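For reference, a minimal sketch of the Databricks side (file paths and CSV format are placeholders, not the actual source):

// Sketch only: paths and format are hypothetical.
val df1 = spark.read.option("header", "true").csv("/mnt/source/file1.csv") // name, age
val df2 = spark.read.option("header", "true").csv("/mnt/source/file2.csv") // name, state, country

df1.createOrReplaceTempView("file1")
df2.createOrReplaceTempView("file2")

// name doubles as the Cosmos id in both queries
val fromFile1 = spark.sql("select name as id, name, age from file1")
val fromFile2 = spark.sql("select name as id, state, country from file2")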

The issue here is that when I load the content from the second file, the attribute 'age' from the first file gets deleted and only id, name, state, country is seen in the Cosmos document. This happens because I am using UPSERT in Databricks to load to Cosmos.
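In other words, an upsert in Cosmos replaces the whole document rather than merging new attributes into it. With made-up values, the effect looks like this:

Document after loading file 1:  { "id": "John", "name": "John", "age": 30 }
Document after loading file 2:  { "id": "John", "name": "John", "state": "WA", "country": "USA" }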

When I change UPSERT to INSERT or UPDATE, it throws an error which says 'Resource with id already exists'.

Databricks Connection to Cosmos:

val configMap = Map(
  "Endpoint"         -> "https://",
  "Masterkey"        -> "",
  "Database"         -> "ods",
  "Collection"       -> "tval",
  "preferredRegions" -> "West US",
  "upsert"           -> "true")
val config = com.microsoft.azure.cosmosdb.spark.config.Config(configMap)
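The write itself looks roughly like the sketch below, assuming the azure-cosmosdb-spark connector's CosmosDBSpark.save and the fromFile1 / fromFile2 DataFrames from the sketch above:

import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark

// Upserts each row of the DataFrame as a document into the tval collection,
// because "upsert" -> "true" is set in the config above.
CosmosDBSpark.save(fromFile1, config)
CosmosDBSpark.save(fromFile2, config)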

Is there a way to insert the attributes from the second file without deleting the attributes that are already present? I am not using a JOIN operation as the use case doesn't fit it.


Solution

  • From a vague memory of doing this, you need to set the id attribute on your DataFrame so that it matches between the two datasets. If you omit this field, Cosmos generates a new record, which is what is happening for you.

    So if df1 and df2 both have id=1 on the first record, then the first write will insert it and the second will update it.

    But if they are the same record, then joining in Spark will be far more efficient, as sketched below.
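
    For completeness, a sketch of the join approach (fromFile1 / fromFile2 and config refer to the sketches above; the full outer join keeps rows that appear in only one file):

      // Join the two sources on id so each key is written exactly once,
      // with the attributes from both files in a single document.
      val merged = fromFile1.join(fromFile2, Seq("id"), "full_outer")

      // A single upsert per id, so nothing is overwritten by a second partial write.
      CosmosDBSpark.save(merged, config)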