Parsing the Kafka value with parse_json gives me a nice VARIANT structure, as long as I don't try to extract nested fields. I am reading from Kafka and writing to a table, and the issue happens on the readStream:
[INVALID_EXTRACT_BASE_FIELD_TYPE] Can't extract a value from "data". Need a complex type [STRUCT, ARRAY, MAP] but got "VARIANT". SQLSTATE: 42000
Here is my readStream:
from pyspark.sql.functions import col, current_timestamp, parse_json

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("subscribe", TOPIC) \
    .option("startingOffsets", "latest") \
    .... \
    .load() \
    .withColumn("data", parse_json(col("value").cast("string"))) \
    .select("data, data:unique_id") \
    .withColumn("timestamp", current_timestamp())
display(df)
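It turns out that selectExpr is required. select treats each string it receives as a column name, while selectExpr parses each string as a SQL expression, and only a SQL expression understands the data:unique_id path syntax for VARIANT columns: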
from pyspark.sql.functions import col, current_timestamp, parse_json

# selectExpr parses each argument as a single SQL expression, so pass one string per column.
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("subscribe", TOPIC) \
    .option("startingOffsets", "latest") \
    .... \
    .load() \
    .withColumn("data", parse_json(col("value").cast("string"))) \
    .selectExpr("data", "data:unique_id") \
    .withColumn("timestamp", current_timestamp())
display(df)