azure-functions azure-servicebus-topics cold-start

Azure Function Cold Start Never Happens


Update: Solution found. The ServiceBusConnString setting held only the URL of the service bus (testservicebus.servicebus.windows.net), which worked until the function spun down. Switching it to the service bus's 'Primary Connection String' did the trick, and the function has since come back up on 4+ cold start attempts.
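To make the distinction in the update concrete, here is a minimal sketch of a check that tells a full Service Bus connection string apart from a bare namespace host name. The helper name `looksLikeServiceBusConnectionString` is hypothetical and not part of any Azure SDK, and the key/value shape it tests for is assumed from the format the portal shows under 'Primary Connection String'.

```typescript
// Hypothetical helper (not part of any Azure SDK): reports whether a setting
// value has the shape of a full Service Bus connection string.
function looksLikeServiceBusConnectionString(value: string): boolean {
    // Split "Key=Value;Key=Value" into a set of lower-cased keys.
    // Only the first '=' separates key from value, since keys are base64
    // and may themselves end in '='.
    const keys = new Set(
        value
            .split(';')
            .map((part) => part.trim())
            .filter((part) => part.includes('='))
            .map((part) => part.slice(0, part.indexOf('=')).toLowerCase())
    );
    const hasEndpoint = /(^|;)\s*Endpoint=sb:\/\//i.test(value);
    const hasKeyPair =
        keys.has('sharedaccesskeyname') && keys.has('sharedaccesskey');
    const hasSignature = keys.has('sharedaccesssignature');
    return hasEndpoint && (hasKeyPair || hasSignature);
}

// The bare namespace host from the update is rejected.
console.log(
    looksLikeServiceBusConnectionString('testservicebus.servicebus.windows.net')
); // false

// A value in connection-string form is accepted ("<key>" is a placeholder).
console.log(
    looksLikeServiceBusConnectionString(
        'Endpoint=sb://testservicebus.servicebus.windows.net/;' +
            'SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<key>'
    )
); // true
```

A check like this could run once at startup against `process.env.ServiceBusConnString`, so that a misconfigured setting is logged immediately rather than surfacing only after the app has gone idle.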

I have multiple functions on the serverless Consumption plan that will not spin up to process new messages from a Service Bus topic unless the function has recently been visited in the Azure portal, or its logs have recently been connected to.

If I wait about half an hour after the last message was processed, the next message will never be processed until I visit the portal or connect to logs.

I would appreciate any details on why this is the case, and how to resolve it.

My functions are running on:

host.json

{
    "version": "2.0",
    "extensionBundle": {
        "id": "Microsoft.Azure.Functions.ExtensionBundle",
        "version": "[4.*, 5.0.0)"
    }
}

index.ts

import { app, InvocationContext } from '@azure/functions';
import { handleActionEvent } from './src/actionEventHandler';

process.env.AUTH0_CLIENT_ID = process.env.AUTH_AUTH0_CLIENT_ID;
process.env.AUTH0_CLIENT_SECRET = process.env.AUTH_AUTH0_CLIENT_SECRET;

export async function serviceBusTopicTrigger(
    sbMsg: unknown,
    context: InvocationContext
): Promise<void> {
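    // triggerMetadata can be undefined, so read it defensively.
    context.log('MessageId =', context.triggerMetadata?.messageId);
    try {
        await handleActionEvent(sbMsg);
    } catch (err) {
        // Logged and swallowed: the invocation succeeds, so the message
        // is completed rather than retried.
        context.error(err);
    }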
}

app.serviceBusTopic('serviceBusTopicTrigger', {
    topicName: '%TOPIC_NAME%',
    subscriptionName: '%SUBSCRIPTION_NAME%',
    connection: 'ServiceBusConnString',
    handler: serviceBusTopicTrigger
});

actionEventHandler.ts (I don't think the code here would have an effect)

export const handleActionEvent = async (
    eventBody: any
): Promise<void> => {
    try {
        let policyId: number | null;
        const event = eventBody.event;
        switch (event) {
            case POLICY_ACTION_EVENT.PolicyChanged:
                if (eventBody.action === POLICY_ACTION.deleted) {
                    policyId = null;
                } else {
                    policyId = eventBody.record.id;
                }
                break;
            case POLICY_ACTION_EVENT.PolicyTicketChanged:
                policyId = eventBody.record.policy_id;
                break;
            // Whenever a ticket rule changes, the policy ticket events will fire,
            // and we handle those.
            case POLICY_ACTION_EVENT.PolicyTicketRuleChanged:
                return;
            case POLICY_ACTION_EVENT.PolicyRiskCalculationTriggered:
                await calculatePolicyRisk(eventBody);
                return;
            default:
                logger.log(
                    LOG_LEVELS.DEBUG,
                    `Debug: Unhandled event ${event} was encountered`
                );
                logger.log(LOG_LEVELS.DEBUG, eventBody.record);
                return;
        }
        const orgId = eventBody.record?.organization_id ?? eventBody.orgId;

        logger.log(
            LOG_LEVELS.DEBUG,
            `Debug: the event is: ${event} the policy id is ${policyId} the project array is ${eventBody?.projectIds}`
        );

        // Guard projectIds itself: it is absent on some events.
        if (eventBody?.projectIds?.length) {
            for (const projId of eventBody.projectIds) {
                await scheduleRiskCalculation(
                    orgId,
                    policyId,
                    eventBody.timeOfAction,
                    projId
                );
            }
        } else {
            await scheduleRiskCalculation(
                orgId,
                policyId,
                eventBody.timeOfAction,
                getProjectId(eventBody)
            );
        }
    } catch (err) {
        logCatchError('handlePolicyActionEvent', err.message);
        logger.log(LOG_LEVELS.DEBUG, 'Debug: Event body with issue is:');
        logger.log(LOG_LEVELS.DEBUG, eventBody);
    }
};

/** Send request to update scheduled policy risk calc.
 * If one is already scheduled it will just ignore and return 200.
 */
async function scheduleRiskCalculation(
    orgId,
    policyId,
    timeOfAction,
    projectId = null
) {
    const accessHeaders = {
        Authorization: `Bearer ${await getAccessToken()}`,
        ['Content-Type']: 'application/json'
    };
    const result = await httpRequest2(
        HTTP_VERBS.POST,
        `${process.env.RESOURCE_HOST}:${process.env.RESOURCE_PORT}/api/v3/schedule-process`,
        { orgId },
        accessHeaders,
        undefined,
        {
            type: SCHEDULED_PROCESS_TYPES.POLICY_RISK_CALCULATION,
            scheduled_date: timeOfAction,
            context: {
                project_id: projectId,
                policy_id: policyId
            }
        }
    );
    if (result.status === 'rejected') {
        throw new Error(
            'Failed to successfully schedule policy risk calculation'
        );
    }
}

function getProjectId(eventBody) {
    if (eventBody?.record?.project_id) {
        return eventBody?.record?.project_id;
    }
    if (eventBody?.projectId) {
        return eventBody.projectId;
    }
    return null;
}

async function calculatePolicyRisk(calcBody) {
    const policyId = calcBody.policyId;
    const projectId = getProjectId(calcBody);
    const orgId = calcBody.orgId;
    try {
        // Post policy risk
        const policyRiskTrendBody = {
            policy_id: policyId,
            project_id: null
        };
        if (projectId) {
            policyRiskTrendBody.project_id = projectId;
        }

        const { risk } = await httpRequest2(
            HTTP_VERBS.POST,
            `${process.env.RESOURCE_HOST}:${process.env.RESOURCE_PORT}/api/v3/policies/risk-trends?orgId=${orgId}`,
            undefined,
            undefined,
            undefined,
            policyRiskTrendBody
        );
        logger.log(
            LOG_LEVELS.INFO,
            `The policy risk for id ${policyId} and project id ${projectId} is ${risk}`
        );
    } catch (err) {
        logCatchError('calculatePolicyRisk', err.message);
    }
}

I tried upgrading the function from Node 16 to 18, and from Azure Function version 3 to 4.

Locally I see no errors or crashes happen with the functions.


Solution

  • I am using the default TypeScript (Node 18) Service Bus topic-triggered function, deployed on the Consumption plan.

    import { app, InvocationContext } from "@azure/functions";
    
    export async function serviceBusTopicTrigger1(message: unknown, context: InvocationContext): Promise<void> {
        context.log('Service bus topic function processed message:', message);
    }
    
    app.serviceBusTopic('serviceBusTopicTrigger1', {
        connection: 'afreenSb_SERVICEBUS',
        topicName: 'mytopic',
        subscriptionName: 'mysubscription',
        handler: serviceBusTopicTrigger1
    });
    

    I am using a connection string with Manage rights.

    local.settings.json:

    {
      "IsEncrypted": false,
      "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "node",
        "afreenSb_SERVICEBUS": "Endpoint=sb://afreensb.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=7Aoiytco*****SbMHfxhI="
      }
    }
    

    If your function app is idle for around 20 minutes, the job host is terminated. To avoid this, you can either add a timer-triggered function to your function app to keep it warm, or consider upgrading to an App Service plan, which comes with an Always On option that keeps the function app alive.

    You can also refer to this GitHub issue, which says the same.
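
The timer-trigger suggestion above could be sketched as follows. This is a minimal sketch under stated assumptions, not a confirmed fix: the function name `keepWarm`, the five-minute schedule, and the `WarmupContext` type are all made up for illustration. The options object has the shape that `app.timer` from `@azure/functions` (v4 programming model) accepts; the registration call is left as a comment so the block stays self-contained.

```typescript
// Minimal structural stand-in for the part of InvocationContext used here
// (an assumption, so that the sketch needs no imports).
interface WarmupContext {
    log: (...args: unknown[]) => void;
}

// NCRONTAB expression with six fields (seconds first): every 5 minutes.
const keepWarmOptions = {
    schedule: '0 */5 * * * *',
    // The handler does no real work; it only logs that the timer fired.
    handler: async (_timer: unknown, context: WarmupContext): Promise<void> => {
        context.log('keepWarm tick at', new Date().toISOString());
    }
};

// In the function app, register it next to the Service Bus trigger:
//   import { app } from '@azure/functions';
//   app.timer('keepWarm', keepWarmOptions);
```

Any interval comfortably shorter than the roughly 20-minute idle window mentioned above should serve the same purpose; five minutes is only an example.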