Tags: python, visual-studio-code, azure-functions, pickle, azure-durable-functions

Data transfer format between Azure Durable Functions activities


I am trying to implement a data-analysis pipeline as separate functions using Azure Durable Functions.

Because the functions need to pass DataFrames and other objects to each other, I am serializing them with pickle. However, when I run the code below, I get the error shown in the screenshot and VS Code hangs.

What is the cause and how can I fix it?

Below is part of the code I want to implement.

import base64
import pickle

import azure.functions as func
import azure.durable_functions as df
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.datasets import fetch_california_housing  # Dataset
from sklearn.model_selection import train_test_split 
from sklearn.linear_model import Lasso  
from sklearn.linear_model import Ridge 
from sklearn.metrics import mean_squared_error  # MSE(Mean Squared Error)
from sklearn.preprocessing import StandardScaler 

# Durable Functions application object. The HTTP starter, the orchestrator
# and the activities below all register on it through its decorators.
# HTTP routes use the anonymous auth level.
app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
### client function ###
@app.route(route="orchestrators/client_function")
@app.durable_client_input(client_name="client")
async def client_function(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:
    """HTTP starter: start a new ``orchestrator`` instance and respond.

    ``wait_for_completion_or_create_check_status_response`` waits for the
    instance up to the SDK's default timeout. It returns the orchestration's
    result if the instance finished in time. Otherwise it returns the
    standard check-status response with the instance's management URLs.
    """
    instance_id = await client.start_new("orchestrator", None, {})
    # Return the waited-for response itself. Building a separate check-status
    # response afterwards would throw away what the wait produced while still
    # paying its latency.
    return await client.wait_for_completion_or_create_check_status_response(req, instance_id)

### orchestrator function ###
@app.orchestration_trigger(context_name="context")
def orchestrator(context: df.DurableOrchestrationContext) -> str:
    """Schedule the pipeline: prepare the data, then run both regressions.

    Each ``yield`` hands an activity call to the Durable Functions runtime
    and resumes with that activity's result. The two regression results are
    awaited but not used. The orchestration's output is the literal string
    "finished".
    """
    prepared = yield context.call_activity("prepare_data", '')
    # Both regression activities receive the prepared payload under "data".
    _simple = yield context.call_activity("simple_regression", {"data": prepared})
    _multiple = yield context.call_activity("multiple_regression", {"data": prepared})
    return "finished"

### activity function ###
def _pack(obj) -> str:
    """Pickle *obj* and encode it as base64 text so it survives JSON transport.

    Activity inputs and outputs are serialized as JSON. The raw ``bytes``
    returned by ``pickle.dumps`` are not JSON-serializable, which is what made
    the activities fail. Base64 turns the pickle into a plain ASCII string.
    """
    return base64.b64encode(pickle.dumps(obj)).decode("ascii")


def _unpack(payload: str):
    """Reverse ``_pack``: base64-decode *payload* and unpickle it.

    NOTE(security): ``pickle.loads`` can execute arbitrary code. This is only
    acceptable because the payload is produced by this app's own activities.
    Never pass it data that came from an external caller.
    """
    return pickle.loads(base64.b64decode(payload))


@app.activity_trigger(input_name="blank")
def prepare_data(blank: str):
    """Load the California housing data, clean it and add derived columns.

    Args:
        blank: Unused placeholder input (the orchestrator passes '').

    Returns:
        The cleaned DataFrame, pickled and base64-encoded by ``_pack``.
    """
    california_housing = fetch_california_housing()
    exp_data = pd.DataFrame(california_housing.data, columns=california_housing.feature_names)  # explanatory variables
    tar_data = pd.DataFrame(california_housing.target, columns=['HousingPrices'])  # target variable
    data = pd.concat([exp_data, tar_data], axis=1)  # merge into one frame

    # Delete anomalous values. ``.copy()`` makes the filtered frame independent,
    # so the column assignments below don't raise SettingWithCopyWarning.
    data = data[(data['HouseAge'] != 52) & (data['HousingPrices'] != 5.00001)].copy()

    # Create useful variables
    data['Household'] = data['Population'] / data['AveOccup']
    data['AllRooms'] = data['AveRooms'] * data['Household']
    data['AllBedrms'] = data['AveBedrms'] * data['Household']

    return _pack(data)

### simple regression analysis ###
@app.activity_trigger(input_name="arg")
def simple_regression(arg: dict):
    """Fit a one-variable linear regression of HousingPrices on MedInc.

    Args:
        arg: ``{"data": <output of prepare_data>}``.

    Returns:
        The fitted ``LinearRegression``, encoded by ``_pack``.
    """
    data = _unpack(arg['data'])
    exp_var = 'MedInc'
    tar_var = 'HousingPrices'

    # Remove outliers: keep rows strictly below the 95th percentile.
    q_95 = data[exp_var].quantile(0.95)
    data = data[data[exp_var] < q_95]

    # Split data into explanatory and target variables
    X = data[[exp_var]]
    y = data[[tar_var]]

    model = LinearRegression()
    model.fit(X, y)

    return _pack(model)

### multiple regression analysis ###
@app.activity_trigger(input_name="arg")
def multiple_regression(arg: dict):
    """Fit a standardized multiple linear regression on eight features.

    Args:
        arg: ``{"data": <output of prepare_data>}``.

    Returns:
        A 6-tuple of ``_pack``-encoded objects (serialized as a JSON array):
        (model, X_train_scaled, y_train, X_test, y_test, scaler).
    """
    data = _unpack(arg['data'])
    exp_vars = ['MedInc', 'HouseAge', 'AveRooms', 'AveBedrms', 'Population', 'AveOccup', 'Latitude', 'Longitude']
    tar_var = 'HousingPrices'

    # Remove outliers one feature at a time. Each feature's 95th percentile is
    # computed on the rows that survived the previous features' filters.
    for exp_var in exp_vars:
        q_95 = data[exp_var].quantile(0.95)
        data = data[data[exp_var] < q_95]

    # Split data into explanatory and target variables
    X = data[exp_vars]
    y = data[[tar_var]]

    # Split into training and test data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)

    # Standardize using statistics fitted on the training split only.
    scaler = StandardScaler()
    scaler.fit(X_train)
    # Keep X_train's index so the scaled rows stay aligned with y_train.
    X_train_scaled = pd.DataFrame(scaler.transform(X_train), columns=exp_vars, index=X_train.index)

    model = LinearRegression()
    model.fit(X_train_scaled, y_train)

    return (
        _pack(model),
        _pack(X_train_scaled),
        _pack(y_train),
        _pack(X_test),
        _pack(y_test),
        _pack(scaler),
    )

[Screenshot of the error — image not included]


Solution

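  • The error occurs because Durable Functions serializes every activity input and output as JSON, and the raw `bytes` returned by `pickle.dumps` are not JSON-serializable (note also that the question's code never imports `pickle`). To resolve it, convert your data to a plain `dict` in `prepare_data`, then run a simple `simple_regression` and `multiple_regression` in your Durable Functions code. I made the following changes to your `function_app.py`: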

    My function_app.py:

    import pickle
    import azure.functions as func
    import azure.durable_functions as df
    import pandas as pd
    from sklearn.linear_model import LinearRegression
    from sklearn.datasets import fetch_california_housing  # Dataset
    from sklearn.model_selection import train_test_split 
    from sklearn.linear_model import Lasso  
    from sklearn.linear_model import Ridge 
    from sklearn.metrics import mean_squared_error  # MSE(Mean Squared Error)
    from sklearn.preprocessing import StandardScaler 
    
    # Durable Functions application object. The HTTP starter, the orchestrator
    # and the activities below all register on it through its decorators.
    # HTTP routes use the anonymous auth level.
    app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
    ### client function ###
    @app.route(route="orchestrators/client_function")
    @app.durable_client_input(client_name="client")
    async def client_function(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:
        """HTTP starter: start a new ``orchestrator`` instance and respond.

        ``wait_for_completion_or_create_check_status_response`` waits for the
        instance up to the SDK's default timeout. It returns the orchestration's
        result if the instance finished in time. Otherwise it returns the
        standard check-status response with the instance's management URLs.
        """
        instance_id = await client.start_new("orchestrator", None, {})
        # Return the waited-for response itself. Building a separate
        # check-status response afterwards would throw away what the wait
        # produced while still paying its latency.
        return await client.wait_for_completion_or_create_check_status_response(req, instance_id)
    
    ### orchestrator function ###
    @app.orchestration_trigger(context_name="context")
    def orchestrator(context: df.DurableOrchestrationContext) -> str:
        """Prepare the data, then run the simple and multiple regressions in turn.

        The regression results are awaited but not used. The orchestration's
        output is the literal string "finished".
        """
        data = yield context.call_activity("prepare_data", '')
        # prepare_data returns {"data": <frame dict>, "target_column": ..., ...},
        # which is exactly the shape the regression activities read
        # (arg['data'], arg['target_column']). Pass it through unchanged rather
        # than nesting it under another "data" key.
        simple = yield context.call_activity("simple_regression", data)
        multiple = yield context.call_activity("multiple_regression", data)
        return "finished"
    
    ### activity function ###
    @app.activity_trigger(input_name="blank")
    def prepare_data(blank: str):
        """Load, clean and enrich the California housing data as plain dicts.

        Args:
            blank: Unused placeholder input (the orchestrator passes '').

        Returns:
            A dict of JSON-friendly values:
              'data'          -- the cleaned frame as ``DataFrame.to_dict()``
              'columns'       -- the frame's column names
              'target_column' -- 'HousingPrices'
              'MedInc'        -- the MedInc column as a list
        """
        california_housing = fetch_california_housing()
        exp_data = pd.DataFrame(california_housing.data, columns=california_housing.feature_names)
        # Call tolist(); printing `.tolist` without parentheses shows the bound method.
        print(exp_data.columns.tolist())  # explanatory variables
        tar_data = pd.DataFrame(california_housing.target, columns=['HousingPrices'])  # target variable
        data = pd.concat([exp_data, tar_data], axis=1)  # merge data
        print("Column Names:")
        print(data.columns.tolist())

        # Delete anomalous values. ``.copy()`` makes the filtered frame
        # independent, so the column assignments below don't raise
        # SettingWithCopyWarning.
        data = data[(data['HouseAge'] != 52) & (data['HousingPrices'] != 5.00001)].copy()

        # Create useful variables
        data['Household'] = data['Population'] / data['AveOccup']
        data['AllRooms'] = data['AveRooms'] * data['Household']
        data['AllBedrms'] = data['AveBedrms'] * data['Household']

        # Ensure 'MedInc' column doesn't contain null or missing values
        data = data.dropna(subset=['MedInc'])

        # Create a dictionary to store multiple data items
        prepared_data = {
            'data': data.to_dict(),
            'columns': data.columns.tolist(),
            'target_column': 'HousingPrices',
            'MedInc': data['MedInc'].tolist()  # 'MedInc' column as a list
        }
        return prepared_data
    
    @app.activity_trigger(input_name="arg")
    def simple_regression(arg: dict):
        """Fit the target column on MedInc with ordinary least squares.

        Args:
            arg: Either the dict returned by ``prepare_data`` or that dict
                wrapped as ``{"data": prepared_data}``. Both shapes are
                accepted, so this works however the orchestrator passes it.
                Uses the 'data' (``DataFrame.to_dict()`` output) and
                'target_column' entries.

        Returns:
            On success, a JSON-serializable dict with these keys:
              'model'     -- the fitted estimator, pickled and base64-encoded
              'coef'      -- the model coefficients
              'intercept' -- the model intercept
              'status'    -- 'success'
            On any exception, the string "Error: <message>". The traceback
            is also logged.
        """
        import base64
        import logging

        try:
            # Unwrap a {"data": prepared_data} envelope when present; the frame
            # dict and 'target_column' then sit one level down.
            payload = arg if 'target_column' in arg else arg['data']

            # Convert dictionary back to a DataFrame and drop rows with missing values
            data = pd.DataFrame.from_dict(payload['data'])
            data.dropna(inplace=True)

            # Selecting the explanatory variable 'MedInc' and the target column
            X_simple = data[['MedInc']]
            y = data[payload['target_column']]

            simple_model = LinearRegression()
            simple_model.fit(X_simple, y)

            # Activity outputs must be JSON-serializable, so the fitted estimator
            # is shipped as base64 text of its pickle rather than as a raw object.
            return {
                'model': base64.b64encode(pickle.dumps(simple_model)).decode('ascii'),
                'coef': simple_model.coef_.tolist(),
                'intercept': float(simple_model.intercept_),
                'status': 'success'
            }
        except Exception as e:
            # Keep the "Error: ..." string contract, but don't lose the traceback.
            logging.exception("simple_regression failed")
            return f"Error: {str(e)}"
    
    ### multiple regression analysis ###
    @app.activity_trigger(input_name="arg")
    def multiple_regression(arg: dict):
        """Fit the target column on every other column with ordinary least squares.

        Args:
            arg: Either the dict returned by ``prepare_data`` or that dict
                wrapped as ``{"data": prepared_data}``. Both shapes are
                accepted, so this works however the orchestrator passes it.
                Uses the 'data' (``DataFrame.to_dict()`` output) and
                'target_column' entries.

        Returns:
            On success, a JSON-serializable dict with these keys:
              'model'     -- the fitted estimator, pickled and base64-encoded
              'features'  -- the explanatory column names, in coefficient order
              'coef'      -- the model coefficients
              'intercept' -- the model intercept
              'status'    -- 'success'
            On any exception, the string "Error: <message>". The traceback
            is also logged.
        """
        import base64
        import logging

        try:
            # Unwrap a {"data": prepared_data} envelope when present; the frame
            # dict and 'target_column' then sit one level down.
            payload = arg if 'target_column' in arg else arg['data']
            target = payload['target_column']

            # Convert dictionary back to a DataFrame and drop rows with missing values
            data = pd.DataFrame.from_dict(payload['data'])
            data.dropna(inplace=True)

            # All non-target columns are explanatory variables
            X_multiple = data.drop(columns=[target])
            y = data[target]

            multiple_model = LinearRegression()
            multiple_model.fit(X_multiple, y)

            # Activity outputs must be JSON-serializable, so the fitted estimator
            # is shipped as base64 text of its pickle rather than as a raw object.
            return {
                'model': base64.b64encode(pickle.dumps(multiple_model)).decode('ascii'),
                'features': X_multiple.columns.tolist(),
                'coef': multiple_model.coef_.tolist(),
                'intercept': float(multiple_model.intercept_),
                'status': 'success'
            }
        except Exception as e:
            # Keep the "Error: ..." string contract, but don't lose the traceback.
            logging.exception("multiple_regression failed")
            return f"Error: {str(e)}"
    

    Output:

    [Screenshot of the output — image not included]

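    All orchestrator and activity functions ran successfully; see below. (Note: the regression activities catch every exception and return an "Error: ..." string, so a completed orchestration does not by itself prove the regressions succeeded. Check the activity outputs.)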

    [Screenshot of the run — image not included]