There is only one execution for the process instance itself and none for the service task. In addition the instance is still running and there is no way to signal the other service task and complete the workflow.
To illustrate the issue here is a workflow that has parallel service tasks:
package com.resilient.workflow.camunda;
import org.camunda.bpm.engine.impl.bpmn.behavior.TaskActivityBehavior;
import org.camunda.bpm.engine.impl.pvm.delegate.ActivityExecution;
// Minimal custom service-task behavior used to reproduce the issue discussed in
// this thread. NOTE(review): the empty execute() presumably makes the task a
// wait state so it can be signalled externally — confirm against the Camunda
// ActivityBehavior contract.
public class ServiceTaskImpl extends TaskActivityBehavior {
@Override
public void execute(final ActivityExecution execution) {
// Intentionally empty: the execution stays at this activity until signalled.
}
@Override
public void signal(ActivityExecution execution, String signalName, Object data) throws Exception {
// On signal, take the outgoing transition. Whether leave() is the correct
// call here is exactly the question posed later in the thread.
leave(execution);
}
}
And the test:
package com.resilient.workflow.camunda;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import org.camunda.bpm.engine.impl.persistence.entity.ExecutionEntity;
import org.camunda.bpm.engine.runtime.Execution;
import org.camunda.bpm.engine.runtime.ProcessInstance;
import org.camunda.bpm.engine.test.Deployment;
import org.camunda.bpm.engine.test.ProcessEngineRule;
import org.junit.Rule;
import org.junit.Test;
import java.util.List;
import java.util.Objects;
public class SignalServiceTaskConcurrentPathsTest {

    @Rule
    public ProcessEngineRule processEngineRule = new ProcessEngineRule("camunda.cfg.xml");

    /**
     * Starts a process with two parallel service tasks, checks the expected
     * execution tree (two task executions plus the process-instance execution),
     * signals one task, and checks the executions that remain.
     */
    @Deployment(resources = {"signalservicetaskconcurrentpaths.bpmn20.xml"})
    @Test
    public void testSignalServiceTaskOnConcurrentPaths() {
        // Given a workflow with parallel service tasks
        ProcessInstance processInstance =
                processEngineRule.getRuntimeService().startProcessInstanceByKey("twoservicetasks");
        assertThat(processInstance.isEnded(), is(false));

        // When the workflow is run
        List<Execution> executions = processEngineRule.getRuntimeService()
                .createExecutionQuery()
                .processInstanceId(processInstance.getProcessInstanceId())
                .list();

        // Then there are three executions: one for each service task and one for the process instance
        assertThat(executions, hasSize(3));

        // When one of the service tasks is signalled.
        // orElseThrow gives a clear failure message instead of a bare
        // NoSuchElementException if the activity is not present.
        ExecutionEntity task_01qgj19 = executions.stream()
                .map(ExecutionEntity.class::cast)
                .filter(entity -> Objects.equals(entity.getActivityId(), "Task_01qgj19"))
                .findFirst()
                .orElseThrow(() -> new AssertionError("No execution found for activity Task_01qgj19"));
        processEngineRule.getRuntimeService().signal(task_01qgj19.getId());

        // Then there are two executions: one for the remaining service task and one for the process instance
        executions = processEngineRule.getRuntimeService()
                .createExecutionQuery()
                .processInstanceId(processInstance.getProcessInstanceId())
                .list();
        assertThat(executions, hasSize(2));
    }
}
It is very likely that `leave()` is not the correct method to use in the service task implementation, so let me know if something else should be used there to progress the service task.
When you signal the first task, the related execution goes straight to the "end" node. After this, one execution is still left, related to the second service task, and you can signal it. This works for me with your model:
@Test @Deployment(resources = {“signalservicetaskconcurrentpaths.bpmn”})
public void threeServiceTasksAndAGateway() {
//Given a workflow with parallel service tasks
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("twoservicetasks");
assertFalse(processInstance.isEnded());
Execution executionEntity = runtimeService.createExecutionQuery().processInstanceId(processInstance.getProcessInstanceId()).activityId("Task_01qgj19").singleResult();
runtimeService.signal(executionEntity.getId());
executionEntity = runtimeService.createExecutionQuery().processInstanceId(processInstance.getProcessInstanceId()).activityId("Task_0i098fv").singleResult();
assertNotNull(executionEntity);
runtimeService.signal(executionEntity.getId());
}
If you want the execution to “wait” for the other task to be signaled, consider using parallel gateways:
Thank you for the reply. This makes perfect sense.
I wanted to be able to capture, via an event listener, when the process instance has ended, which in this case is immediately after the first signal. I registered an event listener for the ExecutionListener.EVENTNAME_END. However I don’t get the callback as expected after the signal.
Am I registering for the event correctly?
Thanks,
Ben
Here is a test that illustrates the problem:
package com.resilient.workflow.camunda;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import com.google.common.collect.Lists;
import org.camunda.bpm.engine.ProcessEngineConfiguration;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.ExecutionListener;
import org.camunda.bpm.engine.impl.bpmn.parser.AbstractBpmnParseListener;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.camunda.bpm.engine.impl.persistence.entity.ExecutionEntity;
import org.camunda.bpm.engine.impl.persistence.entity.ProcessDefinitionEntity;
import org.camunda.bpm.engine.impl.util.xml.Element;
import org.camunda.bpm.engine.runtime.Execution;
import org.camunda.bpm.engine.runtime.ProcessInstance;
import org.camunda.bpm.engine.test.Deployment;
import org.camunda.bpm.engine.test.ProcessEngineRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.Description;
import java.util.List;
import java.util.Objects;
public class SignalServiceTaskConcurrentPathsTest {

    /**
     * Engine rule that builds its own engine so a custom BPMN parse listener
     * can be registered before process definitions are parsed.
     */
    public class CustomEngineRule extends ProcessEngineRule {

        public CustomEngineRule() {
            super(false /* don't assert if database is not cleaned up, just clean it up */);
        }

        @Override
        protected void initializeProcessEngine() {
            ProcessEngineConfigurationImpl processEngineConfiguration =
                    ResilientStandaloneProcessEngineConfiguration.create()
                            .setJdbcUrl("jdbc:h2:mem:camunda;DB_CLOSE_DELAY=1000")
                            .setJdbcDriver("org.h2.Driver")
                            .setJdbcUsername("sa")
                            .setJdbcPassword("");
            processEngineConfiguration.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);
            // Register the parse listener that attaches the end listener to every parsed process.
            processEngineConfiguration.setCustomPostBPMNParseListeners(Lists.newArrayList(new CustomListener()));
            super.processEngine = processEngineConfiguration.buildProcessEngine();
        }

        @Override
        public void finished(Description description) {
            super.finished(description);
        }
    }

    @Rule
    public ProcessEngineRule processEngineRule = new CustomEngineRule();

    /** Attaches a process-level EVENTNAME_END listener while the BPMN is parsed. */
    public class CustomListener extends AbstractBpmnParseListener {
        @Override
        public void parseProcess(Element processElement, ProcessDefinitionEntity processDefinition) {
            processDefinition.addListener(ExecutionListener.EVENTNAME_END, new CustomEndListener());
        }
    }

    // Fixed typo: "instancedEnded" -> "instanceEnded". Set by CustomEndListener
    // when the end listener fires; read only by this test.
    private boolean instanceEnded = false;

    public class CustomEndListener implements ExecutionListener {
        @Override
        public void notify(DelegateExecution execution) {
            instanceEnded = true;
        }
    }

    /**
     * Starts the parallel-service-task process, signals one task, and then
     * expects the end listener to have fired. NOTE(review): the thread reports
     * this callback is NOT received — this test illustrates that problem.
     */
    @Deployment(resources = {"signalservicetaskconcurrentpaths.bpmn20.xml"})
    @Test
    public void testSignalServiceTaskOnConcurrentPaths() {
        // Given a workflow with parallel service tasks
        ProcessInstance processInstance =
                processEngineRule.getRuntimeService().startProcessInstanceByKey("twoservicetasks");
        assertThat(processInstance.isEnded(), is(false));

        // When the workflow is run
        List<Execution> executions = processEngineRule.getRuntimeService()
                .createExecutionQuery()
                .processInstanceId(processInstance.getProcessInstanceId())
                .list();

        // Then there are three executions: one for each service task and one for the process instance
        assertThat(executions, hasSize(3));

        // When one of the service tasks is signalled.
        // orElseThrow fails with a clear message instead of a bare NoSuchElementException.
        ExecutionEntity task_01qgj19 = executions.stream()
                .map(ExecutionEntity.class::cast)
                .filter(entity -> Objects.equals(entity.getActivityId(), "Task_01qgj19"))
                .findFirst()
                .orElseThrow(() -> new AssertionError("No execution found for activity Task_01qgj19"));
        processEngineRule.getRuntimeService().signal(task_01qgj19.getId());

        // Then make sure we were notified that the instance ended.
        assertThat(instanceEnded, is(true));
    }
}