pythonpython-3.xtemporal-workflow

How to write a temporal worker Interceptor class in Temporal Python-SDK


I am trying to implement an interceptor class for our temporal workflows written in Python. But since the documentation is not completely available, I have tried multiple ways to achieve it. But no success till now.

Below are some sample code for the temporal activities, workflows, interceptor class, worker and workflow executor.

activities.py

from temporalio import activity


@activity.defn
async def activity1():
    print("Hi from activity 1")

@activity.defn
async def activity2():
    print("Hi from activity 2")

workflow.py

from datetime import timedelta
from temporalio import workflow


with workflow.unsafe.imports_passed_through():
    from mactivities import activity1, activity2


@workflow.defn
class MyWorkflow1:

    @workflow.run
    async def run(self):
        output = await workflow.execute_activity(activity1, start_to_close_timeout=timedelta(5))
        return True



@workflow.defn
class MyWorkflow2:

    @workflow.run
    async def run(self):
        output = await workflow.execute_activity(activity2, start_to_close_timeout=timedelta(5))
        return True


my_interceptor.py

from temporalio.worker import Interceptor


class MyInterceptor(Interceptor):  # NEED HELP IN THIS CLASS
    def __init__(self, next):
        super().__init__(next)

    async def intercept_activity(self, input):
        print("I am printing before activity execution start")
        return await.self.next.intercept_activity(input)

    async workflow_interceptor_class(self, input):
        print("Doing something before a workflow execution start")
        return None

run_workflows.py

import asyncio
from temporalio.client import Client
from workflows import MyWorkflow1, MyWorkflow2

async def main():
    client = await Client.connect("localhost:7233")

    await client.execute_workflow(
                  MyWorkflow1.run,
                  id="123",
                  task_queue="MY_QUEUE"
                   )

    await client.execute_workflow(
                  MyWorkflow2.run,
                  id="123",
                  task_queue="MY_QUEUE"
                   )
    return True


if __name__=="__main__":
    asyncio.run(main())

run_worker.py

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker

from activities import activity1, activity2
from workflow import MyWorkflow1, MyWorkflow2
from my_interceptor import MyInterceptor  # <--- NEED HELP IN IMPLEMENTING THIS CLASS

async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
             client,
             task_queue="MY_QUEUE",
             workflows=[MyWorkflow1, MyWorkflow2],
             activities=[activity1, activity2],
             interceptors=[MyInterceptor]  # <--- NEED HELP IN IMPLEMENTING THIS CLASS
              )
     await worker.run()


if __name__=="__main__":
    asyncio.run(main())


If I use the Interceptor class I wrote, I am getting exceptions. e.g. workflow_interceptor_class required a parameter "input", but while the worker initialized, it is not available. My activities are failing with a message "Completing activity as failed" etc.

Basically the way of implementation of the interceptor class in wrong.

What I want to achieve ?

  1. Print the name of the workflow before starting the execution of that workflow.
  2. I will keep some informations (arguments passed while executing) in a variable for future use.

Solution

  • First of all, you may want to have a look at the following for examples on how to implement Activity and Workflow interceptors in the Temporal Python SDK:

    Based on code snippets you provided, I think you may have missed a very important detail: the implementation of Interceptor that you provide to Worker will not actually handle the interception of Workflow/Activity calls by itself.

    Instead, the Interceptor object you provide will be used to build a chain of concrete interceptors. The responsibility of your Interceptor class are thus the following:

    Consequently, the complete implementation of an Interceptor will generally contain two or three classes, which could for example be named as so:

    A simple implementation of MyInterceptor would look something like this:

    class MyInterceptor(Interceptor):
        def intercept_activity(
            self, next: ActivityInboundInterceptor
        ) -> ActivityInboundInterceptor:
            return MyActivityInboundInterceptor(next)
    
        def workflow_interceptor_class(
            self, input: WorkflowInterceptorClassInput
        ) -> Optional[Type[WorkflowInboundInterceptor]]:
            return MyWorkflowInboundInterceptor
    

    Now, that being said, I think the rest of it should fall into place much more easily, but just add a comment if you need more details on something.

    By the way, you may think that all this gymnastic is unintuitive, complex and useless, but let's not stop at this and just accept that there are strong reasons for this.