I am trying to implement an interceptor class for our Temporal workflows written in Python. The documentation on this is incomplete, so I have tried several approaches, but none has worked so far.
Below is some sample code for the Temporal activities, workflows, interceptor class, worker, and workflow executor.
activities.py
from temporalio import activity

@activity.defn
async def activity1():
    print("Hi from activity 1")

@activity.defn
async def activity2():
    print("Hi from activity 2")
workflow.py
from datetime import timedelta
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from activities import activity1, activity2

@workflow.defn
class MyWorkflow1:
    @workflow.run
    async def run(self):
        output = await workflow.execute_activity(activity1, start_to_close_timeout=timedelta(5))
        return True

@workflow.defn
class MyWorkflow2:
    @workflow.run
    async def run(self):
        output = await workflow.execute_activity(activity2, start_to_close_timeout=timedelta(5))
        return True
my_interceptor.py
from temporalio.worker import Interceptor

class MyInterceptor(Interceptor):  # NEED HELP IN THIS CLASS
    def __init__(self, next):
        super().__init__(next)

    async def intercept_activity(self, input):
        print("I am printing before activity execution start")
        return await self.next.intercept_activity(input)

    async def workflow_interceptor_class(self, input):
        print("Doing something before a workflow execution start")
        return None
run_workflows.py
import asyncio
from temporalio.client import Client
from workflow import MyWorkflow1, MyWorkflow2

async def main():
    client = await Client.connect("localhost:7233")
    await client.execute_workflow(
        MyWorkflow1.run,
        id="123",
        task_queue="MY_QUEUE"
    )
    await client.execute_workflow(
        MyWorkflow2.run,
        id="123",
        task_queue="MY_QUEUE"
    )
    return True

if __name__ == "__main__":
    asyncio.run(main())
run_worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from activities import activity1, activity2
from workflow import MyWorkflow1, MyWorkflow2
from my_interceptor import MyInterceptor  # <--- NEED HELP IN IMPLEMENTING THIS CLASS

async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="MY_QUEUE",
        workflows=[MyWorkflow1, MyWorkflow2],
        activities=[activity1, activity2],
        interceptors=[MyInterceptor]  # <--- NEED HELP IN IMPLEMENTING THIS CLASS
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
If I use the interceptor class I wrote, I get exceptions. For example, workflow_interceptor_class requires a parameter "input", but that parameter is not available when the worker is initialized. My activities also fail with messages such as "Completing activity as failed".
Basically, the way I have implemented the interceptor class is wrong.
What do I want to achieve?
First of all, you may want to have a look at the following for examples on how to implement Activity and Workflow interceptors in the Temporal Python SDK:
Based on the code snippets you provided, I think you may have missed a very important detail: the implementation of Interceptor that you provide to Worker will not actually handle the interception of Workflow/Activity calls by itself.
Instead, the Interceptor object you provide will be used to build a chain of concrete interceptors. The responsibilities of your Interceptor class are thus the following:
- intercept_activity(): Instantiate, chain and return an object that extends ActivityInboundInterceptor; simply return next if you do not want to intercept activities;
- workflow_interceptor_class(): Return a class that extends WorkflowInboundInterceptor; return None if you do not want to intercept Workflows.
Consequently, the complete implementation of an Interceptor will generally contain two or three classes, which could for example be named as follows:
- MyInterceptor, which extends Interceptor
- MyActivityInboundInterceptor, which extends ActivityInboundInterceptor
- MyWorkflowInboundInterceptor, which extends WorkflowInboundInterceptor
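To make the "factory builds a chain" idea more concrete, here is a rough, library-free toy model of it. It is only an illustration of the pattern, not Temporal's actual code: every name in it (Handler, LoggingHandler, Factory, build_chain) is made up for the example.

```python
import asyncio
from typing import List


class Handler:
    """Innermost link: stands in for the real activity invocation."""

    async def execute(self, name: str) -> str:
        return f"ran {name}"


class LoggingHandler(Handler):
    """A concrete interceptor: does its own work, then delegates to `next`."""

    def __init__(self, next: Handler, label: str, log: List[str]) -> None:
        self.next = next
        self.label = label
        self.log = log

    async def execute(self, name: str) -> str:
        self.log.append(f"{self.label} before {name}")
        return await self.next.execute(name)


class Factory:
    """Plays the role of Interceptor: it only wraps `next`, it handles no call itself."""

    def __init__(self, label: str, log: List[str]) -> None:
        self.label = label
        self.log = log

    def intercept(self, next: Handler) -> Handler:
        return LoggingHandler(next, self.label, self.log)


def build_chain(factories: List[Factory], innermost: Handler) -> Handler:
    chain = innermost
    # Wrap in reverse so the first factory in the list ends up outermost.
    for factory in reversed(factories):
        chain = factory.intercept(chain)
    return chain


log: List[str] = []
chain = build_chain([Factory("A", log), Factory("B", log)], Handler())
result = asyncio.run(chain.execute("activity1"))
```

In this toy, the factory is called once to wrap the next handler, while the object it returns is what runs on every call. That is the same split of responsibilities as between Interceptor and the inbound interceptor classes listed above.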
A simple implementation of MyInterceptor would look something like this:
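from typing import Optional, Type
from temporalio.worker import (
    ActivityInboundInterceptor, Interceptor,
    WorkflowInboundInterceptor, WorkflowInterceptorClassInput,
)

# MyActivityInboundInterceptor and MyWorkflowInboundInterceptor are your own classes.
class MyInterceptor(Interceptor):
    def intercept_activity(
        self, next: ActivityInboundInterceptor
    ) -> ActivityInboundInterceptor:
        return MyActivityInboundInterceptor(next)

    def workflow_interceptor_class(
        self, input: WorkflowInterceptorClassInput
    ) -> Optional[Type[WorkflowInboundInterceptor]]:
        return MyWorkflowInboundInterceptor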
With that said, I think the rest should fall into place much more easily, but just add a comment if you need more details on something.
By the way, you may think that all these gymnastics are unintuitive, complex and useless, but let's not dwell on that and just accept that there are strong reasons for this design.