Hi,
we noticed a strange effect that occurs when trying to access the process variables inside a Mono chain (from project reactor). Our code is usually inside a JavaDelegate that is referenced in the model by a delegate expression.
However, this will produce an exception that I am not allowed to access the serializers outside of a command context
ENGINE-03041 Cannot work with serializers outside of command context.
However, this is not a problem with Project Reactor’s Mono chain (the commented out line in the retrieveJoke method works as expected). I suspect that Camunda cannot handle the parallel thread were the HTTP Request (and the doOnNext side effect method) gets executed.
If I add a .log() statement to the chain, my log output (with the threads) looks like this:
2019-11-14 23:18:07.256 INFO 2565 --- [nio-8080-exec-2] reactor.Mono.MapFuseable.1 : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2019-11-14 23:18:07.257 INFO 2565 --- [nio-8080-exec-2] reactor.Mono.MapFuseable.1 : | request(unbounded)
2019-11-14 23:18:07.695 INFO 2565 --- [or-http-epoll-1] reactor.Mono.MapFuseable.1 : | onNext(Chuck Norris doesn't throw up if he drinks too much. Chuck Norris throws down!)
2019-11-14 23:18:07.698 INFO 2565 --- [or-http-epoll-1] reactor.Mono.MapFuseable.1 : | cancel()
2019-11-14 23:18:07.699 INFO 2565 --- [or-http-epoll-1] reactor.Mono.MapFuseable.1 : | onComplete()
2019-11-14 23:18:07.700 ERROR 2565 --- [nio-8080-exec-2] org.camunda.bpm.engine.context : ENGINE-16006 BPMN Stack Trace:
ReadServiceTask (activity-execute, ProcessInstance[a25b9dd3-072c-11ea-a395-60f677b2052f])
the only valid workaround that I found is to call .block() on the Mono chain to immediately retrieve the result from the HTTP Request and then access the process variables. This is a little bit nasty because it violates idiomatic reactive programming and is hard to test without some rather problematic integration tests.
Is there a better way doing that? I tried to specify a Scheduler with the .publishOn(Scheduler) method from the Mono chain, but I can’t figure out how to get an appropriate Thread from the JobExecutor of Camunda.
I uploaded the complete demo project to my github repo:
A DelegateExecution object is only useful within the thread and callback that you receive it in. It is not useful in other threads, because certain internal logic is bound to a thread (e.g. transaction management, entity caching, etc.). It is not useful outside of a callback, because it is mutable (e.g. when process execution continues, the DelegateExecutions properties will change).
I’m not experienced with reactive programming in general or WebFlux in particular, but one idea is to set the variables via RuntimeService API, which will then use its own transaction, e.g.:
Hey Thorben,
thanks for your reply and helping me understand the inner working of Camunda a little bit better!
Unfortunately, your suggested solution does not work.
While trying to get the runtimeService inside the doOnNext method I get an error
ENGINE-16004 Exception while closing command context: null
So I thought, why not trying to autowire the runtimeService at the construction and just call it in the doOnNext method?
However, this will produce an error that the execution does not exist:
execution 4c6c4baf-07fc-11ea-9ce8-60f677b2052f doesn’t exist: execution is null
PS: the block() statement will ensure that the execution won’t end until the reactive call chain is finished with processing. So I believe the execution (and its underlying process instance) should still exist.
It’s probably the other case then, the execution isn’t persisted yet. You can understand the situation by putting a breakpoint into the delegate and then check in the database what is there.
I assume that I have to make a service task run async in order to persist anything, right?
By doing that, the process will start working, but
only when using runtimeService.setVariable(…) instead of execution.setVariable(…)
with some weird stuff going on with camunda’s transaction handling
org.camunda.bpm.engine.OptimisticLockingException: ENGINE-03005 Execution of ‘UPDATE ExecutionEntity[7fb863cf-0b24-11ea-8f73-60f677b2052f]’ failed. Entity was updated by another transaction concurrently.
For now, this is beyond my understanding of project reactor’s vs camunda’s threading model. I assume that both camunda execution and the http thread that executes onNext are trying to write at the same time.
@Override
public void execute(DelegateExecution execution) {
final var jokeText = retrieveJoke().block();
execution.setVariable("jokeText", jokeText);
}
this blocking call always works (regardless of sync/async or the used setVariable method) and might be better than fiddling around with schedulers (which should be possible with a specialized ExecutorService, if I understand Project Reactor’s documentation on the publishOn method correctly: Mono (reactor-core 3.6.1))