Message correlation from process to event subprocess

Hi everyone, I hope you can help me with this issue.

We are trying to correlate a message via an intermediate message throw event to a message start event in an event subprocess (see attached example). The message correlation seems to work, at least I don’t get a “can’t correlate message” exception (which I get when I e.g. set a wrong message name), but somehow the tasks in the subprocess are not executed. Even worse: the parent process instance is also terminated afterwards.
When I look at the activity history, the last executed activity is the intermediate throw event.
Is there something wrong with this kind of construct, or is that a bug?

Here are some files to illustrate the problem (BPMN, unit test, delegate).

ProcessWithMessageEventSubprocess.bpmn (7.2 KB)

Test:

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

import java.util.List;

import org.camunda.bpm.engine.ManagementService;
import org.camunda.bpm.engine.ProcessEngine;
import org.camunda.bpm.engine.repository.DeploymentBuilder;
import org.camunda.bpm.engine.runtime.Job;
import org.camunda.bpm.engine.runtime.ProcessInstance;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.support.AnnotationConfigContextLoader;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(loader=AnnotationConfigContextLoader.class)
public class TestProzessMitEventSubprozess {
    

    @Configuration
    @Import(value={ProzessTestConfig.class})
    static class Config{
        @Bean
        public SendMessageDelegate sendMessageDelegate(){
            return new SendMessageDelegate();
        }
        
    }
    
    @Autowired
    private ProcessEngine engine;

    
    @Before
    public void setUp() {
        deploy("ProcessWithMessageEventSubprocess.bpmn");
    }
    
    @Test
    public void test(){
        ProcessInstance processInstance = engine.getRuntimeService().startProcessInstanceByKey("processWithMessageEventSubprocess");
        
        System.out.println("!!!!!!!!!!!!!! Task 1 !!!!!!!!!!!!!!");
        
        printActiveJobs();
        printActivityInstanceTree(processInstance);
        printActivityHistory();
        
        executeAsyncTask("task1");
        
        
        System.out.println("!!!!!!!!!!!!!! Message !!!!!!!!!!!!!!");
        printActiveJobs();
        printActivityInstanceTree(processInstance);
        printActivityHistory();
        
        executeAsyncTask("messageToSubprocess");
        
        System.out.println("!!!!!!!!!!!!!! Task im Subprozess !!!!!!!!!!!!!!");
        printProcessInstances();
        printActiveJobs();
        printActivityInstanceTree(processInstance);
        printActivityHistory();
        
        executeAsyncTask("subprocessTask");
        
        
        System.out.println("!!!!!!!!!!!!!! Task 2 !!!!!!!!!!!!!!");
        printActiveJobs();
        printActivityInstanceTree(processInstance);
        printActivityHistory();
        
        executeAsyncTask("task2");
        assertThat(engine.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstance.getId()).list().size(), is(0));
    }

    private void printProcessInstances() {
        System.out.println("---------------- Process Instances ----------------");
        System.out.println(engine.getRuntimeService().createProcessInstanceQuery().list());
        System.out.println("----------------------------------------------------");
    }


    public void printActivityInstanceTree(ProcessInstance processInstance) {
        System.out.println("---------------- Activity Instance Tree ----------------");
        System.out.println(engine.getRuntimeService().getActivityInstance(processInstance.getId()));
        System.out.println("----------------------------------------------------");
    }


    public void printActiveJobs() {
        System.out.println("---------------- Pending Jobs ----------------");
        engine.getManagementService().createJobQuery().active().list()
            .stream()
            .forEach(job -> System.out.println(job.toString()));
        System.out.println("----------------------------------------------");
    }


    public void printActivityHistory() {
        System.out.println("---------------- Activity History ----------------");
        engine.getHistoryService().createHistoricActivityInstanceQuery().orderByHistoricActivityInstanceStartTime().desc().list()
            .stream()
            .forEach(activityInstance -> System.out.println(activityInstance.toString()));
        System.out.println("--------------------------------------------------");
    }

    
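    // Executes the single pending async-continuation job of the given activity;
    // the assertion fails if there is not exactly one such job.
    private void executeAsyncTask(String activityId){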
        ManagementService managementService = engine.getManagementService();
        List<Job> jobs = managementService.createJobQuery().active().activityId(activityId).list();
        assertThat(jobs.size(), is(1));
        
        managementService.executeJob(jobs.get(0).getId());
    }
    
    private void deploy(String... resources) {
        
        
        DeploymentBuilder deploymentBuilder = engine.getRepositoryService().createDeployment();
        for (String resource: resources) {
            deploymentBuilder.addClasspathResource(resource);
        }
        deploymentBuilder.deploy();
    }


}

Delegate:

import org.camunda.bpm.engine.delegate.DelegateExecution;

public class SendMessageDelegate {

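    // Correlates the message to the process instance that is currently executing,
    // so it can only trigger a message event inside that same instance.
    public void sendMessage(DelegateExecution execution, String messageName){
        execution.getProcessEngineServices().getRuntimeService()
            .createMessageCorrelation(messageName)
            .processInstanceId(execution.getProcessInstanceId())
            .correlate();
    }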
}

This is the resulting sysout that is printed by the test:

!!!!!!!!!!!!!! Task 1 !!!!!!!!!!!!!!
---------------- Pending Jobs ----------------
MessageEntity[repeatnull, id=11, revision=1, duedate=null, lockOwner=null, lockExpirationTime=null, executionId=8, processInstanceId=8, isExclusive=true, retries=1, jobHandlerType=async-continuation, jobHandlerConfiguration=transition-create-scope, exceptionByteArray=null, exceptionByteArrayId=null, exceptionMessage=null, deploymentId=1]
----------------------------------------------
---------------- Activity Instance Tree ----------------
└── processWithMessageEventSubprocess:1:3=>8
    └── transition to/from task1:8

----------------------------------------------------
---------------- Activity History ----------------
HistoricActivityInstanceEntity[activityId=StartEvent_1, activityName=null, activityType=startEvent, activityInstanceId=null, activityInstanceState=0, parentActivityInstanceId=8, calledProcessInstanceId=null, calledCaseInstanceId=null, taskId=null, taskAssignee=null, durationInMillis=2, startTime=Wed Aug 31 15:50:42 CEST 2016, endTime=Wed Aug 31 15:50:42 CEST 2016, eventType=null, executionId=8, processDefinitionId=processWithMessageEventSubprocess:1:3, processInstanceId=8]
--------------------------------------------------
!!!!!!!!!!!!!! Message !!!!!!!!!!!!!!
---------------- Pending Jobs ----------------
MessageEntity[repeatnull, id=13, revision=1, duedate=null, lockOwner=null, lockExpirationTime=null, executionId=8, processInstanceId=8, isExclusive=true, retries=1, jobHandlerType=async-continuation, jobHandlerConfiguration=transition-create-scope, exceptionByteArray=null, exceptionByteArrayId=null, exceptionMessage=null, deploymentId=1]
----------------------------------------------
---------------- Activity Instance Tree ----------------
└── processWithMessageEventSubprocess:1:3=>8
    └── transition to/from messageToSubprocess:8

----------------------------------------------------
---------------- Activity History ----------------
HistoricActivityInstanceEntity[activityId=task1, activityName=Task 1, activityType=task, activityInstanceId=null, activityInstanceState=0, parentActivityInstanceId=8, calledProcessInstanceId=null, calledCaseInstanceId=null, taskId=null, taskAssignee=null, durationInMillis=0, startTime=Wed Aug 31 15:50:43 CEST 2016, endTime=Wed Aug 31 15:50:43 CEST 2016, eventType=null, executionId=8, processDefinitionId=processWithMessageEventSubprocess:1:3, processInstanceId=8]
HistoricActivityInstanceEntity[activityId=StartEvent_1, activityName=null, activityType=startEvent, activityInstanceId=null, activityInstanceState=0, parentActivityInstanceId=8, calledProcessInstanceId=null, calledCaseInstanceId=null, taskId=null, taskAssignee=null, durationInMillis=2, startTime=Wed Aug 31 15:50:42 CEST 2016, endTime=Wed Aug 31 15:50:42 CEST 2016, eventType=null, executionId=8, processDefinitionId=processWithMessageEventSubprocess:1:3, processInstanceId=8]
--------------------------------------------------
!!!!!!!!!!!!!! Task im Subprozess !!!!!!!!!!!!!!
---------------- Process Instances ----------------
[]
----------------------------------------------------
---------------- Pending Jobs ----------------
----------------------------------------------
---------------- Activity Instance Tree ----------------
null
----------------------------------------------------
---------------- Activity History ----------------
HistoricActivityInstanceEntity[activityId=messageToSubprocess, activityName=Message to Subprocess, activityType=intermediateMessageThrowEvent, activityInstanceId=null, activityInstanceState=2, parentActivityInstanceId=8, calledProcessInstanceId=null, calledCaseInstanceId=null, taskId=null, taskAssignee=null, durationInMillis=12, startTime=Wed Aug 31 15:50:43 CEST 2016, endTime=Wed Aug 31 15:50:43 CEST 2016, eventType=null, executionId=8, processDefinitionId=processWithMessageEventSubprocess:1:3, processInstanceId=8]
HistoricActivityInstanceEntity[activityId=task1, activityName=Task 1, activityType=task, activityInstanceId=null, activityInstanceState=0, parentActivityInstanceId=8, calledProcessInstanceId=null, calledCaseInstanceId=null, taskId=null, taskAssignee=null, durationInMillis=0, startTime=Wed Aug 31 15:50:43 CEST 2016, endTime=Wed Aug 31 15:50:43 CEST 2016, eventType=null, executionId=8, processDefinitionId=processWithMessageEventSubprocess:1:3, processInstanceId=8]
HistoricActivityInstanceEntity[activityId=StartEvent_1, activityName=null, activityType=startEvent, activityInstanceId=null, activityInstanceState=0, parentActivityInstanceId=8, calledProcessInstanceId=null, calledCaseInstanceId=null, taskId=null, taskAssignee=null, durationInMillis=2, startTime=Wed Aug 31 15:50:42 CEST 2016, endTime=Wed Aug 31 15:50:42 CEST 2016, eventType=null, executionId=8, processDefinitionId=processWithMessageEventSubprocess:1:3, processInstanceId=8]

Thanks in advance for your help,
Jan

Hi @JHoelter,

you have asyncBefore on task1. Are you executing that job before you expect the correlation?

Cheers,
Askar

Hi Askar,

yes. As you can see in the sysout, the process arrives at the intermediate message throw event.

!!!!!!!!!!!!!! Message !!!!!!!!!!!!!!
---------------- Pending Jobs ----------------
MessageEntity[repeatnull, id=13, revision=1, duedate=null, lockOwner=null, lockExpirationTime=null, executionId=8, processInstanceId=8, isExclusive=true, retries=1, jobHandlerType=async-continuation, jobHandlerConfiguration=transition-create-scope, exceptionByteArray=null, exceptionByteArrayId=null, exceptionMessage=null, deploymentId=1]
----------------------------------------------
---------------- Activity Instance Tree ----------------
└── processWithMessageEventSubprocess:1:3=>8
    └── transition to/from messageToSubprocess:8

The problem is not the correlation, it’s that the tasks in the subprocess are not executed although the message is correlated.

Regards
Jan

Hey Jan,

You are using an interrupting event subprocess, which means that the main flow is cancelled when the subprocess is triggered, so you won’t get to task2 in any case. It is strange, though, that you can’t see the subprocess being executed. Could you maybe put your unit test in a separate repository on GitHub so that I can try to run it? You can use the unit test repository as an example: https://github.com/camunda/camunda-engine-unittest
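
To check which variant a model actually uses, the BPMN XML can be inspected directly. The following is only a minimal sketch using the JDK's XML parser, not the attached model: the sample XML, its ids and the class name `EventSubprocessInspector` are made up for illustration. It relies on two things from the BPMN 2.0 schema: an event subprocess is a `subProcess` with `triggeredByEvent="true"`, and the `isInterrupting` attribute of its start event defaults to `true` when it is absent.

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

class EventSubprocessInspector {

    static final String BPMN_NS = "http://www.omg.org/spec/BPMN/20100524/MODEL";

    // Hypothetical sample model (ids are made up); tasks and sequence flows are omitted.
    static final String SAMPLE =
          "<definitions xmlns='" + BPMN_NS + "' targetNamespace='http://example.org/sketch'>"
        + "<process id='sketch' isExecutable='true'>"
        + "<subProcess id='subA' triggeredByEvent='true'>"
        + "<startEvent id='startA'><messageEventDefinition/></startEvent>"
        + "</subProcess>"
        + "<subProcess id='subB' triggeredByEvent='true'>"
        + "<startEvent id='startB' isInterrupting='false'><messageEventDefinition/></startEvent>"
        + "</subProcess>"
        + "</process></definitions>";

    // Reads an xsd:boolean attribute value; an empty string means the attribute is absent.
    static boolean xsdBoolean(String value, boolean defaultValue) {
        if (value.isEmpty()) {
            return defaultValue;
        }
        return "true".equals(value) || "1".equals(value);
    }

    /** Maps the id of every event subprocess start event to its interrupting flag. */
    static Map<String, Boolean> interruptingFlags(String bpmnXml) {
        Document doc;
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            doc = factory.newDocumentBuilder().parse(new InputSource(new StringReader(bpmnXml)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse BPMN XML", e);
        }
        Map<String, Boolean> flags = new LinkedHashMap<>();
        NodeList subProcesses = doc.getElementsByTagNameNS(BPMN_NS, "subProcess");
        for (int i = 0; i < subProcesses.getLength(); i++) {
            Element subProcess = (Element) subProcesses.item(i);
            if (!xsdBoolean(subProcess.getAttribute("triggeredByEvent"), false)) {
                continue; // a regular embedded subprocess, not an event subprocess
            }
            NodeList startEvents = subProcess.getElementsByTagNameNS(BPMN_NS, "startEvent");
            for (int j = 0; j < startEvents.getLength(); j++) {
                Element startEvent = (Element) startEvents.item(j);
                if (!subProcess.isSameNode(startEvent.getParentNode())) {
                    continue; // start event of a subprocess nested further down
                }
                // isInterrupting defaults to true when the attribute is missing
                boolean interrupting = xsdBoolean(startEvent.getAttribute("isInterrupting"), true);
                flags.put(startEvent.getAttribute("id"), interrupting);
            }
        }
        return flags;
    }

    public static void main(String[] args) {
        System.out.println(interruptingFlags(SAMPLE)); // {startA=true, startB=false}
    }
}
```

A start event reported as `true` here cancels the enclosing scope when its message arrives, which matches the behaviour described above.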

Cheers,
Askar

Hey Askar,

thanks for your reply. That’s actually very important to know. I assumed “interrupting” meant blocking at the throw event and then continuing at the same position after the subprocess has finished.
That makes it a whole other discussion, so let me change my question.

To give you some context: what we want to achieve in our real process is this.
We have a sequence of three service tasks that have to be executed at many different paths in the process. There are just some variations on the input of the tasks. So we want to be DRY here and extract the sequence of tasks to be reusable everywhere in the process.

Here’s a little example:

If the way we chose (event subprocess) won’t work, do you have any idea how to do it?
Our process is much more complex, so joining and forking again would totally mess up the readability.

Regards,
Jan

About the unit test: that will take me some time, as it has a strong dependency on our Spring configuration right now. I’ll try to set something up in the next few days.
Regards
Jan

Hi Jan,

you can change the start event to be non-interrupting, but then you have to implement the waiting yourself if the subprocess has to be finished before the next task starts. And if you do that, a terminating end event is not an option for you.

Does that help?

Hey Jan,

if I get you right, you have a process flow which should be reused in different places in your process model.
So it is like reusing some part of the process model.

In this case you can use a call activity.
With a call activity you are able to reuse an existing process definition. A call activity executes the given process model; after that, it returns to the calling process, which then proceeds with its own execution.
See the documentation for more information.
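
For illustration, here is a minimal sketch of what that looks like in a model, using only the JDK's XML parser. Everything named in it is hypothetical: the parent process, the element ids and the process key `reusableSequence` stand in for the extracted sequence of service tasks. Each `callActivity` references the shared process definition through its `calledElement` attribute, and the helper simply lists which call activities point at which definition. Sequence flows and variable mappings are left out.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

class CallActivityUsage {

    static final String BPMN_NS = "http://www.omg.org/spec/BPMN/20100524/MODEL";

    // Hypothetical parent process: two paths call the same extracted process.
    static final String PARENT_PROCESS =
          "<definitions xmlns='" + BPMN_NS + "' targetNamespace='http://example.org/sketch'>"
        + "<process id='parentProcess' isExecutable='true'>"
        + "<startEvent id='start'/>"
        + "<callActivity id='callOnPathA' calledElement='reusableSequence'/>"
        + "<serviceTask id='task2'/>"
        + "<callActivity id='callOnPathB' calledElement='reusableSequence'/>"
        + "<endEvent id='end'/>"
        + "</process></definitions>";

    /** Maps each called process key to the ids of the call activities that reference it. */
    static Map<String, List<String>> callSites(String bpmnXml) {
        Document doc;
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            doc = factory.newDocumentBuilder().parse(new InputSource(new StringReader(bpmnXml)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse BPMN XML", e);
        }
        Map<String, List<String>> sites = new TreeMap<>();
        NodeList callActivities = doc.getElementsByTagNameNS(BPMN_NS, "callActivity");
        for (int i = 0; i < callActivities.getLength(); i++) {
            Element callActivity = (Element) callActivities.item(i);
            String calledElement = callActivity.getAttribute("calledElement");
            // collect call activity ids in document order under the process key they call
            sites.computeIfAbsent(calledElement, key -> new ArrayList<>())
                 .add(callActivity.getAttribute("id"));
        }
        return sites;
    }

    public static void main(String[] args) {
        System.out.println(callSites(PARENT_PROCESS));
        // {reusableSequence=[callOnPathA, callOnPathB]}
    }
}
```

The called process would be modelled and deployed as its own process definition whose id matches the `calledElement` value.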

Hope it helps.

Best regards,
Chris

Hi Askar,

yes, actively waiting is a good hint, thanks. I would then send a message from the end event of the subprocess back to the parent process. That should work, right?

I already tried the non-interrupting start event but unfortunately that gives me an NPE. Seems like that is a known bug:
https://groups.google.com/forum/#!topic/camunda-bpm-users/V2s3HRirPoU

Actually I just found out that even Bernd tried to do (almost) the same thing:
https://groups.google.com/forum/?utm_medium=email&utm_source=footer#!msg/camunda-bpm-users/LeeLSK-eM04/JwdyC2aqBgAJ

So I’ll go with his solution and add the “wait for message from subprocess” part you suggested. It’s kind of ugly, but it will do the trick, I guess.

Thanks and regards,
Jan

Hi Chris,
thanks for your hint.
I know about call activities, but I wanted to avoid extracting an independent process for that case. It somewhat feels like bad design to me. IMHO call activities are more about sharing and reusing process flows among different process models than about reusing them within a single process model. At least that is how we use them in our project. I actually think it’s a gap in the BPMN spec that it does not allow reusing internal subprocesses.
But you’re right, this solution would work, so thanks for the hint.
Regards
Jan

@JHoelter,

I am not sure whether this Google Groups thread and the ticket are still up to date. As far as I can see, we have a unit test for that and it works: org.camunda.bpm.engine.test.bpmn.event.message.MessageEventSubprocessTest#testNonInterruptingEventSubprocessListenersInvoked

Cheers,
Askar

@aakhmerov
We’re still on 7.4. Maybe this works on 7.5? That would be great, because you can’t have a non-interrupting error start event. So another dead end :frowning:.

Hey Jan,

I think the call activity is the right way; the spec also says something similar:

The BPMN 2.0 Call Activity corresponds to the Reusable Sub-Process of BPMN 1.2. A BPMN 2.0 Sub-Process
corresponds to the Embedded Sub-Process of BPMN 1.2 (see the previous section).

See the BPMN specification, page 183.
Also see the Camunda reference for the call activity. It contains a good example in which different pools are called with the help of a call activity.

Greetings,
Chris

Hey Chris,

OK, I am convinced :smiley:.
It still feels a little odd to me. Somewhat like having to extract a class and a method and making that public where you naturally would just extract a private method within your current class.
But I guess I am being too much of a developer and too little of a BPMN spec guy here ;).

Anyway, thanks a lot for your help guys.

Regards
Jan

Hey Jan,

I agree with @Zelldon, it would be cleaner to do it with a call activity.

Cheers,
Askar

Hi @aakhmerov,

I set up a repository to reproduce the NPE that occurs when correlating the message: https://github.com/jlhoelter/camunda-bug-eventsubprocess-message

This refers to CAM-3802.

Although I agree that the call activity is the “clean” choice in terms of the BPMN spec, I am still not very happy with it. Not only for the reasons I already mentioned, but now I also have a weird process showing up in my Cockpit that makes no sense without the context it is used in ;).

Maybe you will consider fixing CAM-3802 some day.

Kind regards
Jan