And after a little bit of research I found multiple resources online that already discuss this issue. But I'm still a little confused about why an asynchronous continuation is needed for concurrent execution.
My current understanding is that with an asynchronous continuation AFTER a task, the engine commits the task to the database (which is being watched by the job executor), creating the transaction boundary. The job executor then pipes those tasks to available worker threads. The async continuation article here basically only talks about setting up new transaction boundaries to which the engine can roll back on exception.
Now the exclusive flag is quite simple: the job executor makes sure it acquires all jobs flagged as exclusive at the same time, to avoid the optimistic locking exception on the join of a fork/join. How do the async continuation and its transaction boundaries relate to this?
In my process definition above I've flagged the gateway as non-exclusive async after and the two service tasks as non-exclusive async before, but my external workers still only acquire one task after the other, not concurrently.
Okay, so after marking both tasks as async before and after it seems to work, but on completion I'm getting this exception:
08-Feb-2020 15:16:38.956 WARNING [http-nio-8080-exec-5] org.camunda.bpm.engine.rest.exception.RestExceptionHandler.toResponse org.camunda.bpm.engine.rest.exception.RestException: External Task 9cf94255-4a7d-11ea-a5cd-00ff29e821a6 cannot be completed by worker ‘268735e5-99c5-45ac-bfdc-6f940f9c1c8b’. It is locked by worker ‘b4fb320b-bca2-4249-bd0c-891939ae1430’.
at org.camunda.bpm.engine.rest.sub.externaltask.impl.ExternalTaskResourceImpl.complete(ExternalTaskResourceImpl.java:122)
During runtime I’m logging this output to std out:
ServiceTask_18zdwfi is executing from 1: 356
ServiceTask_185coqn is executing from 2: 5
ServiceTask_18zdwfi is executing from 1: 357
Which basically means that in the beginning the first task is correctly fetched by worker 1, and then worker 2 chimes in with the second task. However, after worker 1 completes its execution, it locks the second task too.
ServiceTask_185coqn is executing from 2: 35
ServiceTask_185coqn is executing from 1: 0
ServiceTask_185coqn is executing from 1: 1
ServiceTask_185coqn is executing from 1: 2
ServiceTask_185coqn is executing from 1: 3
ServiceTask_185coqn is executing from 1: 4
As you correctly found out, to achieve parallel execution you need to set the asynchronous before flag for the parallel tasks. To avoid a concurrency exception you also want to set this flag on the synchronizing gateway following the parallel activities.
You do not have to work with an external task pattern to achieve parallel execution. However, if your parallel tasks take long (maybe several seconds or more), then this is advisable, as the parallel threads would otherwise be blocked for the duration of the execution. Hence you might run out of threads, reach transaction timeouts, etc.
Here is an example with parallel execution of POJOs: ParallelABC.bpmn
package org.camunda.example.service;

import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.JavaDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggerDelegate implements JavaDelegate {

  final Logger log = LoggerFactory.getLogger(LoggerDelegate.class);

  public void execute(DelegateExecution execution) throws Exception {
    log.info("\n\n ... LoggerDelegate invoked by "
        + "processDefinitionId=" + execution.getProcessDefinitionId()
        + ", activityId=" + execution.getCurrentActivityId()
        + " \n\n");
    Thread.sleep(3000);
  }
}
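Since the ParallelABC.bpmn file itself is not pasted here, the following is only a rough sketch of how such a model with the asyncBefore flags could be assembled with the BPMN model fluent API; the task ids and structure are assumptions, not the exact file:

package org.camunda.example.service;

import org.camunda.bpm.model.bpmn.Bpmn;
import org.camunda.bpm.model.bpmn.BpmnModelInstance;

public class ParallelAbcModel {

  // asyncBefore on the three tasks creates the jobs that allow truly parallel execution;
  // asyncBefore on the joining gateway avoids the concurrency exception on the merge.
  public static BpmnModelInstance build() {
    return Bpmn.createExecutableProcess("ParallelABC")
        .startEvent()
        .parallelGateway("fork")
          .serviceTask("Task_A")
            .camundaClass("org.camunda.example.service.LoggerDelegate")
            .camundaAsyncBefore()
          .parallelGateway("join")
            .camundaAsyncBefore()
          .serviceTask("Task_Final")
            .camundaClass("org.camunda.example.service.LoggerDelegate")
          .endEvent()
        .moveToNode("fork")
          .serviceTask("Task_B")
            .camundaClass("org.camunda.example.service.LoggerDelegate")
            .camundaAsyncBefore()
          .connectTo("join")
        .moveToNode("fork")
          .serviceTask("Task_C")
            .camundaClass("org.camunda.example.service.LoggerDelegate")
            .camundaAsyncBefore()
          .connectTo("join")
        .done();
  }
}

In the BPMN XML this corresponds to camunda:asyncBefore="true" on the three service tasks and on the joining parallel gateway.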
Example output shows parallel start times and process duration <4 sec instead of >9 sec:
2020-02-10 14:55:06.538 INFO 7748 — [aTaskExecutor-3] o.c.example.service.LoggerDelegate :
… LoggerDelegate invoked by processDefinitionId=ParallelABC:1:70629c3d-4bd1-11ea-92a9-9c899b574756, activityId=Task_B
2020-02-10 14:55:06.538 INFO 7748 — [aTaskExecutor-2] o.c.example.service.LoggerDelegate :
… LoggerDelegate invoked by processDefinitionId=ParallelABC:1:70629c3d-4bd1-11ea-92a9-9c899b574756, activityId=Task_A
2020-02-10 14:55:06.538 INFO 7748 — [aTaskExecutor-1] o.c.example.service.LoggerDelegate :
… LoggerDelegate invoked by processDefinitionId=ParallelABC:1:70629c3d-4bd1-11ea-92a9-9c899b574756, activityId=Task_C
2020-02-10 14:55:09.575 INFO 7748 — [aTaskExecutor-1] o.c.example.service.LoggerDelegate :
… LoggerDelegate invoked by processDefinitionId=ParallelABC:1:70629c3d-4bd1-11ea-92a9-9c899b574756, activityId=Task_Final
Why does Camunda execute sequentially if I do not explicitly configure parallel execution with the async flags?
By default Camunda minimizes database access, reduces latency and allows complete rollback in one TX by executing synchronous parallel service tasks sequentially (even if modeled in parallel).
If your service executions take milliseconds, then you usually do not gain by executing them in parallel. You may save 50 ms due to truly parallel execution, but you lose those savings again due to the more complex threading model and more DB access.
If your service execution takes long, then you gain more from truly parallel execution (as in the example above), but then you should work with an asynchronous service integration pattern (send/receive or external task). If you apply such a pattern, then the sequential creation (not execution) of the tasks in Camunda (taking a few ms, see above) is usually not a concern, and you automatically create the wait states that we introduced via the asynchronous before flags in the example.
(Check the client.subscribe(“worker-id”) part of your client code, what topics you have created and subscribed to. Are the ids and topics unique?)
First of all, I would like to thank you for your very detailed answer!
By default Camunda minimizes database access, reduces latency and allows complete rollback in one TX by executing synchronous parallel service tasks sequentially (even if modeled in parallel).
So the external flag doesn't kick in if no TX separation is provided via asynchronous continuation? How does this translate to external workers that only fetch and lock one task at a time? My understanding after studying the user guide is that if no wait state is reached, by either progressing to a timer event or an asynchronous continuation, the tasks are scheduled to be created as jobs in ACT_RU_JOB. With an async continuation, however, the jobs following a parallel fork are pushed to ACT_RU_JOB immediately, which allows the job executor to lock parallel jobs and distribute them to worker threads (I don't really know about this one).
Shouldn't this read maxJobs instead of maxTasks?
ExternalTaskClientBuilder maxTasks(int maxTasks)
Specifies the maximum amount of tasks that can be fetched within one request. This information is optional. Default is 10
After implementing the LoggerDelegate version with the ParallelABC process myself, it seemed to work at last.
However, when I switched from the delegate implementation to external workers, the workers seemed to fetch the same tasks over and over again, as seen in the following logging output.
10-Feb-2020 13:38:23.482 INFO [TopicSubscriptionManager] external.LoggerTestWorker.executeBusinessLogic Logger Test Worker with ID DESKTOP-31cc8a48-5fb2-4acb-b68b-102ad38432cc working
10-Feb-2020 13:38:23.483 INFO [TopicSubscriptionManager] external.LoggerTestWorker.executeBusinessLogic
... LoggerDelegate invoked by processDefinitionId=parallelabc:3:141afd06-4c02-11ea-b588-00ff29e821a6, activityId=Task_B
10-Feb-2020 13:38:23.487 INFO [TopicSubscriptionManager] external.LoggerTestWorker.executeBusinessLogic Logger Test Worker with ID DESKTOP-2174c612-15bb-455c-bce4-420e93625c81 working
10-Feb-2020 13:38:23.487 INFO [TopicSubscriptionManager] external.LoggerTestWorker.executeBusinessLogic
... LoggerDelegate invoked by processDefinitionId=parallelabcd:3:141afd06-4c02-11ea-b588-00ff29e821a6, activityId=Task_C
10-Feb-2020 13:38:26.534 INFO [TopicSubscriptionManager] external.LoggerTestWorker.executeBusinessLogic Logger Test Worker with ID DESKTOP-31cc8a48-5fb2-4acb-b68b-102ad38432cc working
10-Feb-2020 13:38:26.534 INFO [TopicSubscriptionManager] external.LoggerTestWorker.executeBusinessLogic
... LoggerDelegate invoked by processDefinitionId=parallelabcd:3:141afd06-4c02-11ea-b588-00ff29e821a6, activityId=Task_C
10-Feb-2020 13:38:26.547 INFO [TopicSubscriptionManager] external.LoggerTestWorker.executeBusinessLogic Logger Test Worker with ID DESKTOP-2174c612-15bb-455c-bce4-420e93625c81 working
10-Feb-2020 13:38:26.547 INFO [TopicSubscriptionManager] external.LoggerTestWorker.executeBusinessLogic
... LoggerDelegate invoked by processDefinitionId=parallelabcd:3:141afd06-4c02-11ea-b588-00ff29e821a6, activityId=Task_B
So the external flag doesn’t kick in if no TX separation is provided via asynchronous continuation?
In case of a synchronous invocation (Java Delegate) you need the async before flag to achieve concurrency. An external service task introduces an asynchronous continuation (per definition), so the flag is not required in this scenario. The parallel tasks will be created in the same engine transaction, but then the transaction ends and the external workers can fetch and work on the tasks in parallel.
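As a hedged illustration (the process id and topic names below are assumptions), the same kind of branching modeled with external service tasks could look like this with the fluent API; camunda:type="external" plus a topic is what makes the engine create the external tasks at the transaction boundary instead of invoking a delegate class:

package org.camunda.example.service;

import org.camunda.bpm.model.bpmn.Bpmn;
import org.camunda.bpm.model.bpmn.BpmnModelInstance;

public class ExternalParallelModel {

  // Two parallel external service tasks; the engine creates the external tasks when
  // its transaction commits, so workers can then fetch and lock them in parallel.
  public static BpmnModelInstance build() {
    return Bpmn.createExecutableProcess("ParallelExternal")
        .startEvent()
        .parallelGateway("fork")
          .serviceTask("Task_A")
            .camundaType("external")   // handled by an external worker, not a Java class
            .camundaTopic("TopicA")
          .parallelGateway("join")
            .camundaAsyncBefore()      // still useful on the merge to avoid an OptimisticLockingException
          .endEvent()
        .moveToNode("fork")
          .serviceTask("Task_B")
            .camundaType("external")
            .camundaTopic("TopicB")
          .connectTo("join")
        .done();
  }
}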
Please review/share your current external worker code. Is it completing the task? (sorry if this is an obvious question)
I’ve followed the tutorial for external workers as seen here https://github.com/camunda/camunda-external-task-client-java/tree/master/examples/loan-granting. You were right, I was missing the complete call in the demo logging worker I set up for this … I think with async before on the service tasks and async before exclusive on the joining gateway I can make my production code work.
A short follow-up question: you said that external service tasks introduce an asynchronous continuation per definition, so they are therefore also marked to be executed non-exclusively per definition, right? (I've tested this and it seems to work.)
Anyway, since I've already copy-pasted my classes, here is my code if you're interested.
Hmm, at first I thought it worked, but after introducing some "actual" work instead of letting the threads sleep for 3 seconds, things seem to break a little for the workers.
The business logic part changes as follows:
@Override
public void executeBusinessLogic(ExternalTask externalTask, ExternalTaskService externalTaskService) {
  log.info("Logger Test Worker with ID " + externalTask.getWorkerId() + " working");
  log.info("\n\n ... LoggerDelegate invoked by "
      + "processDefinitionId=" + externalTask.getProcessDefinitionId()
      + ", activityId=" + externalTask.getActivityId()
      + " \n\n");
  for (int i = 0; i < 500; i++) {
    log.info(externalTask.getWorkerId() + " " + externalTask.getActivityId() + " " + i);
  }
  try {
    Thread.sleep(3000);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  log.info("Logger Test Worker with ID " + externalTask.getWorkerId() + " finished sleeping ... completing");
  externalTaskService.complete(externalTask);
  log.info("Logger Test Worker with ID " + externalTask.getWorkerId() + " completed!");
}
I’ve attached a log file that shows the iterations from the code snippet above and also logged the exceptions from two workers fetching and locking the same task and being unable to complete them.
I see no issues. Please try with and compare to this example:
package org.camunda.example.external.worker;

import lombok.extern.slf4j.Slf4j;
import org.camunda.bpm.client.ExternalTaskClient;
import org.camunda.bpm.client.task.ExternalTask;
import org.camunda.bpm.client.task.ExternalTaskHandler;
import org.camunda.bpm.client.task.ExternalTaskService;
import org.camunda.bpm.client.topic.TopicSubscriptionBuilder;

import java.time.LocalTime;

@Slf4j
public class ExternalWorker implements ExternalTaskHandler {

  private ExternalTaskClient externalTaskClient;

  public ExternalWorker(String workerId, String topic) {
    externalTaskClient = ExternalTaskClient.create()
        .baseUrl("http://localhost:8080/rest")
        .asyncResponseTimeout(10000)
        .workerId(workerId)
        .maxTasks(1)
        .build();
    TopicSubscriptionBuilder builder = externalTaskClient
        .subscribe(topic)
        .lockDuration(1000)
        .handler(this);
    builder.open();
  }

  public static void main(String[] args) {
    ExternalWorker workerA = new ExternalWorker("Worker-A", "TopicA");
    ExternalWorker workerB = new ExternalWorker("Worker-B", "TopicB");
    ExternalWorker workerC = new ExternalWorker("Worker-C", "TopicC");
  }

  @Override
  public void execute(ExternalTask externalTask, ExternalTaskService externalTaskService) {
    log.info("Worker with ID " + externalTask.getWorkerId() + " working from " + LocalTime.now());
    log.info("... LoggerDelegate invoked by "
        + "processDefinitionId=" + externalTask.getProcessDefinitionId()
        + ", activityId=" + externalTask.getActivityId());
    for (int i = 0; i < 500; i++) {
      log.debug(externalTask.getWorkerId() + " " + externalTask.getActivityId() + " " + i);
    }
    try {
      Thread.sleep(3000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    externalTaskService.complete(externalTask);
    log.info("Worker with ID " + externalTask.getWorkerId() + " completed at " + LocalTime.now());
  }
}
[TopicSubscriptionManager] INFO org.camunda.example.external.worker.ExternalWorker - Worker with ID Worker-B working from 16:26:40.290363400
[TopicSubscriptionManager] INFO org.camunda.example.external.worker.ExternalWorker - Worker with ID Worker-C working from 16:26:40.290363400
[TopicSubscriptionManager] INFO org.camunda.example.external.worker.ExternalWorker - Worker with ID Worker-A working from 16:26:40.290363400
[TopicSubscriptionManager] INFO org.camunda.example.external.worker.ExternalWorker - … LoggerDelegate invoked by processDefinitionId=ParallelABC:2:282204f4-4e36-11ea-b026-9c899b574756, activityId=Task_C
[TopicSubscriptionManager] INFO org.camunda.example.external.worker.ExternalWorker - … LoggerDelegate invoked by processDefinitionId=ParallelABC:2:282204f4-4e36-11ea-b026-9c899b574756, activityId=Task_B
[TopicSubscriptionManager] INFO org.camunda.example.external.worker.ExternalWorker - … LoggerDelegate invoked by processDefinitionId=ParallelABC:2:282204f4-4e36-11ea-b026-9c899b574756, activityId=Task_A
[TopicSubscriptionManager] INFO org.camunda.example.external.worker.ExternalWorker - Worker with ID Worker-A completed at 16:26:43.304397100
[TopicSubscriptionManager] INFO org.camunda.example.external.worker.ExternalWorker - Worker with ID Worker-C completed at 16:26:43.304397100
[TopicSubscriptionManager] INFO org.camunda.example.external.worker.ExternalWorker - Worker with ID Worker-B completed at 16:26:43.304397100
[TopicSubscriptionManager] INFO org.camunda.example.external.worker.ExternalWorker - Worker with ID Worker-B working from 16:26:43.309396300
[TopicSubscriptionManager] INFO org.camunda.example.external.worker.ExternalWorker - … LoggerDelegate invoked by processDefinitionId=ParallelABC:2:282204f4-4e36-11ea-b026-9c899b574756, activityId=Task_B
[TopicSubscriptionManager] INFO org.camunda.example.external.worker.ExternalWorker - Worker with ID Worker-A working from 16:26:43.309396300
[TopicSubscriptionManager] INFO org.camunda.example.external.worker.ExternalWorker - … LoggerDelegate invoked by processDefinitionId=ParallelABC:2:282204f4-4e36-11ea-b026-9c899b574756, activityId=Task_A
[TopicSubscriptionManager] INFO org.camunda.example.external.worker.ExternalWorker - Worker with ID Worker-C working from 16:26:43.309396300
[TopicSubscriptionManager] INFO org.camunda.example.external.worker.ExternalWorker - … LoggerDelegate invoked by processDefinitionId=ParallelABC:2:282204f4-4e36-11ea-b026-9c899b574756, activityId=Task_C
[TopicSubscriptionManager] INFO org.camunda.example.external.worker.ExternalWorker - Worker with ID Worker-C completed at 16:26:46.321148400
[TopicSubscriptionManager] INFO org.camunda.example.external.worker.ExternalWorker - Worker with ID Worker-B completed at 16:26:46.322148800
[TopicSubscriptionManager] INFO org.camunda.example.external.worker.ExternalWorker - Worker with ID Worker-A completed at 16:26:46.323152200
I do not know exactly why, but this exact example seems to fail on my end when I add logging output to simulate CPU-bound work before the thread goes to sleep.
log.info("Executing business logic for worker with id {}", this.getId());
for (int i = 0; i < 500; i++) {
log.info(externalTask.getWorkerId() + " " + externalTask.getActivityId() + " " + i);
}
Are you able to reproduce this issue on your end @rob2universe? The resulting log on my end can be seen in the file I uploaded a couple of posts ago if you want to cross check with your own output.
EDIT: Nevermind, of course it failed because I set the lock duration too low for my external workers.
“TASK/CLIENT-01007 Exception while completing the external task: The task’s most recent lock could not be acquired” is a message which is expected in case of concurrent access to the same execution. However, the retries will fix this, and in the audit history you will see that all process instances get completed.
Switching from Thread.sleep to “actual work” increases CPU load and hence, when running on the same CPUs, likely makes the DB and server slower. This would explain why you see increased locking when the external task clients create CPU load.
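As a hedged sketch (worker id, topic and durations below are assumptions, and extendLock requires a sufficiently recent client and engine version), two common knobs to reduce such lock conflicts are a lock duration that comfortably exceeds the longest expected run, and extending the lock from inside the handler:

package org.camunda.example.external.worker;

import org.camunda.bpm.client.ExternalTaskClient;

public class LongRunningWorker {

  public static void main(String[] args) {
    ExternalTaskClient client = ExternalTaskClient.create()
        .baseUrl("http://localhost:8080/rest")
        .workerId("Worker-A")
        .maxTasks(1)
        .build();

    client.subscribe("TopicA")
        // pick a lock duration well above the longest expected execution time
        .lockDuration(60_000)
        .handler((externalTask, externalTaskService) -> {
          // ... do the actual (CPU-bound) work here ...

          // if the work might still exceed the lock, extend it before it expires
          externalTaskService.extendLock(externalTask, 30_000);

          externalTaskService.complete(externalTask);
        })
        .open();
  }
}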
Taking a step back:
a) Can Camunda execute synchronous services truly concurrently? Yes (but it is questionable whether it makes sense; please see the reasoning in this thread).
b) Can external (async) Camunda clients work on tasks concurrently? Yes, as shown
c) Can multiple external client workers work on the same task type (topic)? Yes, as shown
d) Do external workers lock (and complete) tasks exclusively? Yes (if the work is completed within the defined lock duration).
The higher the concurrency (parallel tasks, number of clients) and the lower the system resources on the server side (DB!), the more locking conflicts can be expected.
e) Does locking have an impact on the successful execution of my processes? Retries will help with the successful completion of the process instances, but they consume time.
Is our example a somewhat realistic test case? No. We create parallel tasks, which take evenly distributed time, run on the same hardware sharing system resources, and use a dev-level DB (H2, at least in my case).
What would your real-world scenario look like with regard to task execution times, DB, concurrency, etc.?
We are facing a similar issue with a parallel gateway and getting an OptimisticLockingException. Could you please explain briefly how adding the async flag to the inclusive gateway will resolve the issue?
An OLE protects you from overwriting an already updated version of your execution with another concurrent update. Retrying the job based on the latest execution version ensures a clean update. An async before flag on the gateway merging the executions decouples the previous execution from the gateway and adds an automatic retry mechanism. You may also find this interesting: PVM Execution Tree · camunda/camunda-bpm-platform Wiki · GitHub
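To make that concrete, here is a small hedged sketch (process id, task ids and the retry cycle are assumptions) of a merge modeled with asyncBefore and an explicit retry configuration for the gateway's job:

package org.camunda.example.service;

import org.camunda.bpm.model.bpmn.Bpmn;
import org.camunda.bpm.model.bpmn.BpmnModelInstance;

public class AsyncJoinExample {

  // asyncBefore turns the merge into a job of its own. If two branches update the same
  // execution concurrently and an OptimisticLockingException occurs, the job executor
  // retries that job against the latest version of the execution instead of failing the branch.
  public static BpmnModelInstance build() {
    return Bpmn.createExecutableProcess("AsyncJoin")
        .startEvent()
        .parallelGateway("fork")
          .serviceTask("Task_A").camundaExpression("${true}")
          .parallelGateway("join")
            .camundaAsyncBefore()
            .camundaFailedJobRetryTimeCycle("R3/PT10S") // three retries, 10 seconds apart
          .endEvent()
        .moveToNode("fork")
          .serviceTask("Task_B").camundaExpression("${true}")
          .connectTo("join")
        .done();
  }
}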