Reactive Zeebe client (Vert.x)

Hi,

How can I limit the number of jobs being processed at any given moment when using Vert.x (or any other reactive worker implementation)?

The test case is simple:

  • a BPMN process with a single service task ‘test’
  • a Spring Boot Zeebe client

The worker implementation is as follows:

@JobWorker(autoComplete = false, timeout = 60000, maxJobsActive = 3, requestTimeout = 10, pollInterval = 10000)
public void test(JobClient client, ActivatedJob job) {
    log.info("Task consumed: {}-{}", job.getProcessInstanceKey(), job.getKey());

    vertx.eventBus().request("hello", job.getProcessInstanceKey(), new DeliveryOptions().setSendTimeout(60000), reply -> {
        if (reply.succeeded()) {
            log.info("Event bus success {}-{}", job.getProcessInstanceKey(), job.getKey());
            client.newCompleteCommand(job.getKey())
                    .send()
                    .whenComplete((result, exception) -> {
                        // Check the exception here: the original exceptionally()
                        // threw a RuntimeException whose result was discarded,
                        // and "Job completed" was logged even on failure.
                        if (exception != null) {
                            log.error("Couldn't complete job {}", job.getKey(), exception);
                        } else {
                            log.info("Job completed {}", job.getKey());
                        }
                    });
        } else {
            log.info("Event bus failure {}", job.getProcessInstanceKey(), reply.cause());
            client.newFailCommand(job.getKey()).retries(job.getRetries() - 1).send();
        }
    });
}

Application logs:

2023-10-22 17:48:31,125 [main] INFO  ai.minte.core.Application - Started Application in 3.563 seconds (process running for 3.708)
2023-10-22 17:48:33,088 [grpc-default-executor-1] INFO  a.m.c.v.RestControllerVerticle - Created process 2251799813899767
2023-10-22 17:48:33,104 [grpc-default-executor-0] INFO  a.m.c.v.RestControllerVerticle - Created process 2251799813899774
2023-10-22 17:48:33,111 [grpc-default-executor-1] INFO  a.m.c.v.RestControllerVerticle - Created process 2251799813899781
2023-10-22 17:48:33,118 [grpc-default-executor-0] INFO  a.m.c.v.RestControllerVerticle - Created process 2251799813899788
2023-10-22 17:48:33,125 [grpc-default-executor-1] INFO  a.m.c.v.RestControllerVerticle - Created process 2251799813899795
2023-10-22 17:48:33,134 [grpc-default-executor-0] INFO  a.m.c.v.RestControllerVerticle - Created process 2251799813899802
2023-10-22 17:48:33,142 [grpc-default-executor-1] INFO  a.m.c.v.RestControllerVerticle - Created process 2251799813899809
2023-10-22 17:48:33,149 [grpc-default-executor-0] INFO  a.m.c.v.RestControllerVerticle - Created process 2251799813899816
2023-10-22 17:48:33,155 [grpc-default-executor-1] INFO  a.m.c.v.RestControllerVerticle - Created process 2251799813899823
2023-10-22 17:48:33,162 [grpc-default-executor-0] INFO  a.m.c.v.RestControllerVerticle - Created process 2251799813899830
2023-10-22 17:48:41,122 [pool-2-thread-1] TRACE io.camunda.zeebe.client.job.poller - Polling at max 3 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,148 [grpc-default-executor-1] DEBUG io.camunda.zeebe.client.job.poller - Activated 3 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,148 [pool-2-thread-1] INFO  ai.minte.core.worker.TestWorker - Task consumed: 2251799813899767-2251799813899773
2023-10-22 17:48:41,149 [pool-2-thread-1] INFO  ai.minte.core.worker.TestWorker - Task consumed: 2251799813899774-2251799813899780
2023-10-22 17:48:41,150 [pool-2-thread-1] TRACE io.camunda.zeebe.client.job.poller - Polling at max 2 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,150 [pool-2-thread-1] INFO  ai.minte.core.worker.TestWorker - Task consumed: 2251799813899781-2251799813899787
2023-10-22 17:48:41,152 [grpc-default-executor-1] DEBUG io.camunda.zeebe.client.job.poller - Activated 2 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,152 [pool-2-thread-1] INFO  ai.minte.core.worker.TestWorker - Task consumed: 2251799813899788-2251799813899794
2023-10-22 17:48:41,153 [pool-2-thread-1] TRACE io.camunda.zeebe.client.job.poller - Polling at max 2 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,153 [pool-2-thread-1] INFO  ai.minte.core.worker.TestWorker - Task consumed: 2251799813899795-2251799813899801
2023-10-22 17:48:41,155 [grpc-default-executor-0] DEBUG io.camunda.zeebe.client.job.poller - Activated 2 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,155 [pool-2-thread-1] INFO  ai.minte.core.worker.TestWorker - Task consumed: 2251799813899802-2251799813899808
2023-10-22 17:48:41,155 [pool-2-thread-1] TRACE io.camunda.zeebe.client.job.poller - Polling at max 2 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,155 [pool-2-thread-1] INFO  ai.minte.core.worker.TestWorker - Task consumed: 2251799813899809-2251799813899815
2023-10-22 17:48:41,157 [grpc-default-executor-1] DEBUG io.camunda.zeebe.client.job.poller - Activated 2 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,157 [pool-2-thread-1] INFO  ai.minte.core.worker.TestWorker - Task consumed: 2251799813899816-2251799813899822
2023-10-22 17:48:41,157 [pool-2-thread-1] TRACE io.camunda.zeebe.client.job.poller - Polling at max 2 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,157 [pool-2-thread-1] INFO  ai.minte.core.worker.TestWorker - Task consumed: 2251799813899823-2251799813899829
2023-10-22 17:48:41,159 [grpc-default-executor-0] DEBUG io.camunda.zeebe.client.job.poller - Activated 1 jobs for worker testWorker#test and job type test
2023-10-22 17:48:41,159 [pool-2-thread-1] INFO  ai.minte.core.worker.TestWorker - Task consumed: 2251799813899830-2251799813899836
2023-10-22 17:48:41,159 [pool-2-thread-1] TRACE io.camunda.zeebe.client.job.poller - Polling at max 3 jobs for worker testWorker#test and job type test
2023-10-22 17:48:42,152 [vert.x-eventloop-thread-3] INFO  a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899767 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:42,152 [vert.x-eventloop-thread-5] INFO  ai.minte.core.worker.TestWorker - Event bus success 2251799813899767-2251799813899773
2023-10-22 17:48:42,159 [grpc-default-executor-1] INFO  ai.minte.core.worker.TestWorker - Job completed 2251799813899773
2023-10-22 17:48:43,152 [vert.x-eventloop-thread-3] INFO  a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899774 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:43,152 [vert.x-eventloop-thread-5] INFO  ai.minte.core.worker.TestWorker - Event bus success 2251799813899774-2251799813899780
2023-10-22 17:48:43,167 [grpc-default-executor-0] INFO  ai.minte.core.worker.TestWorker - Job completed 2251799813899780
2023-10-22 17:48:44,152 [vert.x-eventloop-thread-3] INFO  a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899781 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:44,153 [vert.x-eventloop-thread-5] INFO  ai.minte.core.worker.TestWorker - Event bus success 2251799813899781-2251799813899787
2023-10-22 17:48:44,167 [grpc-default-executor-1] INFO  ai.minte.core.worker.TestWorker - Job completed 2251799813899787
2023-10-22 17:48:45,153 [vert.x-eventloop-thread-3] INFO  a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899788 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:45,153 [vert.x-eventloop-thread-5] INFO  ai.minte.core.worker.TestWorker - Event bus success 2251799813899788-2251799813899794
2023-10-22 17:48:45,167 [grpc-default-executor-0] INFO  ai.minte.core.worker.TestWorker - Job completed 2251799813899794
2023-10-22 17:48:46,153 [vert.x-eventloop-thread-3] INFO  a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899795 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:46,153 [vert.x-eventloop-thread-5] INFO  ai.minte.core.worker.TestWorker - Event bus success 2251799813899795-2251799813899801
2023-10-22 17:48:46,167 [grpc-default-executor-1] INFO  ai.minte.core.worker.TestWorker - Job completed 2251799813899801
2023-10-22 17:48:47,153 [vert.x-eventloop-thread-3] INFO  a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899802 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:47,154 [vert.x-eventloop-thread-5] INFO  ai.minte.core.worker.TestWorker - Event bus success 2251799813899802-2251799813899808
2023-10-22 17:48:47,167 [grpc-default-executor-0] INFO  ai.minte.core.worker.TestWorker - Job completed 2251799813899808
2023-10-22 17:48:48,154 [vert.x-eventloop-thread-3] INFO  a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899809 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:48,154 [vert.x-eventloop-thread-5] INFO  ai.minte.core.worker.TestWorker - Event bus success 2251799813899809-2251799813899815
2023-10-22 17:48:48,168 [grpc-default-executor-1] INFO  ai.minte.core.worker.TestWorker - Job completed 2251799813899815
2023-10-22 17:48:49,154 [vert.x-eventloop-thread-3] INFO  a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899816 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:49,154 [vert.x-eventloop-thread-5] INFO  ai.minte.core.worker.TestWorker - Event bus success 2251799813899816-2251799813899822
2023-10-22 17:48:49,157 [grpc-default-executor-0] INFO  ai.minte.core.worker.TestWorker - Job completed 2251799813899822
2023-10-22 17:48:50,154 [vert.x-eventloop-thread-3] INFO  a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899823 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:50,155 [vert.x-eventloop-thread-5] INFO  ai.minte.core.worker.TestWorker - Event bus success 2251799813899823-2251799813899829
2023-10-22 17:48:50,158 [grpc-default-executor-1] INFO  ai.minte.core.worker.TestWorker - Job completed 2251799813899829
2023-10-22 17:48:51,155 [vert.x-eventloop-thread-3] INFO  a.m.c.verticle.HelloEventBusVerticle - Bus replied: Hello 2251799813899830 from f4267d90-6d09-409f-9868-41a464e6ac92
2023-10-22 17:48:51,155 [vert.x-eventloop-thread-5] INFO  ai.minte.core.worker.TestWorker - Event bus success 2251799813899830-2251799813899836
2023-10-22 17:48:51,169 [grpc-default-executor-0] INFO  ai.minte.core.worker.TestWorker - Job completed 2251799813899836
2023-10-22 17:48:51,194 [grpc-default-executor-1] TRACE io.camunda.zeebe.client.job.poller - No jobs activated for worker testWorker#test and job type test
2023-10-22 17:49:01,194 [pool-2-thread-1] TRACE io.camunda.zeebe.client.job.poller - Polling at max 3 jobs for worker testWorker#test and job type test

First I created 10 process instances. When the Zeebe client consumes the ‘test’ jobs, I see that as soon as the code block above returns, the client decrements its activated-jobs counter and therefore consumes ALL the tasks (if there were thousands, it would take them all).
I would like the client to respect the maxJobsActive parameter. How can I achieve that?

Camunda version:
dependency 'io.camunda.spring:spring-boot-starter-camunda:8.3.0'

Regards,
Art.

Hi @Art - based on what you’ve described, I think things are working as intended. Let me explain a bit and please correct me if I’ve misunderstood your situation!

The first setting is the pollInterval, which you have set to 10 seconds. When the worker has no jobs to process, it waits for the poll interval before checking whether any jobs need to be executed. The maxJobsActive setting defines the maximum number of jobs the worker fetches from the process engine; the worker also keeps an internal counter of how many of those jobs are left to process. In this case, you started 10 processes and the worker fetched 3.

When the internal job counter drops to 30% (rounded up) of maxJobsActive or less, the worker polls for more jobs. In your scenario, 30% of 3 rounded up is 1 remaining job, so after 2 jobs have finished it polls for and begins executing 2 additional jobs. Note that the poll interval only applies when the worker is idle, not while there is a queue of jobs.
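The threshold arithmetic can be sketched in a couple of lines (this just mirrors the rule described above; it is not an excerpt from the client's source):

```java
// Sketch of the re-polling rule described above: the worker polls for more
// jobs once its remaining-jobs counter drops to or below
// ceil(0.3 * maxJobsActive).
public class PollThreshold {

    static int threshold(int maxJobsActive) {
        return (int) Math.ceil(0.3 * maxJobsActive);
    }

    public static void main(String[] args) {
        // With maxJobsActive = 3, the worker re-polls once 1 job remains,
        // i.e. after 2 of the 3 fetched jobs have finished.
        System.out.println(threshold(3));  // prints 1
        // With maxJobsActive = 10 the threshold would be 3 remaining jobs.
        System.out.println(threshold(10)); // prints 3
    }
}
```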

This is the behavior I’m seeing in your logs. It pulls 3 jobs, executes 2, then polls for 2 more, and repeats until all the jobs are complete. At no point is the worker processing more than 3 jobs at the same time.

The problem is that the worker fetched all 10 jobs, but none of them had been completed yet; all were still active. It seems that as soon as Vert.x puts the event on the event bus (and releases the thread), the worker considers the job done, even though the ‘complete’ command hasn’t been called yet.
How can I limit the number of jobs being processed at any given moment (so as not to overload the database or another external service)?

@Art - got it, I slightly misunderstood the question! Let me dig a little deeper and see what I can find out!

@Art - good news and not-as-good news … the not-as-good news is that this is absolutely a gap in the Zeebe client library. The internal counter of active jobs considers a job complete as soon as the call to the job handler returns. There isn’t a great workaround at the moment other than going against the asynchronous pattern you’ve implemented and blocking the thread until the job completes.
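To make that concrete, here is a plain-Java sketch of what the workaround amounts to (illustrative names, no Zeebe or Vert.x dependencies): a semaphore held for the full lifetime of each job caps how many are genuinely in flight, by blocking the handler thread until a permit frees up.

```java
import java.util.concurrent.Semaphore;

// Sketch: cap concurrent in-flight jobs by holding a permit for the whole
// lifetime of each job, releasing it only when the async work finishes.
public class InFlightLimiter {

    private final Semaphore permits;

    public InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // Called at the start of the job handler: blocks until capacity frees up.
    public void acquire() throws InterruptedException {
        permits.acquire();
    }

    // Called from the async completion/failure callback.
    public void release() {
        permits.release();
    }

    public int available() {
        return permits.availablePermits();
    }
}
```

With `maxInFlight` set to the same value as maxJobsActive, the handler thread stalls once that many jobs are pending, so the client's counter cannot race ahead of the real work.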

However, there is good news! This is something the engineers are currently working on. I don’t have a release date, but they are working to support async job handlers, which would solve this problem.

Thank you for the answer (for the good news only :smiley:).
For now I ended up with something like this:

var future = new CompletableFuture<Boolean>();

vertx
    .eventBus()
    .<ObjectNode>request(
        GET_CASE_DATA,
        objectMapper.valueToTree(eventDto)
    ).compose(asyncResult -> {
        ...
    ).onComplete(ignored -> future.complete(true));

// Block the worker thread until the async chain finishes (or times out).
future.get(jobTimeout, TimeUnit.MILLISECONDS);

It seems to work.
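A slightly more defensive version of that bridge can be wrapped in a small helper (plain Java, illustrative names, no Zeebe or Vert.x dependencies) so that timeouts and failures surface as exceptions the worker can turn into a fail command:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Sketch: run an async operation that completes a future, then block the
// calling (worker) thread for at most timeoutMs. Failures propagate as
// ExecutionException, timeouts as TimeoutException.
public class AsyncBridge {

    public static <T> T await(Consumer<CompletableFuture<T>> asyncOp, long timeoutMs)
            throws Exception {
        CompletableFuture<T> future = new CompletableFuture<>();
        asyncOp.accept(future);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // give up on the async result
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulated async source completing on another thread.
        String result = await(f ->
                new Thread(() -> f.complete("done")).start(), 1000);
        System.out.println(result); // prints "done"
    }
}
```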
Additionally I had to provide my own ZeebeClientExecutorService in order to have more than one worker thread.
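For anyone landing here later, the bean override I mean looks roughly like this. Treat it as a sketch: the `ZeebeClientExecutorService` type comes from the spring-zeebe starter, and whether a `createDefault(int)` factory exists may vary by starter version, so check the API you are on.

```java
import io.camunda.zeebe.spring.client.jobhandling.ZeebeClientExecutorService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Sketch: replace the starter's default single-threaded scheduler so that
// blocking job handlers can run on several worker threads in parallel.
@Configuration
public class ZeebeExecutorConfig {

    @Bean
    public ZeebeClientExecutorService zeebeClientExecutorService() {
        // Assumed factory: builds a scheduled pool with the given thread
        // count (verify the method name against your starter version).
        return ZeebeClientExecutorService.createDefault(4);
    }
}
```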
Thanks.

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.