I retrieve time series data from a Yahoo Finance endpoint and store it in an Azure Blob container that requires a connection string for access. I then create a DataFrame from this time series and add it to MLflow tracking using log_input.
When I create the dataset with mlflow.data.from_pandas, I register the blob URL as the source.
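For context, the time series comes from something like the following (a minimal sketch assuming the yfinance package; the ticker and period are placeholders, not values from my pipeline):

import yfinance as yf

# Hypothetical retrieval step: returns a DataFrame indexed by date
raw_data = yf.download("MSFT", period="1y")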
def save_df_to_blob(raw_data, blob_name):
    # Serialize the DataFrame to CSV in memory
    csv_buffer = io.StringIO()
    raw_data.to_csv(csv_buffer, index=True)
    csv_data = csv_buffer.getvalue()

    connect_str = _credentials.AZURE_STORAGE_CONNECTION_STRING
    container_name = _credentials.storage_container_name
    directory = _credentials.blob_directory

    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    container_client = blob_service_client.get_container_client(f"{container_name}/{directory}")
    try:
        container_client.create_container()
    except Exception:
        print("Container already exists.")

    # Upload the CSV and return the (protected) blob URL
    blob_client = container_client.get_blob_client(blob_name)
    blob_client.upload_blob(csv_data, blob_type="BlockBlob", overwrite=True)
    print(f"File uploaded to Blob as {blob_name}")
    return blob_client.url
# blob_client_url is the URL returned by save_df_to_blob above
blob_client_url = save_df_to_blob(data_df, blob_name)

dataset = mlflow.data.from_pandas(
    data_df, source=blob_client_url, name=f"{stock_ticker}_{input_or_pred}_{train_or_test}")
mlflow.log_input(dataset, context=train_or_test_or_all)
Since the blob is protected and requires a connection string to access, the data source registry in dataset_source_registry.py cannot resolve the URI and throws:
C:\ProgramData\miniconda3\envs\mlflow-spark\Lib\site-packages\mlflow\data\dataset_source_registry.py:150: UserWarning: Failed to determine whether UCVolumeDatasetSource can resolve source information for 'https://<storagename>.blob.core.windows.net/<container>/<filename>.csv'. Exception:
return _dataset_source_registry.resolve(
Is there a way to register a protected URI with mlflow.data.from_pandas, or a different way to log_input this pandas DataFrame? I would not want to suppress warnings wholesale, because I may miss something else relevant.
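(As an aside, if the noise ever has to go, Python's warnings module can target just this message instead of silencing everything; a minimal sketch, assuming the warning text stays stable across MLflow versions:)

import warnings

# Filter only the dataset-source resolution warning; all other warnings still surface.
# The message regex is an assumption based on the warning text quoted above.
warnings.filterwarnings(
    "ignore",
    message=r"Failed to determine whether UCVolumeDatasetSource can resolve.*",
    category=UserWarning,
)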
You can register the protected URI with MLflow by generating an Azure SAS token with the Python SDK. The code below creates a SAS URL for the CSV blob, reads it into pandas, and logs it as an MLflow dataset input.
Code:
from azure.storage.blob import generate_blob_sas, BlobSasPermissions, BlobServiceClient
from datetime import datetime, timedelta, timezone
import mlflow
import pandas as pd
from mlflow.data.sources import LocalArtifactDatasetSource

# Connection string
connect_str = "DefaultEndpointsProtocol=https;AccountName=venkat326123;AccountKey=T3czZpu1gZxxxxD9nyWw==;EndpointSuffix=core.windows.net"

# Extract the account name and key from the connection string
parts = connect_str.split(';')
account_name = ""
account_key = ""
for part in parts:
    if part.startswith('AccountName='):
        account_name = part[len('AccountName='):]
    elif part.startswith('AccountKey='):
        account_key = part[len('AccountKey='):]

container_name = "test"
blob_name = "sample3.csv"

# Function to generate a SAS URL for the blob
def generate_sas_url(blob_name):
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
    sas_expiry = datetime.now(timezone.utc) + timedelta(hours=1)
    sas_permissions = BlobSasPermissions(read=True, write=True)
    blob_sas_token = generate_blob_sas(
        account_name=account_name,
        container_name=container_name,
        blob_name=blob_name,
        account_key=account_key,
        permission=sas_permissions,
        expiry=sas_expiry
    )
    blob_url = blob_client.url
    sas_url = f"{blob_url}?{blob_sas_token}"
    return sas_url

blob_sas_url = generate_sas_url(blob_name)
data_df = pd.read_csv(blob_sas_url)

# Log DataFrame with MLflow
dataset = mlflow.data.from_pandas(data_df, source=LocalArtifactDatasetSource(blob_sas_url), name="example_dataset")
mlflow.log_input(dataset, context="example_context")

# Fetch and print dataset info from the MLflow run
run = mlflow.get_run(mlflow.last_active_run().info.run_id)
dataset_info = run.inputs.dataset_inputs[0].dataset
print(f"Dataset name: {dataset_info.name}")
print(f"Dataset digest: {dataset_info.digest}")
print(f"Dataset profile: {dataset_info.profile}")
print(f"Dataset schema: {dataset_info.schema}")

# Load the dataset's source, which downloads the content from the source URL to the local filesystem
dataset_source = mlflow.data.get_source(dataset_info)
dataset_source.load()
Output:
Dataset name: example_dataset
Dataset digest: 77a19fc0
Dataset profile: {"num_rows": 1599, "num_elements": 1599}
Dataset schema: {"mlflow_colspec": [{"type": "string", "name": "fixed acidity;\"volatile acidity\";\"citric acid\";\"residual sugar\";\"chlorides\";\"free sulfur dioxide\";\"total sulfur dioxide\";\"density\";\"pH\";\"sulphates\";\"alcohol\";\"quality\"", "required": true}]}
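A note on the source argument: since the SAS URL is plain HTTPS, MLflow's built-in HTTPDatasetSource should also work here. A sketch under that assumption, reusing data_df and blob_sas_url from the code above:

from mlflow.data.http_dataset_source import HTTPDatasetSource

# Records the SAS URL as an HTTP(S) source, so mlflow.data.get_source(...).load()
# can re-download the CSV as long as the SAS token has not expired.
dataset = mlflow.data.from_pandas(
    data_df, source=HTTPDatasetSource(blob_sas_url), name="example_dataset"
)
mlflow.log_input(dataset, context="example_context")

Keep in mind that the SAS token embedded in the stored URL expires (one hour in the snippet above), so later load() calls will need a regenerated URL.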