I have this code:
```python
import dlt
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType

@dlt.table(
    name="kafka_bronze",
    table_properties={"pipelines.autoOptimize.enabled": "true"}
)
def kafka_bronze():
    # jsonSchema is defined elsewhere in the pipeline notebook
    df = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", {Masked})
        .option("subscribe", "topic1")
        .option("startingOffsets", "earliest")
        .option("maxOffsetsPerTrigger", 100)
        .load()
        .select(col("value").cast(StringType()).alias("json"))
        .select(from_json("json", jsonSchema).alias("data"))
        .select("data.*"))
    return df
```
However, it fails and doesn't write any data. I can stream the data successfully in a notebook, but the pipeline doesn't load it. FYI, I am using Unity Catalog.
The error you are getting, org.apache.spark.sql.streaming.StreamingQueryException with an underlying TimeoutException from Kafka, indicates that your Spark streaming job is timing out while trying to communicate with the Kafka brokers.
I agree with @JayashankarGS that this can happen for reasons such as network issues, Kafka broker overload, or an incorrect Kafka configuration in your Spark job.
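Before raising any timeouts, it is worth confirming that the pipeline's compute can reach the brokers at all. A bounded batch read of the same topic from a notebook is a quick way to check this; the sketch below uses placeholder broker and topic values, so substitute your own:

```python
# Hypothetical connectivity check: a bounded batch read from the same topic.
# "broker1:9092" and "topic1" are placeholders, not your actual values.
df = (spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")
    .option("subscribe", "topic1")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load())

# If this returns rows, the brokers are reachable and the issue is elsewhere.
display(df.selectExpr("CAST(value AS STRING)").limit(10))
```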
If the brokers are reachable but reads still time out, the options below increase the timeout settings for your Kafka consumer:
.option("kafka.consumer.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")
I have tried the below code:
```python
def kafka_bronze():
    # jsonSchema is defined elsewhere in the pipeline notebook
    df = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "your_kafka_bootstrap_servers")
        .option("subscribe", "topic1")
        .option("startingOffsets", "earliest")
        .option("maxOffsetsPerTrigger", 100)
        .option("kafka.request.timeout.ms", "60000")    # "kafka." prefix so the option reaches the Kafka consumer
        .option("kafka.session.timeout.ms", "30000")
        .load()
        .select(col("value").cast(StringType()).alias("json"))
        .select(from_json("json", jsonSchema).alias("data"))
        .select("data.*"))
    return df
```
Results:

| Name   | Type   |
|--------|--------|
| field1 | string |
| field2 | string |
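Once the pipeline run succeeds, you can confirm that the bronze table landed in Unity Catalog with a quick query from a notebook. The catalog and schema names below are placeholders for whatever target your DLT pipeline is configured with:

```python
# Hypothetical three-level name; use the catalog/schema your pipeline writes to.
bronze = spark.table("my_catalog.my_schema.kafka_bronze")
bronze.printSchema()
display(bronze.limit(10))
```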