I am using a Cosmos DB (MongoDB API) linked service to connect an Azure Cosmos DB for MongoDB account to a Synapse Analytics workspace.
EDIT: I can confirm that the ObjectId values in the Cosmos DB for MongoDB collection are valid, since they are used by an application.
Document snippet, as seen in the Cosmos DB Data Explorer:
{
"_id" : ObjectId("623a3902764504df1bc51620"),
"name": "Dummy data"
}
The Analytical Store Time to Live option is set to On for the Cosmos DB collections, so they show up under the Linked tab in Synapse Studio.
In a Synapse notebook, running
spark.sql("select unhex('623a3902764504df1bc51620') as bytes").show(truncate=False)
returns
+-------------------------------------+
|bytes |
+-------------------------------------+
|[62 3A 39 02 76 45 04 DF 1B C5 16 20]|
+-------------------------------------+
indicating that unhex itself works in the PySpark environment. However,
df = spark.read\
.format("cosmos.olap")\
.option("spark.synapse.linkedService", "CosmosDbMongoDb1")\
.option("spark.cosmos.container", "TABLENAME")\
.load()
display(df.limit(10))
returns
"{"objectId":"b:9\u0002vE\u0004�\u001b�\u0016 "}"
objectId: ""b:9\u0002vE\u0004�\u001b�\u0016 ""
for all values in the _id column.
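Looking closer, the garbled text appears to be the twelve raw ObjectId bytes rendered as characters instead of as a 24-character hex string; decoding the hex value from the Data Explorer snippet above in plain Python reproduces it exactly:
raw = bytes.fromhex("623a3902764504df1bc51620")
# latin-1 maps bytes 0x00-0xFF one-to-one to characters, so this prints
# the same "b:9\x02vE\x04ß\x1bÅ\x16 " text that Synapse displays
print(raw.decode("latin-1"))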
EDIT: df.printSchema()
returns
root
|-- _rid: string (nullable = true)
|-- _ts: long (nullable = true)
|-- id: string (nullable = true)
|-- _etag: string (nullable = true)
|-- _id: struct (nullable = true)
| |-- objectId: string (nullable = true)
|-- name: struct (nullable = true)
| |-- string: string (nullable = true)
..... snip ...
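The nested structs (_id.objectId, name.string) look like the full fidelity schema that Synapse Link uses for API for MongoDB accounts, where each property is wrapped in a struct keyed by its BSON type, so values have to be addressed as field.type, e.g.:
from pyspark.sql.functions import col

# address properties as <field>.<BSON type> under the full fidelity schema
df.select(col("_id.objectId"), col("name.string")).show(2, truncate=False)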
Running df3 = df.select(unhex("_id.objectId")) and showing the result returns
+-------------------+
|unhex(_id.objectId)|
+-------------------+
| null|
| null|
+-------------------+
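This is consistent with the column holding raw bytes rather than hex text: Spark's unhex expects hexadecimal characters and returns null for any other input, e.g.:
spark.sql("select unhex('zz') as bytes").show()
# +-----+
# |bytes|
# +-----+
# | null|
# +-----+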
Running df.select(unhex('_id.objectId')) without an action only returns the lazy DataFrame and its schema:
DataFrame[unhex(_id.objectId): binary]
Running
SELECT TOP (100) JSON_VALUE([_id], '$.objectId') AS _id
FROM [DB].[dbo].[TABLENAME]
in the built-in serverless SQL pool returns the same garbled b:9vE��
values, so the issue is not specific to Spark.
Trying
%%sql
create table TABLENAME using cosmos.olap options (
spark.synapse.linkedService 'CosmosDbMongoDb1',
spark.cosmos.container 'TABLENAME'
);
SELECT name, _id
FROM TABLENAME;
returns the following for the _id column:
"{"schema":[{"name":"objectId","dataType":{},"nulla..."
schema: "[{"name":"objectId","dataType":{},"nullable":true,..."
0: "{"name":"objectId","dataType":{},"nullable":true,"..."
name: ""objectId""
dataType: "{}"
nullable: "true"
metadata: "{"map":{}}"
map: "{}"
values: "["b:9\u0002vE\u0004�\u001b�\u0016 "]"
0: ""b:9\u0002vE\u0004�\u001
Sorry for the additional copy-paste garble; there are expandable fields in the column values.
I found a Scala code example in the Azure documentation that works for converting the ObjectId field to a string:
import org.apache.spark.sql.functions.{col, udf}

val df = spark.read.format("cosmos.olap")
  .option("spark.synapse.linkedService", "xxxx")
  .option("spark.cosmos.container", "xxxx")
  .load()

// Hex-encode the raw ObjectId bytes one byte at a time.
val convertObjectId = udf((bytes: Array[Byte]) => {
  val builder = new StringBuilder
  for (b <- bytes) {
    builder.append(String.format("%02x", Byte.box(b)))
  }
  builder.toString
})

val dfConverted = df
  .withColumn("objectId", col("_id.objectId"))
  .withColumn("convertedObjectId", convertObjectId(col("_id.objectId")))
  .select("id", "objectId", "convertedObjectId")

display(dfConverted)
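For reference, here is a rough PySpark equivalent (my own sketch, not from the docs; it assumes _id.objectId reaches the UDF either as raw bytes or as a string whose characters are the raw bytes):
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

@udf(StringType())
def convert_object_id(raw):
    # hex-encode the 12 raw ObjectId bytes; latin-1 round-trips
    # characters 0x00-0xFF to bytes if Spark hands us a string
    if raw is None:
        return None
    data = raw if isinstance(raw, (bytes, bytearray)) else raw.encode("latin-1")
    return data.hex()

dfConverted = df.withColumn("convertedObjectId", convert_object_id(col("_id.objectId")))
display(dfConverted.select("id", "convertedObjectId"))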