cadence-workflowtemporal-workflowuber-cadence

What is the best way/pattern to process a signal in Cadence/Temporal workflow


When using signal like the doc suggests:

public class MyWorkflow{
   public Output myWorkflwMethod(Input input){
      ...
   }

   public void mySignalMethod(request){
     // do actual processing here. 
     ...
   }
}

I may run into the below problems:

  1. I want to guarantee FIFO processing one at a time(within the same signal name or across all signal names)
  2. I want to handle "racing condition" of signalWithStart where signal method invoked too early
  3. I want to be safe to reset a workflow. After reset, the signals could be re-applied early in the history
  4. I want to make sure workflow won't get completed early before signal being processed

Solution

    1. Guarantee FIFO processing one at a time in order
    2. Handle "racing condition" of signalWithStart where signal method invoked too early. Or in reality, with regular signal without signalWithStart, signal could come too soon before workflow is ready to process.
    3. Safe to reset a workflow. After reset, the signals could be re-applied early in the history
    4. Make sure workflow won't get completed early before signal being processed
    5. For FIFO of all signal names to avoid racing conditions, you can use the same queue of Queue to store all signals, and use instance of with casting

    These four are the most common mistakes when using signal in Cadence/Temporal workflow.

    There is a design pattern that you can apply to address all the problem together.

    The idea is to simplify the signal handler to always put the signal into a queue, and workflow method will spin up another workflow thread to process the queue.

    It's based on the sample (Cadence& Temporal)

    Java

    public class MyWorkflow{
       private Queue<SignalRequest> signalRequestQueue = new LinkedList<>(); 
    
       public void mySignalMethod(SignalRequest req){
           signalRequestQueue.add(req);
       }
    
       public Output myWorkflwMethod(Input input){
          //1. do everything necessary/needed before actually processing a signal
          ...
    
          //2. spin up a workflow thread to process 
          Async.procedure(
          () -> {
              while (true) {
                  Workflow.await(() -> !signalRequestQueue.isEmpty());
                  final SignalRequest request = signalRequestQueue.poll();
                  processSignal(request);
              }
          });
    
    
          //3. always wait for queue to be empty before completing/failing/continueAsNew the workflow
          Workflow.await(() -> signalRequestQueue.isEmpty());
          return output
       }
    
       private void processSignal(request){
         // do your actual processing here. 
         // If a process a single signal may take too much time and you don't care about FIFO, you could also start another workflow thread to process signals in parallel.
         ...
       }
    }
    

    migrate existing code to this pattern

    You should use versioning to migrate.

    Assuming you have existing code like this;

    public class MyWorkflow{
       public Output myWorkflwMethod(Input input){
          ...
       }
    
       public void mySignalMethod(request){
         // do your actual processing here. 
         ...
       }
    }
    

    Then you should use versioning like below:

    public class MyWorkflow{
       private Queue<SignalRequest> signalRequestQueue = new LinkedList<>(); 
    
       public void mySignalMethod(SignalRequest req){
           int version = Workflow.getVersion("useSignalQueue", Workflow.DEFAULT_VERSION, 1);
           if( version == 1){
              signalRequestQueue.add(req);
           }else{
              processSignal(req);
           }
       }
    
       public Output myWorkflwMethod(Input input){
          //1. do everything necessary/needed before actually processing a signal
          ...
    
           int version = Workflow.getVersion("useSignalQueue", Workflow.DEFAULT_VERSION, 1);
           if( version == 1){
             //2. spin up a workflow thread to process 
             Async.procedure(
             () -> {
                 while (true) {
                     Workflow.await(() -> !signalRequestQueue.isEmpty());
                     final SignalRequest request = signalRequestQueue.poll();
                     processSignal(request);
                 }
             });
           }
    
          //3. always wait for queue to be empty before completing/failing/continueAsNeww the workflow
          Workflow.await(() -> signalRequestQueue.isEmpty());
          return output
       }
    
       private void processSignal(request){
         // do your actual processing here. 
         // If a process a single signal may take too much time and you don't care about FIFO, you could also start another workflow thread to process signals in parallel.
         ...
       }
    }
    

    Golang

    Golang SDK doesn't have the same issues of 1/2/3. This is because Golang SDK providing a completely different API to process signal.

    Instead of defining a signal method as handler, Golang SDK requires workflow listening to a channel to process signals, which is exactly what this answer is suggesting to do in Java. See example of how signal API. ( see Cadence / Temporal)

    But it has issue #4 -- workflow may get completed early before signal being processed. This is a common mistake with Golang SDK.

    The suggestion is to always drain signal channel before completing or continueAsNew the workflow. See this sample of how to drain signal channel in Golang.

    It’s similar to using Workflow.await in Java to wait for all signals are processed. But because the channel doesn’t have an api to get the size, we have to use “default” branch to check emptiness.

    Thanks @Maxim pointing out the API in Temporal go sdk -- Alternatively, use "HasPending" API in Temporal go-sdk to check if all signals are consumed.

    Also, it's recommended to monitor on "unhandledSignal" metric.