I am building a pipeline composed of 6 components using the Azure Machine Learning SDK v2 for Python, and I am currently working on the 4th component.
In my script, I created a sweep job to tune the hyperparameters of my model, which is an autoencoder. Among other tasks, the script saves the following artifacts: the fitted StandardScaler (scaler.pkl), the encoder, the full autoencoder, and a ScalerAutoencoderWrapper (a custom pyfunc that combines the scaler and the autoencoder).
mlflow.sklearn.log_model() raised a warning that the .predict method is missing (as far as I can tell, that flavor is built for end-to-end scikit-learn pipelines and estimators), so I use a custom pyfunc from MLflow instead. After several experiment runs, I realized that the sweep job component automatically outputs the best child run. However, I cannot use mlflow_model as the output type because I have four different outputs, not just one (and I need all four).
I thought of using uri_folder, but I'm unsure how to iterate through the uri_folder to get my ScalerAutoencoderWrapper, or how to use MLflow to deploy the model in the next step. What I have in mind is something like the sketch below.
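A minimal sketch of what I'd hope to do in the downstream component (the args.model_input argument name and the scaler_autoencoder_wrapper subfolder are assumptions based on how I log the artifacts):

import os
import mlflow.pyfunc

# model_input is the uri_folder output of the sweep step, mounted into this component
wrapper_dir = os.path.join(args.model_input, "scaler_autoencoder_wrapper")
wrapper = mlflow.pyfunc.load_model(wrapper_dir)  # load the custom pyfunc wrapper
predictions = wrapper.predict(X_test)            # scaler + autoencoder in one call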
My question is: how do I expose all four artifacts as outputs of the sweep step, and how do I load the ScalerAutoencoderWrapper in the next component to deploy it?
Any feedback on how the files are saved is also welcome. Thanks a lot!
I have attached my code.
.... PARTIAL CODE ...
# Imports used by this snippet; build_model, MLflowCallback, scalerObj,
# and the data come from earlier in the script
import pickle
import mlflow
import mlflow.keras
import mlflow.pyfunc
from mlflow.models import infer_signature
from tensorflow.keras.callbacks import EarlyStopping

# Log Model
with mlflow.start_run() as run:
    run_id = run.info.run_id

    # Build Model
    autoencoder, encoder = build_model(
        input_dim=input_dim,
        hidden_layers=hidden_layers,
        encoded_dim=encoded_dim,
        l1_regularizer=l1_regularizer,
        learning_rate=learning_rate,
        return_encoder=return_encoder,
    )

    # Define Strategy
    early_stopping = EarlyStopping(
        monitor=MONITOR,
        patience=patience,
        restore_best_weights=True,
    )

    # Fit & Keep History
    autoencoder.fit(
        X_scaled,
        X_scaled,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(X_validate_scaled, X_validate_scaled),
        callbacks=[early_stopping, MLflowCallback()],  # Log the final validation loss
    )

    # Save Model artifacts
    input_raw_example = X_train.iloc[:5]
    # Use transform, not fit_transform: the scaler is already fitted, and
    # refitting it on 5 rows would corrupt the object we are about to pickle
    input_transformed = scalerObj.transform(input_raw_example)

    # Artifact Names
    scaler_pkl = 'scaler.pkl'
    encoder_folder = 'encoder'
    autoencoder_folder = 'autoencoder'
    autoencoder_wrapper_folder = 'scaler_autoencoder_wrapper'

    # Save StandardScaler Object
    print("--------------> Save Object Scaler")
    with open(scaler_pkl, "wb") as f:
        pickle.dump(scalerObj, f)
    mlflow.log_artifact(scaler_pkl)

    # Save encoder layers
    print("--------------> Save Encoder")
    mlflow.keras.log_model(encoder, encoder_folder, input_example=input_transformed)

    # Save Autoencoder model Only
    print("--------------> Save AutoEncoder")
    mlflow.keras.log_model(autoencoder, autoencoder_folder, input_example=input_transformed)

    # Save StandardScaler + Autoencoder
    print("--------------> Save ScalerAutoencoderWrapper")
    scaler_autoencoder_wrapper = ScalerAutoencoderWrapper(
        scaler=scalerObj,
        autoencoder=autoencoder,
    )
    mlflow.pyfunc.log_model(
        artifact_path=autoencoder_wrapper_folder,
        python_model=scaler_autoencoder_wrapper,
        input_example=input_transformed,
        signature=infer_signature(
            model_input=input_transformed,
            model_output=scaler_autoencoder_wrapper.predict(
                context=None,
                model_input=input_raw_example,
            ),
        ),
    )

print(f"Training Completed, Model and Scaler saved with id : {run_id}")
My pipeline code:
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.sweep import BanditPolicy, Choice

# Create Nodes for Pipelines
@pipeline(default_compute='XXXX',
          display_name="ABCDE",
          experiment_name="EFGH",
          tags={'objective': 'DONTKNOW'})
def pipeline_autoencoder(input_file):
    # Step 1: Local Feature Selection
    feature_extraction_step = feature_extraction(
        input_file=input_file,
    )

    # Step 2: Local Split Selection
    data_split_step = data_split(
        input_file=feature_extraction_step.outputs.output_file,
    )

    # Step 3: Hyperparameter tuning (Sweep Job)
    train_model_step = train_tune_model(
        x_train=data_split_step.outputs.x_train_path,
        y_train=data_split_step.outputs.y_train_path,
        x_validate=data_split_step.outputs.x_validate_path,
        y_validate=data_split_step.outputs.y_validate_path,
        hidden_layers=Choice([str, str]),
        encoded_dim=Choice([int]),
        l1_regularizer=Choice([float, float]),
        learning_rate=Choice([float, float]),
        batch_size=Choice([int, int]),
        epochs=Choice([int, int]),
        patience=Choice([int, int]),
    )

    # OverWrite
    sweep_step = train_model_step.sweep(
        compute='XXXX',
        primary_metric="METRIC",
        goal="MINIMIZE",
        sampling_algorithm="RANDOM",
    )
    sweep_step.early_termination = BanditPolicy(
        evaluation_interval=INT,
        slack_factor=FLOAT,
        delay_evaluation=INT,
    )
    sweep_step.set_limits(max_total_trials=INT, max_concurrent_trials=INT, timeout=INT)

    # Step 4: deploy best child (NOT DONE YET; see the sketch after this block)

    return {
        'model_output': sweep_step.outputs.model_output,
        "x_test": data_split_step.outputs.x_test_path,
        "y_test": data_split_step.outputs.y_test_path,
    }
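For illustration, once the deployment component exists, the Step 4 placeholder above might be wired like this (deploy_best_child and its parameter names are hypothetical):

# Step 4: consume the best child's outputs in a deployment component
deploy_step = deploy_best_child(
    model_input=sweep_step.outputs.model_output,
    x_test=data_split_step.outputs.x_test_path,
    y_test=data_split_step.outputs.y_test_path,
)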
I tried returning a uri_folder, but all I got was a blob storage path; walking it with os.walk(path) returned nothing, as if the folder were empty. The Azure sample I was following does this:
# train model
model = train_model(params, X_train, X_test, y_train, y_test)

# Output the model and test data
# write to local folder first, then copy to output folder
mlflow.sklearn.save_model(model, "model")

from distutils.dir_util import copy_tree

# copy subdirectory example
from_directory = "model"
to_directory = args.model_output
copy_tree(from_directory, to_directory)
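As an aside, distutils is deprecated since Python 3.10 (and removed in 3.12); shutil covers the same copy-a-tree use case:

import shutil

# dirs_exist_ok lets us copy into the already-mounted output folder
shutil.copytree("model", args.model_output, dirs_exist_ok=True)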
Update on @JayashankarGS's answer:
I truly appreciate your support, but I'm currently facing an error saying either that the blob storage does not exist, or the following: yaml.representer.RepresenterError: ('cannot represent an object').
I've noticed that you have a strong reputation for resolving these types of issues. Could you assist me with the following questions?
Should I switch the output from an MLflow model to a custom model, since I'm using pyfunc.log_model? Specifically, in the component YAML outputs:

outputs:
  model_output:
    type: mlflow_model   # or custom_model?
    description: ....
  artifacts_output:
    type: uri_folder
    description: .....
I tried setting autoencoder_wrapper_folder = args.artifacts + 'scaler_autoencoder_wrapper' (as you suggested), but I encountered: yaml.representer.RepresenterError: ('cannot represent an object', PosixPath('/mnt/azureml/cr/j/ABCDE/cap/data-capability/wd/model_output/model')).
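My understanding (which may be wrong) is that log_model's artifact_path must be a relative artifact name inside the run, not a filesystem path, which would explain why it chokes on a PosixPath. A workaround I'm considering is mlflow.pyfunc.save_model, which writes directly to a filesystem path (a sketch, assuming args.artifacts is the mounted uri_folder output):

import os
import mlflow.pyfunc

# save_model writes the pyfunc model straight to disk instead of the run's artifact store
mlflow.pyfunc.save_model(
    path=os.path.join(args.artifacts, "scaler_autoencoder_wrapper"),
    python_model=scaler_autoencoder_wrapper,
    input_example=input_transformed,
)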
Following the Azure example:

# copy subdirectory example
from_directory = "model"
to_directory = args.model_output
copy_tree(from_directory, to_directory)

I got the error 'model is not a directory'. I then tried using os.getcwd(), because I assumed my model was saved in the current directory, but I received 'No such file or directory: /mnt/azureml/cr/j/ABCDS/exe/wd/model'. It seems the model isn't being saved there, and I'm unsure where Azure Machine Learning is saving it.
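To find out where things actually land, a small debug snippet could be added to the training script (assuming an active MLflow run):

import os
import mlflow

print(mlflow.get_artifact_uri())  # where log_model / log_artifact write for this run
print(os.getcwd())                # the job's working directory
print(os.listdir(os.getcwd()))    # what actually exists there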
As a workaround, I'm passing the .h5 file and the StandardScaler object to another component for evaluation and registration. This isn't ideal, but it's the only workaround I've found. I suspect pyfunc might need a different approach to saving, but I'm unsure, as the documentation only provides simple examples (a sweep job with sklearn.log_model, or MLflow with a single model, but not a sweep combined with multiple MLflow models).

The answer from @JayashankarGS:

"You need to log the files in the output location you configured for the training component.
Below is the sample training component yaml file.
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
type: command
name: train_model
display_name: train_model
version: 1
inputs:
  data:
    type: uri_folder
  ....Your extra inputs
  max_iter:
    type: integer
    default: -1
  decision_function_shape:
    type: string
    default: ovr
  break_ties:
    type: boolean
    default: false
  random_state:
    type: integer
    default: 42
outputs:
  model_output:
    type: mlflow_model
  artifacts:
    type: uri_folder
code: ./train-src
environment: azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest
command: >-
  python train.py
  --data ${{inputs.data}}
  ...You extra arguments
  --model_output ${{outputs.model_output}}
  --artifacts ${{outputs.artifacts}}
Here, the outputs section has model_output and artifacts, which are used to log the model and to save your files.
Pass these values to the training component arguments like below, and they will be saved.
train_model_step = train_tune_model(
    x_train=data_split_step.outputs.x_train_path,
    y_train=data_split_step.outputs.y_train_path,
    x_validate=data_split_step.outputs.x_validate_path,
    y_validate=data_split_step.outputs.y_validate_path,
    hidden_layers=Choice([str, str]),
    encoded_dim=Choice([int]),
    l1_regularizer=Choice([float, float]),
    learning_rate=Choice([float, float]),
    batch_size=Choice([int, int]),
    epochs=Choice([int, int]),
    patience=Choice([int, int]),
    model_output="Path to save model",
    artifacts="Path to save artifacts.",
)
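Inside train.py, these outputs arrive as plain command-line arguments, matching the command: section of the YAML above (a minimal argparse sketch; only the data and output arguments are shown):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--data", type=str)
parser.add_argument("--model_output", type=str)  # mounted mlflow_model output folder
parser.add_argument("--artifacts", type=str)     # mounted uri_folder output folder
args = parser.parse_args()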
Below is the code you need for saving:
# Artifact Names (os.path.join avoids a missing separator after args.artifacts)
scaler_pkl = os.path.join(args.artifacts, 'scaler.pkl')
encoder_folder = os.path.join(args.artifacts, 'encoder')
autoencoder_folder = os.path.join(args.artifacts, 'autoencoder')
autoencoder_wrapper_folder = os.path.join(args.artifacts, 'scaler_autoencoder_wrapper')

# Save StandardScaler Object
print("--------------> Save Object Scaler")
with open(scaler_pkl, "wb") as f:
    pickle.dump(scalerObj, f)

# Save encoder layers
print("--------------> Save Encoder")
mlflow.keras.log_model(encoder, encoder_folder, input_example=input_transformed)

# Save Autoencoder model Only
print("--------------> Save AutoEncoder")
mlflow.keras.log_model(autoencoder, autoencoder_folder, input_example=input_transformed)

# Save StandardScaler + Autoencoder
print("--------------> Save ScalerAutoencoderWrapper")
scaler_autoencoder_wrapper = ScalerAutoencoderWrapper(
    scaler=scalerObj,
    autoencoder=autoencoder,
)
mlflow.pyfunc.log_model(
    artifact_path=autoencoder_wrapper_folder,
    python_model=scaler_autoencoder_wrapper,
    input_example=input_transformed,
    signature=infer_signature(
        model_input=input_transformed,
        model_output=scaler_autoencoder_wrapper.predict(
            context=None,
            model_input=input_raw_example,
        ),
    ),
)
Then save the MLflow model in args.model_output. Next, you access the model via sweep_step.outputs.model_output and the artifacts via sweep_step.outputs.artifacts.
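For instance (a sketch mirroring the Azure sample below: save locally first, since save_model refuses to write into an existing directory, then copy into the mounted output):

import shutil
import mlflow.pyfunc

mlflow.pyfunc.save_model(path="model", python_model=scaler_autoencoder_wrapper)
shutil.copytree("model", args.model_output, dirs_exist_ok=True)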
The same thing is done in the Azure sample code you shared:
mlflow.sklearn.save_model(model, "model")

from distutils.dir_util import copy_tree

# copy subdirectory example
from_directory = "model"
to_directory = args.model_output
copy_tree(from_directory, to_directory)
After saving the MLflow model in the local folder model, all the files inside it are copied to the output folder given by args.model_output. In my code above, however, I passed the output path directly as the save location instead of copying.