When using signals the way the docs suggest:
public class MyWorkflow {
    public Output myWorkflwMethod(Input input) {
        ...
    }

    public void mySignalMethod(SignalRequest request) {
        // do actual processing here.
        ...
    }
}
I may run into the below problems:
instance of
with casting
These four are the most common mistakes when using signals in Cadence/Temporal workflows.
There is a design pattern you can apply to address all of these problems together.
The idea is to reduce the signal handler to one job: always put the signal into a queue. The workflow method then spins up another workflow thread to process the queue.
It's based on the sample (Cadence & Temporal).
import java.util.LinkedList;
import java.util.Queue;
// Workflow and Async come from the SDK's workflow package
// (com.uber.cadence.workflow for Cadence, io.temporal.workflow for Temporal).

public class MyWorkflow {
    private Queue<SignalRequest> signalRequestQueue = new LinkedList<>();

    public void mySignalMethod(SignalRequest req) {
        // The handler only enqueues; all real work happens in the workflow thread below.
        signalRequestQueue.add(req);
    }

    public Output myWorkflwMethod(Input input) {
        // 1. do everything necessary/needed before actually processing a signal
        ...
        // 2. spin up a workflow thread to process the queue
        Async.procedure(
            () -> {
                while (true) {
                    Workflow.await(() -> !signalRequestQueue.isEmpty());
                    final SignalRequest request = signalRequestQueue.poll();
                    processSignal(request);
                }
            });
        // 3. always wait for the queue to be empty before completing/failing/continueAsNew the workflow
        Workflow.await(() -> signalRequestQueue.isEmpty());
        return output;
    }

    private void processSignal(SignalRequest request) {
        // do your actual processing here.
        // If processing a single signal may take too long and you don't care about FIFO order,
        // you could also start another workflow thread to process signals in parallel.
        ...
    }
}
You should use versioning to migrate existing workflows to this pattern.
Assume you have existing code like this:
public class MyWorkflow {
    public Output myWorkflwMethod(Input input) {
        ...
    }

    public void mySignalMethod(SignalRequest request) {
        // do your actual processing here.
        ...
    }
}
Then you should use versioning like below:
public class MyWorkflow {
    private Queue<SignalRequest> signalRequestQueue = new LinkedList<>();

    public void mySignalMethod(SignalRequest req) {
        int version = Workflow.getVersion("useSignalQueue", Workflow.DEFAULT_VERSION, 1);
        if (version == 1) {
            // New behavior: only enqueue the signal.
            signalRequestQueue.add(req);
        } else {
            // Old behavior: process directly in the handler.
            processSignal(req);
        }
    }

    public Output myWorkflwMethod(Input input) {
        // 1. do everything necessary/needed before actually processing a signal
        ...
        int version = Workflow.getVersion("useSignalQueue", Workflow.DEFAULT_VERSION, 1);
        if (version == 1) {
            // 2. spin up a workflow thread to process the queue
            Async.procedure(
                () -> {
                    while (true) {
                        Workflow.await(() -> !signalRequestQueue.isEmpty());
                        final SignalRequest request = signalRequestQueue.poll();
                        processSignal(request);
                    }
                });
        }
        // 3. always wait for the queue to be empty before completing/failing/continueAsNew the workflow
        Workflow.await(() -> signalRequestQueue.isEmpty());
        return output;
    }

    private void processSignal(SignalRequest request) {
        // do your actual processing here.
        // If processing a single signal may take too long and you don't care about FIFO order,
        // you could also start another workflow thread to process signals in parallel.
        ...
    }
}
The Golang SDK doesn't have issues 1/2/3, because it provides a completely different API for processing signals.
Instead of defining a signal method as the handler, the Golang SDK requires the workflow to listen on a channel to process signals, which is exactly what this answer suggests doing in Java. See the examples of the signal API (Cadence / Temporal).
But it does have issue #4: the workflow may complete before a signal has been processed. This is a common mistake with the Golang SDK.
The suggestion is to always drain the signal channel before completing the workflow or calling continueAsNew. See this sample of how to drain a signal channel in Golang.
It’s similar to using Workflow.await in Java to wait until all signals are processed. But because the channel doesn’t have an API to get its size, we have to use the “default” branch to check for emptiness.
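As a rough illustration of that drain step, here is a minimal sketch in Go. Note the swap: it uses a ReceiveAsync-style non-blocking receive rather than a selector with a "default" branch; both return immediately when nothing is buffered. To keep it runnable without the SDK, `receiveChannel` is a stand-in interface that mirrors only the `ReceiveAsync` method of the SDK's `workflow.ReceiveChannel`, and `fakeSignalChannel`, `drainSignals`, and the string payload are hypothetical names made up for this example.

```go
package main

// receiveChannel is a stand-in (an assumption for this sketch) that mirrors
// only the non-blocking ReceiveAsync method of the SDK's workflow.ReceiveChannel.
type receiveChannel interface {
	ReceiveAsync(valuePtr interface{}) (ok bool)
}

// fakeSignalChannel is a hypothetical in-memory channel used only so the
// sketch can run outside a workflow. It is not part of any SDK.
type fakeSignalChannel struct {
	pending []string
}

// ReceiveAsync copies the oldest buffered signal into valuePtr and reports
// true, or reports false right away when nothing is buffered.
func (c *fakeSignalChannel) ReceiveAsync(valuePtr interface{}) bool {
	if len(c.pending) == 0 {
		return false
	}
	*valuePtr.(*string) = c.pending[0]
	c.pending = c.pending[1:]
	return true
}

// drainSignals handles every signal that is already buffered and returns how
// many were handled. It never blocks: the first failed ReceiveAsync means the
// channel is empty, which plays the role of the "default" branch.
func drainSignals(ch receiveChannel, handle func(string)) int {
	handled := 0
	for {
		var sig string
		if ok := ch.ReceiveAsync(&sig); !ok {
			return handled
		}
		handle(sig)
		handled++
	}
}
```

In a real workflow you would pass the channel returned by workflow.GetSignalChannel instead of the fake, and call the drain right before returning from the workflow or continuing as new.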
Thanks to @Maxim for pointing out the API in the Temporal Go SDK: alternatively, use the "HasPending" API in the Temporal go-sdk to check whether all signals have been consumed.
Also, it's recommended to monitor the "unhandledSignal" metric.