I have successfully managed to setup an Azure Durable function and I can execute it and see the http trigger activates, evidenced by the log files. I can also see the orchestrator_trigger being called.
I have setup the configuration settings to enable the v2 programming model:
AzureWebJobsFeatureFlags=EnableWorkerIndexing
As per the instructions here Here's the code
import json
import logging
from typing import Any
import azure.functions as func
import azure.durable_functions as df
myapp = df.Blueprint(http_auth_level=func.AuthLevel.ANONYMOUS)
@myapp.route(route="synthetic/solar/snapshot")
@myapp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
function_name = 'my_orchestrator_function'
instance_id = await client.start_new(function_name)
response = client.create_check_status_response(req, instance_id)
return response
@myapp.orchestration_trigger(context_name="context")
def my_orchestrator_function(context):
test_act_data = {"testdata": "test"}
test_serialised_act_data = json.dumps(test_act_data)
logging.info(f"Activity result: {test_serialised_act_data}")
test_result = yield context.call_activity("hello_activity", test_serialised_act_data)
return test_result
@myapp.activity_trigger(input_name="test_json")
def hello_activity(test_json: str):
json_activity_data = json.loads(test_json)
data = json_activity_data["testdata"]
logging.info("Hello Test, Executing Activity Trigger")
return data
My host.json:
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
}
}
And function_app.py:
import azure.functions as func
app = func.FunctionApp()
app.register_functions(myapp)
I've tried improving the logging. I've tried to step through the stages and refer to lots of examples, but there's really no hint as to why the activity_trigger isn't being triggered.
Azure Durable Function using Python Programming Model V2 not calling activity_trigger
To call durable functions activity trigger use below code:
function_app.py:
import azure.functions as func
import azure.durable_functions as df
import json
import logging
myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
function_name = req.route_params.get('functionName')
instance_id = await client.start_new(function_name)
response = client.create_check_status_response(req, instance_id)
return response
@myApp.orchestration_trigger(context_name="context")
def rith_orchestrator(context):
rith_act_data = {"testdata": "Rithwik"}
rith_serialised_act_data = json.dumps(rith_act_data)
logging.info(f"Activity result: {rith_serialised_act_data}")
rith_result = yield context.call_activity("hello_rithwik", rith_serialised_act_data)
return rith_result
@myApp.activity_trigger(input_name="rithjson")
def hello_rithwik(rithjson: str):
json_activity_data = json.loads(rithjson)
data = json_activity_data["testdata"]
logging.info("Helo Rithwik, Executing Activity Trigger")
return data
requirements.txt
azure-functions
azure-functions-durable
host.json
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
}
}
local.settings.json:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=rithwik;AccountKey=pPBW9jWu77Sqp5F+NLWkA==;EndpointSuffix=core.windows.net",
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing"
}
}
Call a Function:
In URL replace functionName with Orchestrator Name as below:
http://localhost:7071/api/orchestrators/rith_orchestrator
Output:
Now use the first URL statusQueryGetUri
, You will get serialized data: