Dynamically enable/disable AsyncBefore

Hello,

I’m using the Camunda Spring Boot Starter.

Let’s say I have the following processes:

Process A

Process B

Both processes use Call Activity C.
I have the following two requirements:

  1. If Process A is initiated, all Service Tasks (including the service tasks of Call Activity C) should be AsyncBefore, so that a save point is committed to the database before each task (and a failure is stored as an incident).
  2. If Process B is initiated, all service tasks should be synchronous (in order to have a synchronous response back to the initial caller).

Is there a way I can achieve this business logic? (maybe by dynamically setting the AsyncBefore flag based on a condition? or maybe by forcing the process to wait until Process B has finished? or maybe a BPMN Parse listener somehow?)

As a side-note, I’m using the Java RuntimeService to start a process like below:

...
RuntimeService runtimeService = ProcessEngines.getDefaultProcessEngine().getRuntimeService();
ProcessInstanceWithVariables process;
process = runtimeService.createProcessInstanceByKey(myProcessKey)
                .executeWithVariablesInReturn();
...

Thanks in advance

Hello @nmanitaras ,

First: this is not possible out of the box.

Second: I would wrap the synchronous call, introducing either a callback or a polling loop that waits for the process instance to finish and then returns.

In general, wait states should not be used to achieve some kind of synchronous behaviour for api interaction. Furthermore, they serve as save points like you mentioned and will impact performance.

I hope this helps

Jonathan

Could you please check the question I asked here: ParseListener spring boot starter configuration - #7 by nmanitaras

If I manage to make this work, it might solve the whole issue.
Thanks!

Also, is there any other way to force the Camunda engine to store the point of each incident, without specifying AsyncBefore/After?

Basically I’m trying to provide operational users a way to retry some failed steps, without having to retry the whole process.

Hello @nmanitaras ,

Async before/after exists exactly for this purpose.

Why are you trying to avoid async behaviour?

Jonathan

As I mentioned in my base question, some processes should be completely synchronous.

I believe that the only solution is to keep two versions of the Call Activity C bpmn: one that contains no asyncBefore, and one that contains asyncBefore at the desired points.

Hello @nmanitaras ,

Why should the process be completely synchronous?

The only reason I would see is that everything should run in one database transaction.

On top of that, if anything “unstable” is executed during the process and fails, the initiator will receive an error.

Jonathan

Let’s say Process A is called from System A, which doesn’t care for a synchronous response. Therefore, Process A, as well as Call Activity C, should contain “asyncBefore = true” at each step, in order to keep save points, so that Operational Users can retry the failed steps on-demand.

On the other hand, Process B is called from System B, which waits for a synchronous response when the process completes. Therefore, Call Activity C should not contain any “asyncBefore = true”. I only need Operational Users to be able to retry in the case of Process A, not Process B.

My question is, is there a way I can dynamically set Call Activity C’s service tasks async or sync based on the caller process? Or am I forced to create two Call Activity C bpmns, and one will be synchronous and the other will contain asyncBefore at each step?

Hello @nmanitaras ,

I see 2 options for this:

  1. Create a plugin that copies the process on deployment, changes the process definition key and adds/removes async markers. This is like creating a second process manually, but without the manual work.

  2. Create your custom logic like this:

public Response executeProcess(InputData input, boolean responseRequired){
  ProcessInstance pi = runtimeService.startProcessInstanceByKey("myKey",input.asMap());
  if(responseRequired){
    // here, you can implement waiting for pi complete or incident. If incident occurs, delete pi and return error state
    return waitForProcessInstanceComplete(pi);
  } else {
    return createResponse(pi);
  }
}
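
A minimal sketch of what the waiting part behind `waitForProcessInstanceComplete` could look like. `ProcessWaiter`, `State` and `waitForEnd` are hypothetical names, not part of Camunda. The sketch is kept free of engine dependencies: the caller passes in a probe that reports the current state, and the comment shows which Camunda queries such a probe might use.

```java
import java.time.Duration;
import java.util.function.Supplier;

/** Polls a status probe until the instance ends, has an incident, or the timeout elapses. */
final class ProcessWaiter {

    enum State { RUNNING, COMPLETED, INCIDENT, TIMED_OUT }

    private ProcessWaiter() {
    }

    /**
     * Assumption: in a Camunda application the probe could return
     *   INCIDENT  if runtimeService.createIncidentQuery().processInstanceId(id).count() > 0
     *   COMPLETED if runtimeService.createProcessInstanceQuery().processInstanceId(id).count() == 0
     *   RUNNING   otherwise
     */
    static State waitForEnd(Supplier<State> probe, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            State state = probe.get();
            if (state != State.RUNNING) {
                return state;
            }
            if (System.nanoTime() - deadline >= 0) {
                return State.TIMED_OUT;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // restore the interrupt flag and stop waiting
                Thread.currentThread().interrupt();
                return State.TIMED_OUT;
            }
        }
    }
}
```

The caller can then map `COMPLETED` to a normal response and treat `INCIDENT` or `TIMED_OUT` as an error, for example by deleting the process instance as described in the comment above.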

Jonathan

Great idea @jonathan.lukas
I’m currently trying to implement the first option that you mentioned. Do you know of a way to create a copy of a process (bpmn) on deployment? (Specifically when creating the .jar file, since only the jar file will be deployed.)

Thanks for your prompt responses!

Hello @nmanitaras ,

This snippet needs to be inserted into a Deployer, which is then registered with the process engine as a customPreDeployer:

Set<ResourceEntity> resourcesToAdd = new HashSet<>();
    deployment
        // get all resources
        .getResources()
        .entrySet()
        .stream()
        // only inspect bpmn files
        .filter(e -> e
            .getKey()
            .endsWith(".bpmn"))
        .map(Entry::getValue)
        .forEach(resourceEntity -> {
          // create a model instance to make inspection easier
          BpmnModelInstance modelInstance = Bpmn.readModelFromStream(new ByteArrayInputStream(resourceEntity.getBytes()));
          // only handle files that contain the process id I want to look into
          if (modelInstance
              .getModelElementsByType(Process.class)
              .stream()
              .anyMatch(process -> process
                  .getId()
                  .equals("my-sync-process"))) {
            modelInstance
                .getModelElementsByType(Process.class)
                .stream()
                .filter(process -> process
                    .getId()
                    .equals("my-sync-process"))
                // set another process id
                .peek(process -> process.setId(process.getId() + "Async"))
                // set another process name
                .peek(process -> process.setName(process.getName() + " Async"))
                .forEach(process -> process
                    .getFlowElements()
                    .stream()
                    .filter(flowElement -> FlowNode.class.isAssignableFrom(flowElement.getClass()))
                    .map(FlowNode.class::cast)
                    // for each flownode, set async before and async after
                    .peek(flownode -> runNeverThrow(() -> flownode.setCamundaAsyncAfter(true)))
                    .forEach(flownode -> runNeverThrow(() -> flownode.setCamundaAsyncBefore(true))));
            // now, create a new entity, copy most of the data from the handled one and only place the new bytes and name
            ResourceEntity resource = new ResourceEntity();
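            // encode explicitly as UTF-8 instead of relying on the platform default charset
            resource.setBytes(Bpmn
                .convertToString(modelInstance)
                .getBytes(java.nio.charset.StandardCharsets.UTF_8));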
            resource.setCreateTime(resourceEntity.getCreateTime());
            resource.setDeploymentId(resourceEntity.getDeploymentId());
            resource.setGenerated(true);
            resource.setId(Context
                .getCommandContext()
                .getProcessEngineConfiguration()
                .getIdGenerator()
                .getNextId());
            resource.setName("async-" + resourceEntity.getName());
            resource.setTenantId(resourceEntity.getTenantId());
            resource.setType(resourceEntity.getType());
            // add the resource to the current deployment
            resourcesToAdd.add(resource);
          }
        });
    resourcesToAdd.forEach(deployment::addResource);

Please do not use it as is, but test it in depth first.
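
For the registration itself, a rough configuration sketch, assuming the Spring Boot starter picks up `ProcessEnginePlugin` beans automatically. `AsyncCopyDeployerPlugin` and `AsyncCopyDeployer` are hypothetical names, and the body of `deploy` is where the snippet above would go. It needs the Camunda and Spring dependencies on the classpath, so treat it as a starting point rather than a tested implementation.

```java
import java.util.ArrayList;
import java.util.List;

import org.camunda.bpm.engine.impl.cfg.AbstractProcessEnginePlugin;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.camunda.bpm.engine.impl.persistence.deploy.Deployer;
import org.camunda.bpm.engine.impl.persistence.entity.DeploymentEntity;
import org.springframework.stereotype.Component;

@Component
public class AsyncCopyDeployerPlugin extends AbstractProcessEnginePlugin {

    @Override
    public void preInit(ProcessEngineConfigurationImpl configuration) {
        // keep any pre-deployers that were already configured
        List<Deployer> preDeployers = new ArrayList<>();
        if (configuration.getCustomPreDeployers() != null) {
            preDeployers.addAll(configuration.getCustomPreDeployers());
        }
        preDeployers.add(new AsyncCopyDeployer());
        configuration.setCustomPreDeployers(preDeployers);
    }

    /** Hypothetical deployer that hosts the resource-copying logic. */
    static class AsyncCopyDeployer implements Deployer {
        @Override
        public void deploy(DeploymentEntity deployment) {
            // insert the resource-copying logic here
        }
    }
}
```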

Jonathan

Thank you so much for the code snippet Jonathan!
I will check it out and post an update.
