I am trying to migrate Pipelines from Azure Machine Learning SDK V1 to V2, but sometimes I don't understand the logic behind the V2 and I get stuck.
In V1, I just had to create PythonScriptStep and wrap it into a StepSequence and deploy the pipeline. My scripts are simple, no input, no outputs. We store data in ADLS Gen2 and use databricks tables as inputs. This is why I don't have any inputs/outputs.
script_step_1 = PythonScriptStep(
name="step1",
script_name="main.py",
arguments=arguments, # list of PipelineParameter
compute_target=ComputeTarget(workspace=ws, name="cpu-16-128"),
source_directory="./my_project_folder",
runconfig=runconfig, # Conda + extra index url + custom dockerfile
allow_reuse=False,
)
script_step_2 = PythonScriptStep(
name="step2",
...
)
step_sequence = StepSequence(
steps=[
script_step_1,
script_step_2,
]
)
# Create Pipeline
pipeline = Pipeline(
workspace=ws,
steps=step_sequence,
)
pipeline_run = experiment.submit(pipeline)
With V2, we need to create a "node" in a component that will be use by a pipeline.
I've made my Environment with dockerfile with BuildContext, and feed a representation of requirements.txt to a conda environment dictionary where I added my extra index url.
azureml_env = Environment(
build=BuildContext(
path="./docker_folder", # With Dockerfile and requirements.txt
),
name="my-project-env",
)
Now I make a command component that will invoke python and a script with some arguments:
step_1 = command(
environment=azureml_env ,
command="python main.py",
code="./my_project_folder",
)
Now that I have my step1 and step2 in SDK V2, I have no clue on how to make a sequence without Input/Output
@pipeline(compute="serverless")
def default_pipeline():
return {
"my_pipeline": [step_1, step_2]
}
I can not manage to make the pipeline
work to make a basic run a 2 consecutive steps.
I guess after I manage to get this right, I can create/update the pipeline like this:
my_pipeline = default_pipeline()
# submit the pipeline job
pipeline_job = ml_client.jobs.create_or_update(
my_pipeline,
experiment_name=experiment_name,
)
UPDATE 1:
Tried to create my own StepSequence
(very naive) with dummies input/outputs
class CommandSequence:
def __init__(self, commands, ml_client):
self.commands = commands
self.ml_client = ml_client
def build(self):
for i in range(len(self.commands)):
cmd = self.commands[i]
if i == 0:
cmd = command(
display_name=cmd.display_name,
description=cmd.description,
environment=cmd.environment,
command=cmd.command,
code=cmd.code,
is_deterministic=cmd.is_deterministic,
outputs=dict(
my_output=Output(type="uri_folder", mode="rw_mount"),
),
)
else:
cmd = command(
display_name=cmd.display_name,
description=cmd.description,
environment=cmd.environment,
command=cmd.command,
code=cmd.code,
is_deterministic=cmd.is_deterministic,
inputs=self.commands[i - 1].outputs.my_output,
outputs=dict(
my_output=Output(type="uri_folder", mode="rw_mount"),
),
)
cmd = self.ml_client.create_or_update(cmd.component)
self.commands[i] = cmd
print(self.commands[i])
return self.commands
I had to recreate command
because they protected a lot of stuff in the object...
@pipeline(compute="serverless")
def default_pipeline():
command_sequence = CommandSequence([step_1, step_2], ml_client).build()
return {
"my_pipeline": command_sequence[-1].outputs.my_output
}
But it fails to link the output of step 1 to input of step 2.
inputs=self.commands[i - 1].outputs.my_output, AttributeError: 'dict' object has no attribute 'my_output'
I made my own tools to recreate something that can acheive the same output.
I build a graph of steps (commands or node in Azure language) and then get the dependecy order of this graph and build the pipeline. This methods fits if you want to create a pipeline with sequencial or parallel steps. Which means, no custom Inputs/Outputs like Azure Machine Learning forces us to use to define the worflow logic. Some people, like me, just want to execute step 1 before step 2 with no data passing between them because the data is stored in a Database or a Azure Storage.
from collections import OrderedDict
from pathlib import Path
from typing import List
import random
import string
from azure.ai.ml import Input, Output, command
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import BuildContext, Environment
class StepsGraph:
def __init__(self):
"""
Initialize a StepsGraph instance to manage step dependencies.
The StepsGraph uses an ordered dictionary to store steps and their dependencies.
"""
self.steps = OrderedDict()
def add_edges(self, step_1, step_2):
"""
Add a dependency relationship between two steps.
Args:
step_1: The first step.
step_2: The step that depends on the first step.
"""
if step_1 not in self.steps:
self.steps[step_1] = []
if step_2 not in self.steps:
self.steps[step_2] = []
self.steps[step_1].append(step_2)
def get_dependency(self):
"""
Get the steps in the order of their dependencies.
Returns:
List: A list of steps in the order they need to be executed to satisfy all dependencies.
"""
def dfs(node, visited, result):
visited[node] = True
if node in self.steps:
for neighbor in self.steps[node]:
if not visited[neighbor]:
dfs(neighbor, visited, result)
result.append(node)
visited = {step: False for step in self.steps}
result = []
for step in self.steps:
if not visited[step]:
dfs(step, visited, result)
return result[::-1]
def get_parents(self, step):
"""
Get the steps that are dependent on a given step.
Args:
step: The step to find dependent steps for.
Returns:
List: A list of steps that depend on the given step.
"""
parents = []
for s, connections in self.steps.items():
if step in connections:
parents.append(s)
return parents
def print_steps(self):
for step, edges in self.steps.items():
print(f"Step {step} -> {edges}")
def create_input(step):
"""
Create an input dictionary for a step.
Args:
step (str): The name of the step for which to create an input.
Returns:
dict: A dictionary representing the input for the specified step with the following structure:
{step: Input(type="uri_folder", mode="rw_mount")}
"""
return {f"{step}": Input(type="uri_folder", mode="rw_mount")}
def create_output(step):
"""
Create an output dictionary for a step.
Args:
step (str): The name of the step for which to create an output.
Returns:
dict: A dictionary representing the output for the specified step with the following structure:
{step: Output(type="uri_folder", mode="rw_mount")}
"""
return {f"{step}": Output(type="uri_folder", mode="rw_mount")}
def create_pipeline(steps_graph, default_compute, name, experiment_name):
"""
Create a pipeline with specified steps and dependencies.
Args:
steps_graph (Step or StepsGraph): A Step or StepsGraph object representing the steps and their dependencies in the pipeline.
If a Step is provided, it will be treated as a standalone step.
default_compute: The default compute target for the pipeline (or None for serverless execution).
name (str): The name of the pipeline.
experiment_name (str): The name of the experiment associated with the pipeline.
Returns:
Callable: A callable function representing the created pipeline.
Raises:
ValueError: If `name` or `experiment_name` is not provided.
Note:
- The `steps_graph` argument can be a single Step or a StepsGraph object.
- The pipeline's structure is determined by the dependencies defined in the `steps_graph`.
- The pipeline is created as a Python function and can be executed when called.
Example:
# Create a pipeline with specific steps and dependencies
steps_graph = StepsGraph()
step1 = Step(...)
step2 = Step(...)
step3 = Step(...)
steps_graph.add_edges(step_1, step_2)
steps_graph.add_edges(step_2, step_3)
steps_graph.add_edges(step_2, step_4)
steps_graph.add_edges(step_2, step_6)
steps_graph.add_edges(step_4, step_5)
steps_graph.add_edges(step_3, step_7)
steps_graph.add_edges(step_6, step_7)
steps_graph.add_edges(step_5, step_7)
pipeline_job = create_pipeline(steps_graph, default_compute="my_compute", name="my_pipeline", experiment_name="my_experiment")
"""
# default_compute = None => Serverless
if not name:
raise ValueError("Please provide a `name` for your pipeline.")
if not experiment_name:
raise ValueError("Please provide an `experiment_name` for your pipeline.")
@pipeline(
default_compute=default_compute,
name=experiment_name,
experiment_name=experiment_name,
)
def default_pipeline():
if isinstance(steps_graph, Step):
steps_graph.build()()
return
dependency_oder = steps_graph.get_dependency()
command_dict = {}
parent_dict = {}
for step, edges in steps_graph.steps.items():
print(f"Step {step} -> {edges}")
parent_dict[str(step)] = steps_graph.get_parents(step)
print(f"parent_dict : {parent_dict}")
print(f"dependency_oder: {dependency_oder}")
for step in dependency_oder:
print(f"step : {step}")
inputs_dict = {}
step.update_link(
outputs=create_output(step),
)
for parent_node in reversed(parent_dict[str(step)]):
step.update_link(
inputs=create_input(parent_node),
)
custom_output = getattr(
command_dict[str(parent_node)].outputs, str(parent_node)
)
input_name = list(parent_node.outputs.keys())[
0
] # Because we know we have only one output per steps
inputs_dict[input_name] = custom_output
print(inputs_dict)
for key, value in inputs_dict.items():
print(key, value._port_name)
print(step.inputs)
command_dict[str(step)] = step.build()(**inputs_dict)
return default_pipeline()
def generate_custom_uuid(length=8, parts=4):
custom_uuid = ""
for _ in range(parts):
part = "".join(random.choices(string.ascii_letters + string.digits, k=length))
custom_uuid += part + "_"
custom_uuid = custom_uuid[:-1]
return custom_uuid
class Step:
"""
Represents a step in a StepsGraph.
This class is used to define and manage the properties of a step,
including its inputs and outputs. It provides methods for updating
the input and output links and for building the step's command.
Attributes:
inputs (dict): A dictionary of input values for the step.
outputs (dict): A dictionary of output values for the step.
Methods:
__init__(self, **kwargs): Initializes a Step object with optional
keyword arguments to set initial properties.
__str__(self): Returns a string representation of the step.
update_link(self, inputs=None, outputs=None): Updates the input and
output links with the provided dictionaries.
build(self): Builds and returns the command for executing the step.
Example usage:
>>> my_step = Step(name="Sample Step", inputs={"input_1": "value1"})
>>> my_step.update_link(outputs={"output_1": "result"})
>>> command = my_step.build()
>>> # Then you need to call the command to build the inputs/outputs. Use `create_pipeline` for this.
"""
def __init__(self, **kwargs):
self.inputs = None
self.outputs = None
self.__dict__.update(kwargs)
self.uuid = self.display_name + "_" + generate_custom_uuid()
def __str__(self):
return self.uuid
def update_link(self, inputs=None, outputs=None):
if self.inputs and inputs:
self.inputs.update(inputs)
elif inputs:
self.inputs = inputs
if self.outputs and outputs:
self.outputs.update(outputs)
elif outputs:
self.outputs = outputs
def build(self):
return command(**self.__dict__)
Basically Step
can get the same arguments as the command
function from azure.ai.ml.command
Here is how I use it:
step_1 = Step(
display_name="step_1",
description="step_1",
environment=...,
command="python main.py",
code...,
is_deterministic=False,
environment_variables=...,
)
step_2 = Step(...)
step_3 = Step(...)
step_4 = Step(...)
step_5 = Step(...)
step_6 = Step(...)
step_7 = Step(...)
steps_graph = StepsGraph()
steps_graph.add_edges(step_1, step_2)
steps_graph.add_edges(step_2, step_3)
steps_graph.add_edges(step_2, step_4)
steps_graph.add_edges(step_2, step_6)
steps_graph.add_edges(step_4, step_5)
steps_graph.add_edges(step_3, step_7)
steps_graph.add_edges(step_6, step_7)
steps_graph.add_edges(step_5, step_7)
pipeline_job = create_pipeline(steps_graph, default_compute="my_compute", name="my_pipeline", experiment_name="my_experiment")
pipeline_job = ml_client.jobs.create_or_update(pipeline_job)
print(pipeline_job) # To get URL in terminal, or you can play with it for schedules