azure-blob-storage mlflow

Protected Azure Blob URL cannot be resolved by mlflow.data.from_pandas as dataset source


I retrieve time series data from a Yahoo Finance endpoint and store it in an Azure Blob container that requires a connection string to access. I then create a DataFrame from this time series and add it to MLflow tracking using log_input.

When I create the dataset with mlflow.data.from_pandas, I register the blob URL as the source.

import io

from azure.core.exceptions import ResourceExistsError
from azure.storage.blob import BlobServiceClient


def save_df_to_blob(raw_data, blob_name):
    csv_buffer = io.StringIO()
    raw_data.to_csv(csv_buffer, index=True)
    csv_data = csv_buffer.getvalue()

    connect_str = _credentials.AZURE_STORAGE_CONNECTION_STRING
    container_name = _credentials.storage_container_name
    directory = _credentials.blob_directory

    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    # get_container_client takes the container name only; "directories" in
    # Blob Storage are virtual prefixes that belong on the blob name
    container_client = blob_service_client.get_container_client(container_name)
    try:
        container_client.create_container()
    except ResourceExistsError:
        print("Container already exists.")

    blob_client = container_client.get_blob_client(f"{directory}/{blob_name}")
    blob_client.upload_blob(csv_data, blob_type="BlockBlob", overwrite=True)

    print(f"File uploaded to Blob as {blob_name}")

    return blob_client.url
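
The helper's return value is then passed as the dataset source; presumably the two snippets are connected by an assignment along these lines (data_df and blob_name as defined elsewhere):

blob_url = save_df_to_blob(data_df, blob_name)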

dataset = mlflow.data.from_pandas(
    data_df, source=blob_url, name=f"{stock_ticker}_{input_or_pred}_{train_or_test}")

mlflow.log_input(dataset, context=train_or_test_or_all)

Since the blob is protected and requires a connection string to access the URI, the data source registry in dataset_source_registry.py cannot resolve it and emits:

C:\ProgramData\miniconda3\envs\mlflow-spark\Lib\site-packages\mlflow\data\dataset_source_registry.py:150: UserWarning: Failed to determine whether UCVolumeDatasetSource can resolve source information for 'https://<storagename>.blob.core.windows.net/<container>/<filename>.csv'. Exception:
  return _dataset_source_registry.resolve(
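
For reference, the warning can be reproduced with nothing more than a plain DataFrame and the unsigned URL (a minimal sketch; the placeholder URL stands in for the real, redacted one):

import mlflow
import pandas as pd

df = pd.DataFrame({"close": [1.0, 2.0]})

# Passing the raw, unsigned blob URL as the source triggers the resolver
# warning: no registered DatasetSource can access the protected URI.
dataset = mlflow.data.from_pandas(
    df, source="https://<storagename>.blob.core.windows.net/<container>/<filename>.csv"
)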

Is there a way to register a protected URI with mlflow.data.from_pandas, or a different way to log_input this pandas DataFrame? I would rather not suppress warnings, because I might miss something else relevant.


Solution

  • Is there a way to register a protected URI with mlflow.data.from_pandas, or a different way to log_input this pandas DataFrame? I would rather not suppress warnings, because I might miss something else relevant.

    You can register the protected URI with MLflow by generating an Azure SAS token with the Python SDK.

    Here is the code to access the CSV file by creating a SAS token in Python.

    Code:

    from datetime import datetime, timedelta, timezone

    import mlflow
    import pandas as pd
    from azure.storage.blob import BlobSasPermissions, BlobServiceClient, generate_blob_sas
    from mlflow.data.sources import LocalArtifactDatasetSource


    # Connection string
    connect_str = "DefaultEndpointsProtocol=https;AccountName=venkat326123;AccountKey=T3czZpu1gZxxxxD9nyWw==;EndpointSuffix=core.windows.net"

    # Parse the account name and key out of the connection string
    conn_settings = dict(part.split("=", 1) for part in connect_str.split(";"))
    account_name = conn_settings["AccountName"]
    account_key = conn_settings["AccountKey"]

    container_name = "test"
    blob_name = "sample3.csv"

    # Generate a SAS URL for the blob, valid for one hour
    def generate_sas_url(blob_name):
        blob_service_client = BlobServiceClient.from_connection_string(connect_str)
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

        sas_expiry = datetime.now(timezone.utc) + timedelta(hours=1)
        # Read access is all that is needed to resolve the dataset source
        sas_permissions = BlobSasPermissions(read=True)

        blob_sas_token = generate_blob_sas(
            account_name=account_name,
            container_name=container_name,
            blob_name=blob_name,
            account_key=account_key,
            permission=sas_permissions,
            expiry=sas_expiry,
        )

        return f"{blob_client.url}?{blob_sas_token}"

    blob_sas_url = generate_sas_url(blob_name)
    data_df = pd.read_csv(blob_sas_url)

    # Log the DataFrame with MLflow, using the SAS URL as the dataset source
    dataset = mlflow.data.from_pandas(
        data_df, source=LocalArtifactDatasetSource(blob_sas_url), name="example_dataset")
    mlflow.log_input(dataset, context="example_context")

    # Fetch and print dataset info from the MLflow run
    run = mlflow.get_run(mlflow.last_active_run().info.run_id)
    dataset_info = run.inputs.dataset_inputs[0].dataset
    print(f"Dataset name: {dataset_info.name}")
    print(f"Dataset digest: {dataset_info.digest}")
    print(f"Dataset profile: {dataset_info.profile}")
    print(f"Dataset schema: {dataset_info.schema}")

    # Load the dataset's source, which downloads the content from the source URL
    # to the local filesystem
    dataset_source = mlflow.data.get_source(dataset_info)
    dataset_source.load()
    

    Output:

    Dataset name: example_dataset
    Dataset digest: 77a19fc0
    Dataset profile: {"num_rows": 1599, "num_elements": 1599}
    Dataset schema: {"mlflow_colspec": [{"type": "string", "name": "fixed acidity;\"volatile acidity\";\"citric acid\";\"residual sugar\";\"chlorides\";\"free sulfur dioxide\";\"total sulfur dioxide\";\"density\";\"pH\";\"sulphates\";\"alcohol\";\"quality\"", "required": true}]}
    
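    Note that the schema above shows a single string column: the sample CSV is semicolon-delimited, so pandas parsed each row as one field. If your file uses a delimiter other than a comma, pass it explicitly, e.g.:

    data_df = pd.read_csv(blob_sas_url, sep=";")
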


    Reference: [BUG] Warning about LocalArtifactDatasetSource being ambiguous with itself · Issue #9702 · mlflow/mlflow
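
    If recording a remote URL as a "local artifact" feels off (the issue above is precisely about the ambiguity warning that LocalArtifactDatasetSource triggers), MLflow also ships an HTTP dataset source that may fit better. A minimal sketch, assuming mlflow.data.http_dataset_source.HTTPDatasetSource exists in your MLflow version and blob_sas_url is the SAS URL generated above:

    from mlflow.data.http_dataset_source import HTTPDatasetSource

    # Record the SAS URL as an HTTP(S) dataset source instead of a local artifact
    dataset = mlflow.data.from_pandas(
        data_df, source=HTTPDatasetSource(blob_sas_url), name="example_dataset")
    mlflow.log_input(dataset, context="example_context")

    Keep in mind that the SAS token embedded in the logged source URL expires (one hour in the code above) and grants read access to anyone who sees the URL, so scope it accordingly.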