java, activiti

Activiti parallel Service Tasks


I am trying to implement two service tasks in Activiti that are supposed to run in parallel. The code below behaves inconsistently: it only works as intended some of the time.

What I mean is that it sometimes prints only "First" (or only "Second"), and at other times it prints "First" twice and then "Second" once, and so on.

QUESTION: How can I make these services run in parallel consistently, regardless of the number of services currently running?

PS: When I DELETED activiti:async="true" from the process definition, it only printed "first" or "second". I guess I need that :)

Process Definition

<?xml version='1.0' encoding='UTF-8'?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:activiti="http://activiti.org/bpmn" targetNamespace="Examples">

    <process id='testparallelact' name="Developer Hiring" isExecutable="true"
             activiti:exclusive="false" activiti:async="true">

        <startEvent id="theStart" />
        <sequenceFlow id="flow1" sourceRef="theStart" targetRef="fork" />

        <parallelGateway id="fork"  activiti:async="true" />
        <sequenceFlow sourceRef="fork" targetRef="receivePayment" />
        <sequenceFlow sourceRef="fork" targetRef="shipOrder" />


        <serviceTask id="receivePayment" name="Receive Payment"
                     activiti:async="true" activiti:exclusive="false"
                     activiti:expression="${serviceConnections.runThis2('First')}"/>

        <sequenceFlow sourceRef="receivePayment" targetRef="join" />


        <serviceTask id="shipOrder" name="Ship Order"
                     activiti:async="true" activiti:exclusive="false"
                     activiti:expression="${serviceConnections.runThis2('Second')}"/>

        <sequenceFlow sourceRef="shipOrder" targetRef="join" />

        <parallelGateway id="join" />
        <sequenceFlow sourceRef="join" targetRef="theEnd" />
        <endEvent id="theEnd" />
    </process>
</definitions> 

Graphical rendering of process definition


Code for "runThis2"

public void runThis2(String test1) throws InterruptedException {            
    while(true)
    {
        Thread.sleep(1000);
        System.out.println(test1);              
    }           
}

Solution

  • Combination of "async" and "exclusive" flags matters

    It's important to understand how jobs are executed inside the Activiti engine. The following forum thread does a pretty good job describing it:

    https://community.alfresco.com/thread/221453-multiinstance-wont-run-task-in-parallel

    A key excerpt from one of the original Activiti architects, Tijs Rademakers, on 2013-10-25:

    The parallel gateway and multiinstance constructs are able to run multiple user tasks in parallel for example. But for service and script tasks they are basically executed serial still. Async can change this behavior if you also set exclusive to false (the default is true). Then the job executor will just execute all jobs available and not serially. So give it a try to set async to true and exclusive to false.

    Now, by setting activiti:async="true" and activiti:exclusive="false", what you have effectively done is create a "wait state" in the process by allocating the service tasks (usually processed serially) to the Job Executor.

    But: execution of those service tasks is now completely controlled by the Job Executor configuration. (Size of thread pool, timeouts, number of concurrent jobs, and size of job block are all configurable.)

    This may not be what you expect. When the service tasks actually run depends on the size of your job queue, how many jobs are taken in a single sweep, and the duration of each job. They MIGHT be executed in parallel, and they MIGHT be executed serially. Equally, you can't control their order, because the Job Executor determines what it runs and when.
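
    To make the dependence on executor configuration concrete, here is a minimal plain-JDK analogy. It is not Activiti code: the "jobs" are ordinary Runnables, the "job executor" is a fixed-size thread pool, and the names PoolSizeDemo and maxConcurrency are made up for this sketch. It only illustrates that the same two jobs run serially or concurrently depending on how many worker threads are available.

    ```java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;

    // Plain-JDK analogy (hypothetical names, not the Activiti API).
    final class PoolSizeDemo {

        // Runs jobCount jobs on a pool of poolSize threads and returns the
        // highest number of jobs observed running at the same moment.
        static int maxConcurrency(int poolSize, int jobCount) {
            ExecutorService pool = Executors.newFixedThreadPool(poolSize);
            AtomicInteger running = new AtomicInteger();
            AtomicInteger peak = new AtomicInteger();
            CountDownLatch done = new CountDownLatch(jobCount);
            for (int i = 0; i < jobCount; i++) {
                pool.execute(() -> {
                    // Record how many jobs are active right now.
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(200); // simulated service call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                        done.countDown();
                    }
                });
            }
            try {
                done.await(); // wait until every job has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } finally {
                pool.shutdown();
            }
            return peak.get();
        }

        public static void main(String[] args) {
            System.out.println("1 thread : peak = " + maxConcurrency(1, 2));
            System.out.println("2 threads: peak = " + maxConcurrency(2, 2));
        }
    }
    ```

    With a single worker thread the peak is always 1, so the jobs are serial no matter how the process is modelled. With two threads the peak can reach 2, but nothing in the sketch guarantees it, which mirrors the "MIGHT be parallel" caveat above.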

    Ok, so assuming this meets your requirement...

    Optimistic Locking concept

    ...there is still one more issue you may experience (in fact, it is the reason the activiti:exclusive flag was introduced in the first place). On completion of the service tasks, the execution context will be committed to the process instance record in the database as well as to the history records. Activiti uses "Optimistic Locking" for records for performance purposes.

    Now if your process branches complete relatively close to each other in time, then it is possible (actually highly likely) you WILL receive an Optimistic Locking Exception on the DB update, like this:

    09:59:52,432 [flowable-async-job-executor-thread-2] ERROR org.flowable.job.service.impl.asyncexecutor.DefaultAsyncRunnableExecutionExceptionHandler - Job 12575 failed org.flowable.engine.common.api.FlowableOptimisticLockingException: ProcessInstance[12567] was updated by another transaction concurrently

    (Note: The error above is not actually from Activiti but from a project called "Flowable". However, Flowable had essentially the same codebase as Activiti 6 when this question was originally asked, in November 2017.)
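
    The optimistic locking check described above can be sketched in a few lines of plain Java. This is an illustration of the general technique only, under the assumption that each record carries a revision number that is compared on write. VersionedRecord, OptimisticLockDemo and their methods are hypothetical names, not Activiti or Flowable classes, and the engine's real check happens in the database update rather than in memory.

    ```java
    import java.util.ConcurrentModificationException;

    // Illustrative sketch (hypothetical names, not engine code).
    final class OptimisticLockDemo {

        // An in-memory stand-in for a database row with a revision column.
        static final class VersionedRecord {
            private int revision = 1;
            private String state = "created";

            synchronized int readRevision() { return revision; }
            synchronized String readState() { return state; }

            // Succeeds only if the revision is unchanged since it was read,
            // roughly like "UPDATE ... SET rev = rev + 1 WHERE rev = ?".
            synchronized void update(int expectedRevision, String newState) {
                if (revision != expectedRevision) {
                    throw new ConcurrentModificationException(
                            "record was updated by another transaction concurrently");
                }
                state = newState;
                revision++;
            }
        }

        // Two branches read the same revision, then both try to write.
        // Returns true if the second write is rejected.
        static boolean secondWriterIsRejected() {
            VersionedRecord processInstance = new VersionedRecord();
            int seenByBranchA = processInstance.readRevision();
            int seenByBranchB = processInstance.readRevision();
            processInstance.update(seenByBranchA, "branch A done");
            try {
                processInstance.update(seenByBranchB, "branch B done");
                return false;
            } catch (ConcurrentModificationException e) {
                return true; // branch B worked from a stale revision
            }
        }

        public static void main(String[] args) {
            System.out.println("second writer rejected: " + secondWriterIsRejected());
        }
    }
    ```

    No lock is held while the branches do their work. The conflict is only detected at write time, which is why two branches that finish close together are the ones that collide.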

    An Optimistic Locking Exception causes the service task to be tagged as FAILED, and it will be re-attempted. This can be problematic if you are making external calls to a SOR (System of Record) or other legacy systems. (Consider what would happen if your flight was actually booked successfully, but the call to reserve it is made a second time because the first attempt was perceived to have failed.)
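
    One common way to make a retried call harmless is to make it idempotent. The sketch below is one possible approach, not something the engine provides: the caller passes a stable key (for example, one derived from the process instance and activity), and a repeated call with the same key returns the earlier result instead of booking again. IdempotentBookingDemo, bookFlight and the "CONF-" format are invented for illustration, and a real implementation would need a durable store rather than an in-memory map.

    ```java
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    // Illustrative sketch (hypothetical names, in-memory only).
    final class IdempotentBookingDemo {

        // Remembers the confirmation issued for each idempotency key.
        private final Map<String, String> confirmations = new ConcurrentHashMap<>();
        // Counts how often the simulated external system was really called.
        private final AtomicInteger externalCalls = new AtomicInteger();

        // Books at most once per key; a retry gets the stored confirmation.
        String bookFlight(String idempotencyKey) {
            return confirmations.computeIfAbsent(idempotencyKey, key -> {
                int callNumber = externalCalls.incrementAndGet(); // simulated external call
                return "CONF-" + callNumber;
            });
        }

        int externalCallCount() {
            return externalCalls.get();
        }

        public static void main(String[] args) {
            IdempotentBookingDemo service = new IdempotentBookingDemo();
            String first = service.bookFlight("instance-1:bookFlight");
            String retry = service.bookFlight("instance-1:bookFlight"); // simulated job retry
            System.out.println(first.equals(retry) + ", external calls = " + service.externalCallCount());
        }
    }
    ```

    The retry in this sketch reaches the booking method a second time, but the external system is only called once.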

    All good fun stuff and all things that can be resolved with good design and use of best practices.

    Hope this helps you understand what is going on.

    Greg@BP3

    Further reading

    The Alfresco forums post contains several dead links. Below are live links.

    Dead Issue Tracker links

    Additional note: Activiti currently (2019) uses neither of those two (codehaus.org or atlassian.net) trackers anymore. Instead they use this GitHub tracker: https://github.com/Activiti/Activiti/issues

    Dead FAQ link

    Activiti flags

    Camunda Manual