apache-spark, hadoop, pyspark, azure-arc

How to set "api-version" dynamically in fs.azure.account.oauth2.msi.endpoint


Currently I'm using hadoop-azure-3.4.1 via the PySpark library to connect to ABFS. According to the documentation - https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Azure_Managed_Identity - there is an option to use Managed Identity for authentication. This feature works perfectly from an Azure VM, but it fails on my local server (a machine connected to Azure through Arc) with the following error:

Caused by: org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator$HttpException: HTTP Error 400; url='http://127.0.0.1:40342/metadata/identity/oauth2/token' AADToken: HTTP connection to http://127.0.0.1:40342/metadata/identity/oauth2/token failed for getting token from AzureAD.; contentType='application/json'; response '{"error":"invalid_request","error_description":"The api-version '2018-02-01' could not be found. Supported are 2021-02-01 2020-06-01 2019-11-01 2019-08-15","error_codes":[404],"timestamp":"2024-11-20 00:21:23.47342756 +0000 UTC m=+4427.811320871","trace_id":"","correlation_id":"efee3d6c-ddde-465c-96dd-d0c68c7f7ebd"}'

http://127.0.0.1:40342/metadata/identity/oauth2/token is the endpoint used to obtain tokens on machines connected to Azure through Arc (see the official Azure documentation on using Managed Identity via Arc).

Based on the hadoop-azure code, it looks like this api-version is hardcoded: https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java#L176

My Python code example:

from pyspark.sql import SparkSession

# Spark add-ons
path_to_hadoop_azure_jar = "/opt/hadoop-azure-3.4.1.jar"
path_to_hadoop_common_jar = "/opt/hadoop-common-3.4.1.jar"
path_to_azure_storage_jar = "/opt/azure-storage-8.6.6.jar"
path_to_azure_datalake_jar = "/opt/hadoop-azure-datalake-3.4.1.jar"

# ABFS variables
account_name = "pilotdbwsa"
container_name = "pilot-dbw"
container_path = "test1-test1-arc"
abfs_path = f"abfss://{container_name}@{account_name}.dfs.core.windows.net/{container_path}"

# Spark Session setup
spark = SparkSession.builder.appName("AzureDataRead") \
    .config("spark.jars", f"{path_to_hadoop_common_jar},{path_to_hadoop_azure_jar},{path_to_azure_storage_jar},{path_to_azure_datalake_jar}") \
    .getOrCreate()
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")

spark.conf.set(f"fs.azure.account.auth.type.{account_name}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{account_name}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.msi.endpoint.{account_name}.dfs.core.windows.net", "http://127.0.0.1:40342/metadata/identity/oauth2/token")

# Logging
spark.sparkContext.setLogLevel("DEBUG")

# Create a simple DataFrame
data = [("Alice", 25), ("Bob", 30), ("Cathy", 28)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# Write the DataFrame to ABFS as Parquet
try:
    df.write.parquet(abfs_path)
    print(f"Parquet file successfully written to {abfs_path}")
except Exception as e:
    print(f"Error writing Parquet file: {e}")

QUESTION: Is it possible to programmatically override the api-version, or to specify it in the Spark configuration?

UPDATES:

11/25 - I have prepared changes for hadoop-azure and submitted a pull request github.com/apache/hadoop/pull/7186


Solution

  • Yes, you are right: getting an access token from Arc-enabled servers is different from the regular flow.

    The client first calls the endpoint to obtain an intermediate key, and the access token is then generated by repeating the request with Basic authorization derived from that key.

    When the first request is made to the endpoint, the client is expected to handle the resulting challenge response and use that metadata to make a further request for the token; the hadoop-azure jar does not handle this, so it raises an error even when a supported api-version is used.
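    For illustration, here is a minimal Python sketch of that Arc challenge flow; the api-version and resource values are assumptions for the example, not what hadoop-azure actually sends:

    import requests

    # Sketch of the Azure Arc managed identity token flow described above.
    # Endpoint, api-version and resource are assumptions for illustration.
    endpoint = "http://127.0.0.1:40342/metadata/identity/oauth2/token"
    params = {"api-version": "2021-02-01", "resource": "https://storage.azure.com/"}

    # First call: no Authorization header; the Arc agent replies 401 with a
    # WWW-Authenticate header that names a local key file (the intermediate key).
    challenge = requests.get(endpoint, params=params, headers={"Metadata": "true"})
    key_path = challenge.headers["WWW-Authenticate"].split("Basic realm=")[1]

    # Second call: send the key file contents as Basic authorization to get the token.
    with open(key_path) as f:
        key = f.read().strip()
    resp = requests.get(endpoint, params=params,
                        headers={"Metadata": "true", "Authorization": f"Basic {key}"})
    print(resp.json()["access_token"])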

    So, below are the possible workarounds.

    1. Create a custom class in Java with functionality similar to the code given here, and register that class in spark.conf as a custom token provider, as mentioned here (see the configuration sketch at the end of this answer).

    2. For now, you can use pandas to read/write the file, passing storage_options that use the default credentials.

    First, install the required packages:

    pip install fsspec adlfs
    

    code:

    import pandas as pd

    # anon=False makes adlfs authenticate with the default Azure credential (managed identity)
    strg_opt = {'account_name': "jadls", 'anon': False}
    p_df = pd.read_csv("abfs://data/csv/bulk.csv", storage_options=strg_opt)

    # Convert the pandas DataFrame to a Spark DataFrame
    df = spark.createDataFrame(p_df)
    df.show()

    # Write a subset of columns back to the container
    p_df[["Date", "Symbol"]].to_csv("abfs://data/csv/new.csv", storage_options=strg_opt)
    

    Here, passing anon: False in storage_options makes adlfs use the default Azure credential (i.e. the managed identity), and the resulting pandas DataFrame is then converted into a Spark DataFrame.

    Next, the path should be in the below format:

    abfs://{CONTAINER}/{FOLDER}/filename.csv
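
    For example, with the storage account and container from the question (the file name somefile.csv is just a placeholder), that would look like:

    strg_opt = {'account_name': "pilotdbwsa", 'anon': False}
    p_df = pd.read_csv("abfs://pilot-dbw/test1-test1-arc/somefile.csv", storage_options=strg_opt)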

    Output:

    (screenshot of the resulting DataFrame)

    and in the storage account:

    (screenshot of the written file in the container)
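
    As a reference for workaround 1, here is a minimal sketch of the Spark configuration side of a custom token provider. The class name com.example.ArcMsiTokenProvider is hypothetical; the actual Java class would have to implement org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee and perform the Arc challenge flow shown earlier.

    # Hypothetical example: point ABFS at a custom token provider class.
    # "com.example.ArcMsiTokenProvider" is an assumed name, not an existing class.
    # account_name is the variable defined in the question's code.
    spark.conf.set(f"fs.azure.account.auth.type.{account_name}.dfs.core.windows.net", "Custom")
    spark.conf.set(f"fs.azure.account.oauth.provider.type.{account_name}.dfs.core.windows.net",
                   "com.example.ArcMsiTokenProvider")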