Hi,
How can I limit the number of jobs being processed at any given moment when the worker uses Vert.x (or any other reactive implementation)?
The test case is simple:
- a BPMN process with a single service task ‘test’
- a Spring Boot Zeebe client
The worker implementation is as follows:
@JobWorker(autoComplete = false, timeout = 60000, maxJobsActive = 3, requestTimeout = 10, pollInterval = 10000)
public void test(JobClient client, ActivatedJob job) {
    log.info("Task consumed: {}-{}", job.getProcessInstanceKey(), job.getKey());
    // request() returns immediately; the reply handler runs later on a Vert.x thread
    vertx.eventBus().request("hello", job.getProcessInstanceKey(), new DeliveryOptions().setSendTimeout(60000), reply -> {
        if (reply.succeeded()) {
            log.info("Event bus success {}-{}", job.getProcessInstanceKey(), job.getKey());
            client.newCompleteCommand(job.getKey())
                .send()
                .whenComplete((result, exception) -> {
                    // Log success only when the complete command actually succeeded
                    if (exception == null) {
                        log.info("Job completed {}", job.getKey());
                    } else {
                        log.error("Couldn't complete job {}", job.getKey(), exception);
                    }
                });
        } else {
            log.info("Event bus failure {}", job.getProcessInstanceKey(), reply.cause());
            client.newFailCommand(job.getKey()).retries(job.getRetries() - 1).send();
        }
    });
}
Application logs:
2023-10-22 17:48:31,125 [main] INFO ai.minte.core.Application - Started Application in 3.563 seconds (process running for 3.708)
2023-10-22 17:48:33,088 [grpc-default-executor-1] INFO a.m.c.v.RestControllerVerticle - Created process 2251799813899767
2023-10-22 17:48:33,104 [grpc-default-executor-0] INFO a.m.c.v.RestControllerVerticle - Created process 2251799813899774
2023-10-22 17:48:33,111 [grpc-default-executor-1] INFO a.m.c.v.RestControllerVerticle - Created process 2251799813899781
2023-10-22 17:48:33,118 [grpc-default-executor-0] INFO a.m.c.v.RestControllerVerticle - Created process 2251799813899788
2023-10-22 17:48:33,125 [grpc-default-executor-1] INFO a.m.c.v.RestControllerVerticle - Created process 2251799813899795
2023-10-22 17:48:33,134 [grpc-default-executor-0] INFO a.m.c.v.RestControllerVerticle - Created process 2251799813899802
2023-10-22 17:48:33,142 [grpc-default-executor-1] INFO a.m.c.v.RestControllerVerticle - Created process 2251799813899809
2023-10-22 17:48:33,149 [grpc-default-executor-0] INFO a.m.c.v.RestControllerVerticle - Created process 2251799813899816
2023-10-22 17:48:33,155 [grpc-default-executor-1] INFO a.m.c.v.RestControllerVerticle - Created process 2251799813899823
2023-10-22 17:48:33,162 [grpc-default-executor-0] INFO a.m.c.v.RestControllerVerticle - Created process 2251799813899830
2023-10-22 17:48:41,122 [pool-2-thread-1] TRACE io.camunda.zeebe.client.job.poller - Polling at max 3 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,148 [grpc-default-executor-1] DEBUG io.camunda.zeebe.client.job.poller - Activated 3 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,148 [pool-2-thread-1] INFO ai.minte.core.worker.TestWorker - Task consumed: 2251799813899767-2251799813899773
2023-10-22 17:48:41,149 [pool-2-thread-1] INFO ai.minte.core.worker.TestWorker - Task consumed: 2251799813899774-2251799813899780
2023-10-22 17:48:41,150 [pool-2-thread-1] TRACE io.camunda.zeebe.client.job.poller - Polling at max 2 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,150 [pool-2-thread-1] INFO ai.minte.core.worker.TestWorker - Task consumed: 2251799813899781-2251799813899787
2023-10-22 17:48:41,152 [grpc-default-executor-1] DEBUG io.camunda.zeebe.client.job.poller - Activated 2 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,152 [pool-2-thread-1] INFO ai.minte.core.worker.TestWorker - Task consumed: 2251799813899788-2251799813899794
2023-10-22 17:48:41,153 [pool-2-thread-1] TRACE io.camunda.zeebe.client.job.poller - Polling at max 2 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,153 [pool-2-thread-1] INFO ai.minte.core.worker.TestWorker - Task consumed: 2251799813899795-2251799813899801
2023-10-22 17:48:41,155 [grpc-default-executor-0] DEBUG io.camunda.zeebe.client.job.poller - Activated 2 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,155 [pool-2-thread-1] INFO ai.minte.core.worker.TestWorker - Task consumed: 2251799813899802-2251799813899808
2023-10-22 17:48:41,155 [pool-2-thread-1] TRACE io.camunda.zeebe.client.job.poller - Polling at max 2 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,155 [pool-2-thread-1] INFO ai.minte.core.worker.TestWorker - Task consumed: 2251799813899809-2251799813899815
2023-10-22 17:48:41,157 [grpc-default-executor-1] DEBUG io.camunda.zeebe.client.job.poller - Activated 2 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,157 [pool-2-thread-1] INFO ai.minte.core.worker.TestWorker - Task consumed: 2251799813899816-2251799813899822
2023-10-22 17:48:41,157 [pool-2-thread-1] TRACE io.camunda.zeebe.client.job.poller - Polling at max 2 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,157 [pool-2-thread-1] INFO ai.minte.core.worker.TestWorker - Task consumed: 2251799813899823-2251799813899829
2023-10-22 17:48:41,159 [grpc-default-executor-0] DEBUG io.camunda.zeebe.client.job.poller - Activated 1 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,159 [pool-2-thread-1] INFO ai.minte.core.worker.TestWorker - Task consumed: 2251799813899830-2251799813899836
2023-10-22 17:48:41,159 [pool-2-thread-1] TRACE io.camunda.zeebe.client.job.poller - Polling at max 3 jobs for worker testWorker#test and job type test
2023-10-22 17:48:42,152 [vert.x-eventloop-thread-3] INFO a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899767 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:42,152 [vert.x-eventloop-thread-5] INFO ai.minte.core.worker.TestWorker - Event bus success 2251799813899767-2251799813899773
2023-10-22 17:48:42,159 [grpc-default-executor-1] INFO ai.minte.core.worker.TestWorker - Job completed 2251799813899773
2023-10-22 17:48:43,152 [vert.x-eventloop-thread-3] INFO a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899774 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:43,152 [vert.x-eventloop-thread-5] INFO ai.minte.core.worker.TestWorker - Event bus success 2251799813899774-2251799813899780
2023-10-22 17:48:43,167 [grpc-default-executor-0] INFO ai.minte.core.worker.TestWorker - Job completed 2251799813899780
2023-10-22 17:48:44,152 [vert.x-eventloop-thread-3] INFO a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899781 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:44,153 [vert.x-eventloop-thread-5] INFO ai.minte.core.worker.TestWorker - Event bus success 2251799813899781-2251799813899787
2023-10-22 17:48:44,167 [grpc-default-executor-1] INFO ai.minte.core.worker.TestWorker - Job completed 2251799813899787
2023-10-22 17:48:45,153 [vert.x-eventloop-thread-3] INFO a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899788 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:45,153 [vert.x-eventloop-thread-5] INFO ai.minte.core.worker.TestWorker - Event bus success 2251799813899788-2251799813899794
2023-10-22 17:48:45,167 [grpc-default-executor-0] INFO ai.minte.core.worker.TestWorker - Job completed 2251799813899794
2023-10-22 17:48:46,153 [vert.x-eventloop-thread-3] INFO a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899795 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:46,153 [vert.x-eventloop-thread-5] INFO ai.minte.core.worker.TestWorker - Event bus success 2251799813899795-2251799813899801
2023-10-22 17:48:46,167 [grpc-default-executor-1] INFO ai.minte.core.worker.TestWorker - Job completed 2251799813899801
2023-10-22 17:48:47,153 [vert.x-eventloop-thread-3] INFO a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899802 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:47,154 [vert.x-eventloop-thread-5] INFO ai.minte.core.worker.TestWorker - Event bus success 2251799813899802-2251799813899808
2023-10-22 17:48:47,167 [grpc-default-executor-0] INFO ai.minte.core.worker.TestWorker - Job completed 2251799813899808
2023-10-22 17:48:48,154 [vert.x-eventloop-thread-3] INFO a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899809 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:48,154 [vert.x-eventloop-thread-5] INFO ai.minte.core.worker.TestWorker - Event bus success 2251799813899809-2251799813899815
2023-10-22 17:48:48,168 [grpc-default-executor-1] INFO ai.minte.core.worker.TestWorker - Job completed 2251799813899815
2023-10-22 17:48:49,154 [vert.x-eventloop-thread-3] INFO a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899816 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:49,154 [vert.x-eventloop-thread-5] INFO ai.minte.core.worker.TestWorker - Event bus success 2251799813899816-2251799813899822
2023-10-22 17:48:49,157 [grpc-default-executor-0] INFO ai.minte.core.worker.TestWorker - Job completed 2251799813899822
2023-10-22 17:48:50,154 [vert.x-eventloop-thread-3] INFO a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899823 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:50,155 [vert.x-eventloop-thread-5] INFO ai.minte.core.worker.TestWorker - Event bus success 2251799813899823-2251799813899829
2023-10-22 17:48:50,158 [grpc-default-executor-1] INFO ai.minte.core.worker.TestWorker - Job completed 2251799813899829
2023-10-22 17:48:51,155 [vert.x-eventloop-thread-3] INFO a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899830 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:51,155 [vert.x-eventloop-thread-5] INFO ai.minte.core.worker.TestWorker - Event bus success 2251799813899830-2251799813899836
2023-10-22 17:48:51,169 [grpc-default-executor-0] INFO ai.minte.core.worker.TestWorker - Job completed 2251799813899836
2023-10-22 17:48:51,194 [grpc-default-executor-1] TRACE io.camunda.zeebe.client.job.poller - No jobs activated for worker testWorker#test and job type test
2023-10-22 17:49:01,194 [pool-2-thread-1] TRACE io.camunda.zeebe.client.job.poller - Polling at max 3 jobs for worker testWorker#test and job type test
First I created 10 process instances. When the Zeebe client starts consuming ‘test’ jobs, I see that as soon as the handler method above returns, the client decrements its activatedJobs counter and polls again. As a result it consumes ALL the jobs before any of them has been completed (if there were thousands, it would fetch them all).
I would like the client to respect the maxJobsActive parameter. How can I achieve that?
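To make the behaviour described above concrete, here is a minimal simulation in plain Java. It uses no Zeebe or Vert.x classes; `SlotDemo`, `peakInFlight` and its parameters are hypothetical names made up for this illustration. The assumption that the client frees a slot as soon as the handler method returns is my reading of the logs, not something taken from the client source.

```java
// Plain-Java simulation of slot accounting; not the Zeebe client implementation.
class SlotDemo {

    /**
     * Hands out jobs while free slots remain and returns the highest number
     * of asynchronous requests that were awaiting a reply at the same time.
     *
     * releaseOnHandlerReturn = true  : slot is freed when the handler returns (assumed current behaviour)
     * releaseOnHandlerReturn = false : slot is freed only when the async reply arrives (desired behaviour)
     */
    static int peakInFlight(boolean releaseOnHandlerReturn, int backlog, int maxJobsActive) {
        int remaining = backlog; // jobs still waiting in the broker
        int slotsInUse = 0;      // what the poller believes is active
        int inFlight = 0;        // async requests actually awaiting a reply
        int peak = 0;

        while (remaining > 0 || inFlight > 0) {
            // Poll phase: activate jobs while the poller sees free slots.
            while (remaining > 0 && slotsInUse < maxJobsActive) {
                remaining--;
                slotsInUse++;
                inFlight++; // handler fires the async request and returns immediately
                peak = Math.max(peak, inFlight);
                if (releaseOnHandlerReturn) {
                    slotsInUse--; // slot freed although the work is still running
                }
            }
            // Reply phase: one async reply arrives.
            if (inFlight > 0) {
                inFlight--;
                if (!releaseOnHandlerReturn) {
                    slotsInUse--; // slot freed only now
                }
            }
        }
        return peak;
    }

    public static void main(String[] args) {
        System.out.println(peakInFlight(true, 10, 3));  // prints 10
        System.out.println(peakInFlight(false, 10, 3)); // prints 3
    }
}
```

With 10 jobs and a limit of 3, the first variant reaches 10 requests in flight, which matches what I see in the logs, while the second variant never exceeds 3, which is what I would expect maxJobsActive to guarantee.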
Camunda version:
dependency 'io.camunda.spring:spring-boot-starter-camunda:8.3.0'
Regards,
Art.