I'm reading a text file from ADLS Gen2 using Databricks. This is my code.
I can read successfully, but when I define the query and write the stream, I get an error: "Could not find ADLS Gen2 token." Could you please suggest how to run Spark streaming on a text file?
I have also tried passing a SAS token, but it still cannot stream the file; it can only stream the directory.
file_path = "adl://<account-name>.dfs.core.windows.net/<container>/<path>/*.txt"
streaming_df = spark.readStream \
.schema(schema) \
.text(file_path)
query = streaming_df.writeStream \
.outputMode("append") \
.format("console") \
.start()
query.awaitTermination()
For your first question: instead of using the adl filesystem scheme, you can use abfss:
abfss://<container_name>@<storage_acc_name>.dfs.core.windows.net/
And you can configure the SAS token using the code below.
spark.conf.set("fs.azure.account.auth.type.<storage_acc_name>.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.<storage_acc_name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.<storage_acc_name>.dfs.core.windows.net", "Your_SAS_token")
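Rather than hardcoding the SAS token in the notebook, you can store it in a Databricks secret scope and read it with dbutils.secrets.get. A minimal sketch, assuming a scope named "adls-secrets" with a key "sas-token" (both placeholder names you would create yourself):

```python
# Retrieve the SAS token from a Databricks secret scope instead of
# embedding it in the notebook. "adls-secrets" and "sas-token" are
# placeholder names for a scope and key created beforehand.
sas_token = dbutils.secrets.get(scope="adls-secrets", key="sas-token")

spark.conf.set("fs.azure.account.auth.type.<storage_acc_name>.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.<storage_acc_name>.dfs.core.windows.net",
               "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.<storage_acc_name>.dfs.core.windows.net", sas_token)
```

This keeps the token out of notebook source and revision history; rotating the token then only requires updating the secret.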
Now, for text files, you need to give readStream a directory, and to filter out only .txt files you can use the pathGlobFilter option while reading.
Code:
file_path = "abfss://data@jadls2.dfs.core.windows.net/databricks/text/"
streaming_df = spark.readStream.schema(schema).option("basePath", file_path)\
.option("pathGlobFilter", "*.txt").text(file_path + "*")
display(streaming_df.select("*","_metadata.file_path"))
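With the stream defined above, you can then complete your original writeStream query against the abfss path. A sketch assuming the same streaming_df; the checkpoint path is a placeholder pointing at a location in your own container:

```python
# Write the stream to the console sink. A checkpoint location is
# required for most sinks and recommended here too; the path below
# is a placeholder in the same example container.
query = (streaming_df.writeStream
         .outputMode("append")
         .format("console")
         .option("checkpointLocation",
                 "abfss://data@jadls2.dfs.core.windows.net/databricks/checkpoints/text_stream")
         .start())

query.awaitTermination()
```

Because the SAS configuration was set on the session, both the read and the write resolve the abfss path without the "could not find ADLS Gen2 token" error.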