Currently I'm using hadoop-azure-3.4.1 via the PySpark library to connect to ABFS. According to the documentation - https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Azure_Managed_Identity - there is an option to use a Managed Identity for authentication. This feature works perfectly from an Azure VM, but it fails on my local server with the following error:
Caused by: org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator$HttpException: HTTP Error 400; url='http://127.0.0.1:40342/metadata/identity/oauth2/token' AADToken: HTTP connection to http://127.0.0.1:40342/metadata/identity/oauth2/token failed for getting token from AzureAD.; contentType='application/json'; response '{"error":"invalid_request","error_description":"The api-version '2018-02-01' could not be found. Supported are 2021-02-01 2020-06-01 2019-11-01 2019-08-15","error_codes":[404],"timestamp":"2024-11-20 00:21:23.47342756 +0000 UTC m=+4427.811320871","trace_id":"","correlation_id":"efee3d6c-ddde-465c-96dd-d0c68c7f7ebd"}'
http://127.0.0.1:40342/metadata/identity/oauth2/token is the endpoint used to obtain tokens on machines connected to Azure through Arc (see the official Azure documentation on using managed identities with Azure Arc-enabled servers).
Based on the hadoop-azure source code, it looks like this API version is hardcoded: https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java#L176
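The same 400 reproduces outside Spark with a direct call to the local Arc agent (a quick sketch using the requests package; the resource value for storage-scoped tokens is my assumption, and the agent is assumed to be on the default port 40342):

import requests

# Same request hadoop-azure makes, with its hardcoded api-version.
r = requests.get(
    "http://127.0.0.1:40342/metadata/identity/oauth2/token",
    params={"api-version": "2018-02-01", "resource": "https://storage.azure.com/"},
    headers={"Metadata": "true"},
)
print(r.status_code, r.text)  # 400: "The api-version '2018-02-01' could not be found"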
My Python code example:
from pyspark.sql import SparkSession

# Spark add-ons
path_to_hadoop_azure_jar = "/opt/hadoop-azure-3.4.1.jar"
path_to_hadoop_common_jar = "/opt/hadoop-common-3.4.1.jar"
path_to_azure_storage_jar = "/opt/azure-storage-8.6.6.jar"
path_to_azure_datalake_jar = "/opt/hadoop-azure-datalake-3.4.1.jar"

# ABFS variables
account_name = "pilotdbwsa"
container_name = "pilot-dbw"
container_path = "test1-test1-arc"
abfs_path = f"abfss://{container_name}@{account_name}.dfs.core.windows.net/{container_path}"

# Spark Session setup
spark = SparkSession.builder.appName("AzureDataRead") \
    .config("spark.jars", f"{path_to_hadoop_common_jar},{path_to_hadoop_azure_jar},{path_to_azure_storage_jar},{path_to_azure_datalake_jar}") \
    .getOrCreate()

spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
spark.conf.set(f"fs.azure.account.auth.type.{account_name}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{account_name}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.msi.endpoint.{account_name}.dfs.core.windows.net", "http://127.0.0.1:40342/metadata/identity/oauth2/token")

# Logging
spark.sparkContext.setLogLevel("DEBUG")

# Create a simple DataFrame
data = [("Alice", 25), ("Bob", 30), ("Cathy", 28)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# Write the DataFrame to ABFS as Parquet
try:
    df.write.parquet(abfs_path)
    print(f"Parquet file successfully written to {abfs_path}")
except Exception as e:
    print(f"Error writing Parquet file: {e}")
QUESTION: Is it possible to programmatically override the api-version, or to specify it in the Spark configuration?
Update (11/25): I have prepared changes for hadoop-azure and submitted a pull request: github.com/apache/hadoop/pull/7186
Yes, you are right: getting an access token from Arc-enabled servers is different from the regular flow. The client first calls the endpoint to obtain an intermediate key, and the access token is then generated using Basic authorization. The first request to the endpoint is expected to fail with a challenge; the client should handle that response and use the metadata it returns to make a further request for the token. The hadoop-azure jar does not handle this, so it raises an error even when a supported API version is used.
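As an illustration, a minimal sketch of that challenge/response flow in plain Python (assumptions: the Arc agent listens on the default port 40342, 2020-06-01 is taken from the supported versions in the error message, and the process has permission to read the agent's key file):

import requests

endpoint = "http://127.0.0.1:40342/metadata/identity/oauth2/token"
params = {"api-version": "2020-06-01", "resource": "https://storage.azure.com/"}
headers = {"Metadata": "true"}

# The first request carries no Authorization header; the agent replies 401
# and points to a key file in the WWW-Authenticate header.
challenge = requests.get(endpoint, params=params, headers=headers)
key_path = challenge.headers["WWW-Authenticate"].split("Basic realm=")[1]

# Retry with the key file contents as the Basic authorization secret.
with open(key_path) as key_file:
    headers["Authorization"] = f"Basic {key_file.read()}"
access_token = requests.get(endpoint, params=params, headers=headers).json()["access_token"]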
So, below are the possible workarounds.
Create a custom class in Java with functionality similar to the token flow shown above, and register that class in spark.conf as a custom token provider, as sketched below.
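A rough sketch of the Spark configuration for that approach, reusing account_name from the question (com.example.ArcMsiTokenProvider is a hypothetical class name; the Java class would implement hadoop-azure's org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee interface, and its jar must be on the driver and executor classpath):

# "Custom" auth type tells hadoop-azure to load the provider class named below;
# com.example.ArcMsiTokenProvider is a hypothetical name for your Java class.
spark.conf.set(f"fs.azure.account.auth.type.{account_name}.dfs.core.windows.net", "Custom")
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{account_name}.dfs.core.windows.net",
    "com.example.ArcMsiTokenProvider",
)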
For now, you can use pandas to read/write the file, passing storage options that pick up the default credentials.
First, run the below command.
pip install fsspec adlfs
code:
import pandas as pd

strg_opt = {'account_name': "jadls", 'anon': False}
p_df = pd.read_csv("abfs://data/csv/bulk.csv", storage_options=strg_opt)
df = spark.createDataFrame(p_df)
df.show()
p_df[["Date", "Symbol"]].to_csv("abfs://data/csv/new.csv", storage_options=strg_opt)
Here, passing anon: False in storage_options uses the default credential, which is the managed identity, and a Spark DataFrame is then created from the pandas DataFrame.
Next, the path should follow the below format:
abfs://{CONTAINER}/{FOLDER}/filename.csv
Output: the DataFrame contents and the resulting file in the storage account (screenshots omitted).