I have a yml template with a job that triggers pytest class.
The file with allt he test classes is (test_invoke_pipelines.py:
import logging
import os
import pytest
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
logging.basicConfig(level=logging.INFO)
LOAD_DATA_ENHANCED_PIPELINE = "my-enhanced-pipeline"
PREPROCESS_PIPELINE = "my-preprocess-pipeline"
TRAIN_MONTHLY_PIPELINE = "my-train-pipeline"
SCORE_MONTHLY_PIPELINE = "my-score-pipeline"
class TestDataLoad:
"""Data Load pipeline test cases."""
@pytest.mark.order(1)
def test_load_data_pipeline_succeeded(self, adf_pipeline_run):
"""Test that pipeline has data in SQL."""
this_run = adf_pipeline_run(LOAD_DATA_ENHANCED_PIPELINE, run_inputs={"pl_full_load": True})
assert this_run.status == "Succeeded" # noqa: S101
@pytest.mark.order(2)
def test_preprocess_pipeline_succeeded(self, adf_pipeline_run):
"""Test that preprocess pipeline works."""
this_run = adf_pipeline_run(PREPROCESS_PIPELINE, run_inputs={})
assert this_run.status == "Succeeded" # noqa: S101
class TestTrainingPipeline:
"""Train pipeline test cases - Used for CT."""
subscription_id = os.environ.get("SUBSCRIPTION_ID")
resource_group_name = os.environ.get("DATA_RESOURCE_GROUP")
factory_name = os.environ.get("ADF_NAME")
pipeline_name = os.environ.get("PIPELINE_NAME")
print(f"Subscription ID: {subscription_id}")
print(f"RG Name: {resource_group_name}")
print(f"Factory Name: {factory_name}")
print(f"Pipeline Name: {pipeline_name}")
def test_pipeline_runs(self):
"""Test train pipeline case."""
credential = DefaultAzureCredential()
adf_client = DataFactoryManagementClient(credential, self.subscription_id)
# Trigger the pipeline run
pipeline_run = adf_client.pipelines.create_run(self.resource_group_name, self.factory_name, self.pipeline_name)
# Assert that the pipeline run was successfully initiated
print(f"Pipeline Run ID: {pipeline_run.run_id}")
assert pipeline_run.run_id is not None # noqa: S101
# Check the pipeline status at the start
pipeline_status = adf_client.pipeline_runs.get(
self.resource_group_name, self.factory_name, pipeline_run.run_id
).status
assert pipeline_status in ["InProgress", "Queued", "Canceling"] # noqa: S101
print(f"##vso[task.setvariable variable=ADF_PIPELINE_RUN_ID]{pipeline_run.run_id}")
for the TestDataLoad, it uses adf test plugin and I call the test using the template (integration-template)below:
- task: AzureCLI@2
displayName: '3 Run Integration Test Data Load'
inputs:
azureSubscription: 'datanalytics-${{ parameters.ENV }}'
scriptType: bash
scriptLocation: inlineScript
inlineScript:
pytest test_invoke_pipelines.py::TestDataLoad --sub_id ${{ parameters.SUBSCRIPTION_ID }} --rg_name ${{ parameters.DATA_RESOURCE_GROUP }} --adf_name ${{ parameters.ADF_NAME }} --sp_tenant_id ${{ parameters.TENANT_ID }} --doctest-modules --junitxml=junit/test-integration-results.xml
addSpnToEnvironment: true
workingDirectory: tests
and pytest automatically picks the arguments like subscription_id, resource_group_name when passed with above command as TestDataLoad uses adf test plugin/extension in pytest.
But for the TestTrainingPipeline, I dnt use the adf plugin and instead use adf sdk to trigger an adf pipeline and write the adf pipeline run id to a variable to it can be used for subsequent steps.
The problem is:
How do I pass the arguments like subscription_id and others to the TestTrainingPipeline class. I have another pipeline that calls this test calss as:
- task: AzureCLI@2
name: CTTraining
displayName: '4 Run Integration Tests Train'
inputs:
azureSubscription: 'datanalytics-${{ parameters.ENV }}'
scriptType: bash
scriptLocation: inlineScript
inlineScript: |
echo "##vso[task.setvariable variable=SUBSCRIPTION_ID]${{ parameters.SUBSCRIPTION_ID }}"
echo "##vso[task.setvariable variable=DATA_RESOURCE_GROUP]${{ parameters.DATA_RESOURCE_GROUP }}"
echo "##vso[task.setvariable variable=ADF_NAME]${{ parameters.ADF_NAME }}"
echo "##vso[task.setvariable variable=PIPELINE_NAME]${{ parameters.TRAIN_PIPELINE_NAME }}"
pytest test_invoke_pipelines.py::TestTrainingPipeline --verbose --capture=no --doctest-modules --junitxml=junit/test-ct-results.xml
addSpnToEnvironment: true
workingDirectory: tests
but it throws the error that arguments are unrecognized.
I do not want to use the conftest.py file. I have also tried writing the parameters to os vars before calling pytest:
echo "##vso[task.setvariable variable=SUBSCRIPTION_ID]${{ parameters.SUBSCRIPTION_ID }}"
echo "##vso[task.setvariable variable=DATA_RESOURCE_GROUP]${{ parameters.DATA_RESOURCE_GROUP }}"
echo "##vso[task.setvariable variable=ADF_NAME]${{ parameters.ADF_NAME }}"
echo "##vso[task.setvariable variable=PIPELINE_NAME]${{ parameters.TRAIN_PIPELINE_NAME }}"
but in TestTrainingPipeline class I get None for all these variables:
subscription_id = os.environ.get("SUBSCRIPTION_ID")
resource_group_name = os.environ.get("DATA_RESOURCE_GROUP")
factory_name = os.environ.get("ADF_NAME")
pipeline_name = os.environ.get("PIPELINE_NAME", "pl_train_monthly")
Is there a way to pass these arguments from a yml pipeline to the pytestclass?
The trace I get from the failed test is:
=================================== FAILURES =================================== ___________________ TestTrainingPipeline.test_pipeline_runs ____________________
self = <test_invoke_pipelines.TestTrainingPipeline object at 0x7f8b5bd82800>
def test_pipeline_runs(self): """Test train pipeline case.""" credential = DefaultAzureCredential()
adf_client = DataFactoryManagementClient(credential, self.subscription_id)
test_invoke_pipelines.py:55: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ ../../../_tool/Python/3.10.12/x64/lib/python3.10/site-packages/azure/mgmt/datafactory/_data_factory_management_client.py:132: in init self._config = DataFactoryManagementClientConfiguration(
self = <azure.mgmt.datafactory._configuration.DataFactoryManagementClientConfiguration object at 0x7f8b5bb7e590> credential = <azure.identity._credentials.default.DefaultAzureCredential object at 0x7f8b5bd83d00> subscription_id = None, kwargs = {}, api_version = '2018-06-01'
def __init__(self, credential: "TokenCredential", subscription_id: str, **kwargs: Any) -> None: api_version: str = kwargs.pop("api_version", "2018-06-01") if credential is None: raise ValueError("Parameter 'credential' must not be None.") if subscription_id is None:
raise ValueError("Parameter 'subscription_id' must not be None.") E ValueError: Parameter 'subscription_id' must not
be None.
../../../_tool/Python/3.10.12/x64/lib/python3.10/site-packages/azure/mgmt/datafactory/_configuration.py:42: ValueError -------- generated Nunit xml file: /azp/_work/3/s/tests/test-output.xml -------- ------ generated xml file: /azp/_work/3/s/tests/junit/test-ct-results.xml ------ =========================== short test summary info ============================ FAILED test_invoke_pipelines.py::TestTrainingPipeline::test_pipeline_runs - V... ============================== 1 failed in 0.61s ===============================
It fails becasue the subscription_id is None
Setting up variable sin the bash worked.
SUBSCRIPTION_ID="${{ parameters.SUBSCRIPTION_ID }}"
DATA_RESOURCE_GROUP="${{ parameters.DATA_RESOURCE_GROUP }}"
ADF_NAME="${{ parameters.ADF_NAME }}"
PIPELINE_NAME="${{ parameters.TRAIN_PIPELINE_NAME }}"
export SUBSCRIPTION_ID DATA_RESOURCE_GROUP ADF_NAME PIPELINE_NAME
And in the script access as:
subscription_id = os.environ.get("SUBSCRIPTION_ID")
resource_group_name = os.environ.get("DATA_RESOURCE_GROUP")
factory_name = os.environ.get("ADF_NAME")
pipeline_name = os.environ.get("PIPELINE_NAME")