Unit Test Complete Job is failing

Environment : Java 17 Spring Boot, Junit5
Error in console:

java.lang.AssertionError: Expected element with id task1 to be passed 1 times, but was 0
	at io.camunda.zeebe.process.test.assertions.ProcessInstanceAssert.hasPassedElement(ProcessInstanceAssert.java:225)
	at io.camunda.zeebe.process.test.assertions.ProcessInstanceAssert.hasPassedElement(ProcessInstanceAssert.java:188)
	at tv.cadent.workflow.example.Task1ServiceTest.task1ServiceCompleteTest(Task1ServiceTest.java:124)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:95)
	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:91)
	at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:60)
	at org.eclipse.jdt.internal.junit5.runner.JUnit5TestReference.run(JUnit5TestReference.java:98)
	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:40)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:529)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:756)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:452)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210)

Test Class

@WorkflowServiceZeebeSpringTest
@TestMethodOrder(OrderAnnotation.class)
@TestInstance(Lifecycle.PER_CLASS)
@ExtendWith(SpringExtension.class)
public class Task1ServiceTest {

	private static final Logger log = LoggerFactory.getLogger(Task1Service.class);

	@MockBean private ZeebeTestEngineProxy zeebeTestEngineProxy;
	@MockBean private ZeebeClientProxy zeebeClientProxy;
	@Autowired private ZeebeTestEngine zeebeTestEngine;

	@InjectMocks private Task1Service task1Service;

	private Map<String, Object> variables;
	private ZeebeClient zeebeClient;

	private static final String PROCESS_ID = "generic-workflow-task1-test-process";
	private static final String TASK1_ELEMENT_ID = "task1";
	private static final String TASK1_SERVICE_JOB_TYPE = "task1-service-worker";

	@BeforeAll
	public void beforeAllInitialize() {
		zeebeClient = zeebeTestEngine.createClient();
		zeebeClient.newDeployResourceCommand().addResourceFromClasspath("bpmn/example/task1-test.bpmn").send().join();
	}

	@BeforeEach
	public void beforeEachInitialize() {
		
	}

	@AfterAll
	public void afterAllClose() {
		zeebeClient.close();
	}

	@Order(1)
	@Test
	public void task1ServiceFailTest() throws Exception {
		try {
			// Variables for testing failure of tast1 service
			Map<String,Object> variables = new HashMap<>();
			variables.put("name", "HappyProcess");
			variables.put("taskToFail", "Task1");

			// start a process instance
			ProcessInstanceEvent processInstanceFail = startProcessInstance(variables);
			assertThat(processInstanceFail).isActive();
			// Activated Job
			ActivateJobsResponse response = zeebeClient.newActivateJobsCommand().jobType(TASK1_SERVICE_JOB_TYPE)
					.maxJobsToActivate(1).send().join();

			ActivatedJob activatedJob = getActivatedJob(response);
			JobAssert assertions = BpmnAssert.assertThat(activatedJob);
			assertions.hasNoIncidents();
			task1Service.task1Service(activatedJob, zeebeClient);
			assertThat(processInstanceFail).hasNotPassedElement(TASK1_ELEMENT_ID);
			assertThat(processInstanceFail).isNotCompleted();
			cancelProcessInstance(processInstanceFail);
		} catch (Exception e) {
			log.error("Error in task1 service fail test " + e.getMessage());
		}

	}

	@Order(2)
	@Test
	public void task1ServiceCompleteTest() throws Exception {
		try {
			// Variables for testing completion of tast1 service
			Map<String,Object> variables = new HashMap<>();
			variables.put("name", "HappyProcess");
			variables.put("taskToFail", ",");
			// start a process instance
			ProcessInstanceEvent processInstance = startProcessInstance(variables);
			assertThat(processInstance).isActive();
			BpmnAssert.assertThat(processInstance);
			ActivateJobsResponse response = zeebeClient.newActivateJobsCommand().jobType(TASK1_SERVICE_JOB_TYPE)
					.maxJobsToActivate(1).send().join();
			ActivatedJob activatedJob = getActivatedJob(response);
			
			assertThat(processInstance).hasNotPassedElement(TASK1_ELEMENT_ID);
			task1Service.task1Service(activatedJob, zeebeClient);
			int duration = 10000;// wait till Task1 Service run
			Thread.sleep(duration);
			assertThat(processInstance).hasNoIncidents();
			assertThat(processInstance).hasPassedElement(TASK1_ELEMENT_ID);
			assertThat(processInstance).isCompleted();
			cancelProcessInstance(processInstance);
		} catch (Exception e) {
			log.error("Error in task1 service comelete test " + e.getMessage());
		}

	}

	private ActivatedJob getActivatedJob(ActivateJobsResponse response) throws Exception {		
		int duration = 1000;
		while(response.getJobs().size()<1) {
			Thread.sleep(duration);
			duration+=1000;
			if(duration == 10000)
				throw new Exception("Job waiting period exceeded");
		}
		return response.getJobs().get(0);
	}

	private ProcessInstanceEvent startProcessInstance(Map<String, Object> variables) {
		return zeebeClient.newCreateInstanceCommand().bpmnProcessId(PROCESS_ID).latestVersion().variables(variables)
				.send().join();
	}
	
	private void cancelProcessInstance(ProcessInstanceEvent processInstance) {
		zeebeClient.newCancelInstanceCommand(processInstance.getProcessInstanceKey()).send();
	}

}

Service Task Class

@Component
public class Task1Service {

	private Logger log = LoggerFactory.getLogger(this.getClass());

	/*
	 * Simple Zeebe Client Asynchronous, short run and provide immediate response
	 * Here type is reference from BPMN model, type must be unique within BPMN model
	 * autoComplete helps to complete job without any intervention or code and the
	 * Spring integration will take care of job completion otherwise job handler
	 * code has to also complete the job.
	 */

	@ZeebeWorker(type = "task1-service-worker")
	public void task1Service(final ActivatedJob job, final JobClient client) throws Exception {
		try {
			log.info("Task1 Service is running, job key: {}", job.getKey());

			Map<String, Object> variableMap = job.getVariablesAsMap();
			Object taskToFail = variableMap.get("taskToFail");

			if (taskToFail != null) {
				boolean isFail = taskToFail.toString().trim().equalsIgnoreCase("task1");
				if (!isFail) {
					int sleepDurationSecs = 10;
					log.info("Simulating short run by sleeping for {} seconds...", sleepDurationSecs);
					TimeUnit.SECONDS.sleep(sleepDurationSecs);

					client.newCompleteCommand(job).send();
					log.info("Task1 Service is completed");
				} else {
					client.newFailCommand(job).retries(0).send();
					log.info("Task1 Service is failed");
				}
			} else {
				throw new ExampleServiceException("taskToFail variable is not available");
			}
		} catch (InterruptedException ie) {
			log.error("Error in Task1 Service. Sleeping is interrupted.");
			throw ie;
		} catch (ExampleServiceException ese) {
			log.error("Error in Task1 Service.");
			throw ese;
		} catch (Exception e) {
			log.error("Error in Task1 Service.");
			throw new ExampleServiceException("Error in Task1 Service", e);
		}
	}
}


BPMN Model

<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:modeler="http://camunda.org/schema/modeler/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" id="Definitions_1" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Web Modeler" exporterVersion="174e755" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.0.0" camunda:diagramRelationId="b3624121-4554-4eb5-b30e-5010e463abb3">
  <bpmn:process id="generic-workflow-task1-test-process" name="Task1 Service Test Process" isExecutable="true">
    <bpmn:startEvent id="process-start-event" name="Process Start Event">
      <bpmn:outgoing>Flow_1uhj8re</bpmn:outgoing>
    </bpmn:startEvent>
    <bpmn:sequenceFlow id="Flow_1uhj8re" sourceRef="process-start-event" targetRef="task1" />
    <bpmn:endEvent id="process-end-event" name="Process End Event">
      <bpmn:incoming>Flow_1rwiwoz</bpmn:incoming>
    </bpmn:endEvent>
    <bpmn:sequenceFlow id="Flow_1rwiwoz" sourceRef="task1" targetRef="process-end-event" />
    <bpmn:serviceTask id="task1" name="Task1 Service">
      <bpmn:extensionElements>
        <zeebe:taskDefinition type="task1-service-worker" />
        <zeebe:ioMapping>
          <zeebe:input source="=name" target="name" />
          <zeebe:input source="=taskToFail" target="taskToFail" />
        </zeebe:ioMapping>
      </bpmn:extensionElements>
      <bpmn:incoming>Flow_1uhj8re</bpmn:incoming>
      <bpmn:outgoing>Flow_1rwiwoz</bpmn:outgoing>
    </bpmn:serviceTask>
  </bpmn:process>
  <bpmndi:BPMNDiagram id="BPMNDiagram_1">
    <bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="generic-workflow-task1-test-process">
      <bpmndi:BPMNEdge id="Flow_1rwiwoz_di" bpmnElement="Flow_1rwiwoz">
        <di:waypoint x="340" y="118" />
        <di:waypoint x="402" y="118" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="Flow_1uhj8re_di" bpmnElement="Flow_1uhj8re">
        <di:waypoint x="186" y="118" />
        <di:waypoint x="240" y="118" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="process-start-event">
        <dc:Bounds x="150" y="100" width="36" height="36" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="136" y="143" width="67" height="27" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="Event_0xrz09m_di" bpmnElement="process-end-event">
        <dc:Bounds x="402" y="100" width="36" height="36" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="389" y="143" width="63" height="27" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="Activity_00h0zqo_di" bpmnElement="task1">
        <dc:Bounds x="240" y="78" width="100" height="80" />
      </bpmndi:BPMNShape>
    </bpmndi:BPMNPlane>
  </bpmndi:BPMNDiagram>
</bpmn:definitions>

What’s the issue? how to resolve it? Is My test cases not right? Please provide complete in depth solution.

Can some one explains me what this recordstream output says?

{"valueType":"DEPLOYMENT","key":-1,"position":1,"timestamp":1665586697359,"recordType":"COMMAND","intent":"CREATE","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"resources":[{"resource":"PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiPz4NCjxicG1uOmRlZmluaXRpb25zIHhtbG5zOmJwbW49Imh0dHA6Ly93d3cub21nLm9yZy9zcGVjL0JQTU4vMjAxMDA1MjQvTU9ERUwiIHhtbG5zOmJwbW5kaT0iaHR0cDovL3d3dy5vbWcub3JnL3NwZWMvQlBNTi8yMDEwMDUyNC9ESSIgeG1sbnM6ZGM9Imh0dHA6Ly93d3cub21nLm9yZy9zcGVjL0RELzIwMTAwNTI0L0RDIiB4bWxuczptb2RlbGVyPSJodHRwOi8vY2FtdW5kYS5vcmcvc2NoZW1hL21vZGVsZXIvMS4wIiB4bWxuczpkaT0iaHR0cDovL3d3dy5vbWcub3JnL3NwZWMvREQvMjAxMDA1MjQvREkiIHhtbG5zOnplZWJlPSJodHRwOi8vY2FtdW5kYS5vcmcvc2NoZW1hL3plZWJlLzEuMCIgeG1sbnM6Y2FtdW5kYT0iaHR0cDovL2NhbXVuZGEub3JnL3NjaGVtYS8xLjAvYnBtbiIgaWQ9IkRlZmluaXRpb25zXzEiIHRhcmdldE5hbWVzcGFjZT0iaHR0cDovL2JwbW4uaW8vc2NoZW1hL2JwbW4iIGV4cG9ydGVyPSJDYW11bmRhIFdlYiBNb2RlbGVyIiBleHBvcnRlclZlcnNpb249IjE3NGU3NTUiIG1vZGVsZXI6ZXhlY3V0aW9uUGxhdGZvcm09IkNhbXVuZGEgQ2xvdWQiIG...
{"valueType":"PROCESS","key":2251799813685249,"position":2,"timestamp":1665586697688,"recordType":"EVENT","intent":"CREATED","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"resource":"PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiPz4NCjxicG1uOmRlZmluaXRpb25zIHhtbG5zOmJwbW49Imh0dHA6Ly93d3cub21nLm9yZy9zcGVjL0JQTU4vMjAxMDA1MjQvTU9ERUwiIHhtbG5zOmJwbW5kaT0iaHR0cDovL3d3dy5vbWcub3JnL3NwZWMvQlBNTi8yMDEwMDUyNC9ESSIgeG1sbnM6ZGM9Imh0dHA6Ly93d3cub21nLm9yZy9zcGVjL0RELzIwMTAwNTI0L0RDIiB4bWxuczptb2RlbGVyPSJodHRwOi8vY2FtdW5kYS5vcmcvc2NoZW1hL21vZGVsZXIvMS4wIiB4bWxuczpkaT0iaHR0cDovL3d3dy5vbWcub3JnL3NwZWMvREQvMjAxMDA1MjQvREkiIHhtbG5zOnplZWJlPSJodHRwOi8vY2FtdW5kYS5vcmcvc2NoZW1hL3plZWJlLzEuMCIgeG1sbnM6Y2FtdW5kYT0iaHR0cDovL2NhbXVuZGEub3JnL3NjaGVtYS8xLjAvYnBtbiIgaWQ9IkRlZmluaXRpb25zXzEiIHRhcmdldE5hbWVzcGFjZT0iaHR0cDovL2JwbW4uaW8vc2NoZW1hL2JwbW4iIGV4cG9ydGVyPSJDYW11bmRhIFdlYiBNb2RlbGVyIiBleHBvcnRlclZlcnNpb249IjE3NGU3NTUiIG1vZGVsZXI6ZXhlY3V0aW9uUGxhdGZvcm09IkNhbXVuZGEgQ2xvdWQiIG1vZG...
{"valueType":"DEPLOYMENT","key":2251799813685250,"position":3,"timestamp":1665586697688,"recordType":"EVENT","intent":"CREATED","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"resources":[{"resource":"PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiPz4NCjxicG1uOmRlZmluaXRpb25zIHhtbG5zOmJwbW49Imh0dHA6Ly93d3cub21nLm9yZy9zcGVjL0JQTU4vMjAxMDA1MjQvTU9ERUwiIHhtbG5zOmJwbW5kaT0iaHR0cDovL3d3dy5vbWcub3JnL3NwZWMvQlBNTi8yMDEwMDUyNC9ESSIgeG1sbnM6ZGM9Imh0dHA6Ly93d3cub21nLm9yZy9zcGVjL0RELzIwMTAwNTI0L0RDIiB4bWxuczptb2RlbGVyPSJodHRwOi8vY2FtdW5kYS5vcmcvc2NoZW1hL21vZGVsZXIvMS4wIiB4bWxuczpkaT0iaHR0cDovL3d3dy5vbWcub3JnL3NwZWMvREQvMjAxMDA1MjQvREkiIHhtbG5zOnplZWJlPSJodHRwOi8vY2FtdW5kYS5vcmcvc2NoZW1hL3plZWJlLzEuMCIgeG1sbnM6Y2FtdW5kYT0iaHR0cDovL2NhbXVuZGEub3JnL3NjaGVtYS8xLjAvYnBtbiIgaWQ9IkRlZmluaXRpb25zXzEiIHRhcmdldE5hbWVzcGFjZT0iaHR0cDovL2JwbW4uaW8vc2NoZW1hL2JwbW4iIGV4cG9ydGVyPSJDYW11bmRhIFdlYiBNb2RlbGVyIiBleHBvcnRlclZlcnNpb249IjE3NGU3NTUiIG1vZGVsZXI6ZXhlY3V0aW9uUGxhdGZvcm09IkNhbXVuZ...
{"valueType":"DEPLOYMENT","key":2251799813685250,"position":4,"timestamp":1665586697688,"recordType":"EVENT","intent":"FULLY_DISTRIBUTED","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"resources":[],"processesMetadata":[],"decisionRequirementsMetadata":[],"decisionsMetadata":[]},"sourceRecordPosition":1}
{"valueType":"PROCESS_INSTANCE_CREATION","key":-1,"position":5,"timestamp":1665586697794,"recordType":"COMMAND","intent":"CREATE","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"version":-1,"variables":{"taskToFail":"Task3","name":"HappyProcess"},"bpmnProcessId":"generic-workflow-task1-test-process","processDefinitionKey":0,"processInstanceKey":-1},"sourceRecordPosition":-1}
{"valueType":"VARIABLE","key":2251799813685252,"position":6,"timestamp":1665586697797,"recordType":"EVENT","intent":"CREATED","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"name":"name","value":"\"HappyProcess\"","bpmnProcessId":"generic-workflow-task1-test-process","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"scopeKey":2251799813685251},"sourceRecordPosition":5}
{"valueType":"VARIABLE","key":2251799813685253,"position":7,"timestamp":1665586697797,"recordType":"EVENT","intent":"CREATED","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"name":"taskToFail","value":"\"Task3\"","bpmnProcessId":"generic-workflow-task1-test-process","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"scopeKey":2251799813685251},"sourceRecordPosition":5}
{"valueType":"PROCESS_INSTANCE","key":2251799813685251,"position":8,"timestamp":1665586697797,"recordType":"COMMAND","intent":"ACTIVATE_ELEMENT","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"version":1,"bpmnProcessId":"generic-workflow-task1-test-process","elementId":"generic-workflow-task1-test-process","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"flowScopeKey":-1,"bpmnElementType":"PROCESS","parentElementInstanceKey":-1,"parentProcessInstanceKey":-1},"sourceRecordPosition":5}
{"valueType":"PROCESS_INSTANCE_CREATION","key":2251799813685254,"position":9,"timestamp":1665586697797,"recordType":"EVENT","intent":"CREATED","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"version":1,"variables":{"taskToFail":"Task3","name":"HappyProcess"},"bpmnProcessId":"generic-workflow-task1-test-process","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251},"sourceRecordPosition":5}
{"valueType":"PROCESS_INSTANCE","key":2251799813685251,"position":10,"timestamp":1665586697805,"recordType":"EVENT","intent":"ELEMENT_ACTIVATING","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"version":1,"bpmnProcessId":"generic-workflow-task1-test-process","elementId":"generic-workflow-task1-test-process","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"flowScopeKey":-1,"bpmnElementType":"PROCESS","parentElementInstanceKey":-1,"parentProcessInstanceKey":-1},"sourceRecordPosition":8}
{"valueType":"PROCESS_INSTANCE","key":2251799813685251,"position":11,"timestamp":1665586697805,"recordType":"EVENT","intent":"ELEMENT_ACTIVATED","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"version":1,"bpmnProcessId":"generic-workflow-task1-test-process","elementId":"generic-workflow-task1-test-process","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"flowScopeKey":-1,"bpmnElementType":"PROCESS","parentElementInstanceKey":-1,"parentProcessInstanceKey":-1},"sourceRecordPosition":8}
{"valueType":"PROCESS_INSTANCE","key":-1,"position":12,"timestamp":1665586697805,"recordType":"COMMAND","intent":"ACTIVATE_ELEMENT","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"version":1,"bpmnProcessId":"generic-workflow-task1-test-process","elementId":"process-start-event","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"flowScopeKey":2251799813685251,"bpmnElementType":"START_EVENT","parentElementInstanceKey":-1,"parentProcessInstanceKey":-1},"sourceRecordPosition":8}
{"valueType":"PROCESS_INSTANCE","key":2251799813685255,"position":13,"timestamp":1665586697807,"recordType":"EVENT","intent":"ELEMENT_ACTIVATING","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"version":1,"bpmnProcessId":"generic-workflow-task1-test-process","elementId":"process-start-event","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"flowScopeKey":2251799813685251,"bpmnElementType":"START_EVENT","parentElementInstanceKey":-1,"parentProcessInstanceKey":-1},"sourceRecordPosition":12}
{"valueType":"PROCESS_INSTANCE","key":2251799813685255,"position":14,"timestamp":1665586697807,"recordType":"EVENT","intent":"ELEMENT_ACTIVATED","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"version":1,"bpmnProcessId":"generic-workflow-task1-test-process","elementId":"process-start-event","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"flowScopeKey":2251799813685251,"bpmnElementType":"START_EVENT","parentElementInstanceKey":-1,"parentProcessInstanceKey":-1},"sourceRecordPosition":12}
{"valueType":"PROCESS_INSTANCE","key":2251799813685255,"position":15,"timestamp":1665586697807,"recordType":"COMMAND","intent":"COMPLETE_ELEMENT","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"version":1,"bpmnProcessId":"generic-workflow-task1-test-process","elementId":"process-start-event","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"flowScopeKey":2251799813685251,"bpmnElementType":"START_EVENT","parentElementInstanceKey":-1,"parentProcessInstanceKey":-1},"sourceRecordPosition":12}
{"valueType":"PROCESS_INSTANCE","key":2251799813685255,"position":16,"timestamp":1665586697817,"recordType":"EVENT","intent":"ELEMENT_COMPLETING","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"version":1,"bpmnProcessId":"generic-workflow-task1-test-process","elementId":"process-start-event","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"flowScopeKey":2251799813685251,"bpmnElementType":"START_EVENT","parentElementInstanceKey":-1,"parentProcessInstanceKey":-1},"sourceRecordPosition":15}
{"valueType":"PROCESS_INSTANCE","key":2251799813685255,"position":17,"timestamp":1665586697817,"recordType":"EVENT","intent":"ELEMENT_COMPLETED","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"version":1,"bpmnProcessId":"generic-workflow-task1-test-process","elementId":"process-start-event","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"flowScopeKey":2251799813685251,"bpmnElementType":"START_EVENT","parentElementInstanceKey":-1,"parentProcessInstanceKey":-1},"sourceRecordPosition":15}
{"valueType":"PROCESS_INSTANCE","key":2251799813685256,"position":18,"timestamp":1665586697817,"recordType":"EVENT","intent":"SEQUENCE_FLOW_TAKEN","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"version":1,"bpmnProcessId":"generic-workflow-task1-test-process","elementId":"Flow_1uhj8re","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"flowScopeKey":2251799813685251,"bpmnElementType":"SEQUENCE_FLOW","parentElementInstanceKey":-1,"parentProcessInstanceKey":-1},"sourceRecordPosition":15}
{"valueType":"PROCESS_INSTANCE","key":2251799813685257,"position":19,"timestamp":1665586697817,"recordType":"COMMAND","intent":"ACTIVATE_ELEMENT","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"version":1,"bpmnProcessId":"generic-workflow-task1-test-process","elementId":"task1","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"flowScopeKey":2251799813685251,"bpmnElementType":"SERVICE_TASK","parentElementInstanceKey":-1,"parentProcessInstanceKey":-1},"sourceRecordPosition":15}
{"valueType":"PROCESS_INSTANCE","key":2251799813685257,"position":20,"timestamp":1665586697856,"recordType":"EVENT","intent":"ELEMENT_ACTIVATING","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"version":1,"bpmnProcessId":"generic-workflow-task1-test-process","elementId":"task1","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"flowScopeKey":2251799813685251,"bpmnElementType":"SERVICE_TASK","parentElementInstanceKey":-1,"parentProcessInstanceKey":-1},"sourceRecordPosition":19}
{"valueType":"VARIABLE","key":2251799813685258,"position":21,"timestamp":1665586697856,"recordType":"EVENT","intent":"CREATED","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"name":"name","value":"\"HappyProcess\"","bpmnProcessId":"generic-workflow-task1-test-process","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"scopeKey":2251799813685257},"sourceRecordPosition":19}
{"valueType":"VARIABLE","key":2251799813685259,"position":22,"timestamp":1665586697856,"recordType":"EVENT","intent":"CREATED","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"name":"taskToFail","value":"\"Task3\"","bpmnProcessId":"generic-workflow-task1-test-process","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"scopeKey":2251799813685257},"sourceRecordPosition":19}
{"valueType":"JOB","key":2251799813685260,"position":23,"timestamp":1665586697856,"recordType":"EVENT","intent":"CREATED","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"type":"task1-service-worker","errorMessage":"","variables":{},"bpmnProcessId":"generic-workflow-task1-test-process","deadline":-1,"retries":3,"worker":"","customHeaders":{},"elementId":"task1","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"elementInstanceKey":2251799813685257,"processDefinitionVersion":1,"errorCode":"","recurringTime":-1,"retryBackoff":0},"sourceRecordPosition":19}
{"valueType":"PROCESS_INSTANCE","key":2251799813685257,"position":24,"timestamp":1665586697856,"recordType":"EVENT","intent":"ELEMENT_ACTIVATED","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"version":1,"bpmnProcessId":"generic-workflow-task1-test-process","elementId":"task1","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"flowScopeKey":2251799813685251,"bpmnElementType":"SERVICE_TASK","parentElementInstanceKey":-1,"parentProcessInstanceKey":-1},"sourceRecordPosition":19}
{"valueType":"JOB_BATCH","key":-1,"position":25,"timestamp":1665586697868,"recordType":"COMMAND","intent":"ACTIVATE","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"type":"task1-service-worker","jobs":[],"worker":"task1-service-worker","maxJobsToActivate":1,"timeout":300000,"truncated":false,"jobKeys":[]},"sourceRecordPosition":-1}
{"valueType":"JOB_BATCH","key":2251799813685261,"position":26,"timestamp":1665586697873,"recordType":"EVENT","intent":"ACTIVATED","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"type":"task1-service-worker","jobs":[{"type":"task1-service-worker","errorMessage":"","variables":{"taskToFail":"Task3","name":"HappyProcess"},"bpmnProcessId":"generic-workflow-task1-test-process","deadline":1665586997868,"retries":3,"worker":"task1-service-worker","customHeaders":{},"elementId":"task1","processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685251,"elementInstanceKey":2251799813685257,"processDefinitionVersion":1,"errorCode":"","recurringTime":-1,"retryBackoff":0}],"worker":"task1-service-worker","maxJobsToActivate":1,"timeout":300000,"truncated":false,"jobKeys":[2251799813685260]},"sourceRecordPosition":25}
{"valueType":"JOB","key":2251799813685260,"position":27,"timestamp":1665586707901,"recordType":"COMMAND","intent":"COMPLETE","partitionId":1,"rejectionType":"NULL_VAL","rejectionReason":"","brokerVersion":"8.0.6","value":{"type":"","errorMessage":"","variables":{},"bpmnProcessId":"","deadline":-1,"retries":-1,"worker":"","customHeaders":{},"elementId":"","processDefinitionKey":-1,"processInstanceKey":-1,"elementInstanceKey":-1,"processDefinitionVersion":-1,"errorCode":"","recurringTime":-1,"retryBackoff":0},"sourceRecordPosition":-1}
20:28:27.954 [main] WARN  io.camunda.zeebe.spring.test.AbstractZeebeTestExecutionListener - Test failure on 'public void tv.cadent.workflow.example.Task1ServiceTest.task1ServiceCompleteTest() throws java.lang.Exception'. Tracing workflow engine internals on INFO for debugging purposes:
20:28:27.956 [main] INFO  io.camunda.zeebe.process.test.filters.logger.IncidentLogger - 
20:28:27.958 [main] INFO  io.camunda.zeebe.process.test.filters.logger.RecordStreamLogger - 

Why its creating multiple instances??What’s rejectionType NULL_VAL ?

@jonathan.lukas can you help me with this one?

Hello @Prashik_Hingaspure ,

it looks to me like there could be a race condition. In your client code, you send(), but you don’t join() which means that the actual execution might take longer than your next line of code.

This is one main difference between Zeebe and Camunda 7 testing: The Zeebe engine is not running in the same thread, but it has a completely different Thread you need to be aware of.

This can be prevented by using zeebeTestEngine.waitForIdleState() or something similar.

I hope this helps

Jonathan

Hi @jonathan.lukas , this is camunda 8 and I cannot find any relevant example which shows unit testing of service tasks. Is it not possible to test it, if so must be mentioned it clearly. I am working on this cluelessly since last month and no one is ready to provide solution. Even in twitter example Camunda team skip service task from testing.

Hello @Prashik_Hingaspure ,

if you need a guide on how to set up a Unit Test using Zeebe, this would be a nice starting point:

There is a section later in the doc where the service task execution is done “manually”.

I hope this helps

Jonathan

Hello @Prashik_Hingaspure ,

I digged a bit in my own code and found this helper class I wrote. It relies on spring-zeebe-test:

package com.camunda.example;

import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.command.ActivateJobsCommandStep1.ActivateJobsCommandStep3;
import io.camunda.zeebe.client.api.command.CompleteJobCommandStep1;
import io.camunda.zeebe.client.api.command.DeployResourceCommandStep1;
import io.camunda.zeebe.client.api.command.DeployResourceCommandStep1.DeployResourceCommandStep2;
import io.camunda.zeebe.client.api.command.PublishMessageCommandStep1.PublishMessageCommandStep3;
import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.api.response.DeploymentEvent;
import io.camunda.zeebe.client.api.response.PublishMessageResponse;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.process.test.assertions.DeploymentAssert;
import io.camunda.zeebe.process.test.assertions.MessageAssert;
import io.camunda.zeebe.process.test.assertions.ProcessInstanceAssert;
import io.camunda.zeebe.process.test.filters.RecordStream;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import org.assertj.core.api.Assertions;
import org.camunda.bpm.model.xml.instance.DomElement;

import java.io.ByteArrayInputStream;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static io.camunda.zeebe.process.test.assertions.BpmnAssert.*;

public class ZeebeTestUtil {
  private static ZeebeClient client;
  private static ZeebeTestEngine engine;
  private static Duration defaultDuration = Duration.ofSeconds(10);

  public static void init(ZeebeTestEngine engine, ZeebeClient client) {
    init(engine, client, defaultDuration);
  }

  public static void init(ZeebeTestEngine engine, ZeebeClient client, Duration defaultDuration) {
    ZeebeTestUtil.engine = engine;
    ZeebeTestUtil.client = client;
    ZeebeTestUtil.defaultDuration = defaultDuration;
  }

  public static String findId(String elementName) {
    waitForIdleState();
    List<String> potentialElementIds =
        StreamSupport.stream(
                RecordStream.of(engine.getRecordStreamSource()).deploymentRecords().spliterator(),
                false)
            .flatMap(record -> record.getValue().getResources().stream())
            .map(
                deploymentResource ->
                    Bpmn.readModelFromStream(
                        new ByteArrayInputStream(deploymentResource.getResource())))
            .flatMap(
                bpmnModelInstance ->
                    getChildElements(bpmnModelInstance.getDocument().getRootElement()).stream())
            .filter(element -> Objects.equals(element.getAttribute("name"), elementName))
            .map(element -> element.getAttribute("id"))
            .distinct()
            .collect(Collectors.toList());
    Assertions.assertThat(potentialElementIds).hasSize(1);
    return potentialElementIds.get(0);
  }

  private static Set<DomElement> getChildElements(DomElement parent) {
    Set<DomElement> elements = new HashSet<>();
    elements.add(parent);
    parent.getChildElements().forEach(child -> elements.addAll(getChildElements(child)));
    return elements;
  }

  public static void completeJob(String jobType) {
    completeJob(jobType, v -> null, null);
  }

  public static void completeJob(String jobType, Object variables) {
    completeJob(jobType, v -> variables, null);
  }

  public static <V, R> void completeJob(
      String jobType, Function<V, R> variableMapper, Class<V> variablesAsTypeClass) {
    ActivatedJob activatedJob = getJob(jobType);
    V variables =
        Optional.ofNullable(variablesAsTypeClass)
            .map(activatedJob::getVariablesAsType)
            .orElse(null);
    CompleteJobCommandStep1 completeCommand = client.newCompleteCommand(activatedJob);
    Optional.ofNullable(variableMapper.apply(variables)).ifPresent(completeCommand::variables);
    completeCommand.send().join();
    waitForIdleState();
  }

  private static ActivatedJob getJob(String jobType) {
    waitForIdleState();
    ActivateJobsCommandStep3 command =
        client.newActivateJobsCommand().jobType(jobType).maxJobsToActivate(1);
    List<ActivatedJob> activatedJobs = command.send().join().getJobs();
    Assertions.assertThat(activatedJobs).hasSize(1);
    return activatedJobs.get(0);
  }

  public static void correlateMessage(String messageName, String correlationKey) {
    correlateMessage(
        messageName, correlationKey, Optional.empty(), Optional.empty(), Optional.empty());
  }

  public static void correlateMessage(String messageName, String correlationKey, String messageId) {
    correlateMessage(
        messageName, correlationKey, Optional.of(messageId), Optional.empty(), Optional.empty());
  }

  public static void correlateMessage(String messageName, String correlationKey, Object variables) {
    correlateMessage(
        messageName, correlationKey, Optional.empty(), Optional.of(variables), Optional.empty());
  }

  public static void correlateMessage(
      String messageName, String correlationKey, String messageId, Object variables) {
    correlateMessage(
        messageName,
        correlationKey,
        Optional.of(messageId),
        Optional.of(variables),
        Optional.empty());
  }

  public static void correlateMessage(
      String messageName, String correlationKey, Duration timeToLive) {
    correlateMessage(
        messageName, correlationKey, Optional.empty(), Optional.empty(), Optional.of(timeToLive));
  }

  public static void correlateMessage(
      String messageName, String correlationKey, String messageId, Duration timeToLive) {
    correlateMessage(
        messageName,
        correlationKey,
        Optional.of(messageId),
        Optional.empty(),
        Optional.of(timeToLive));
  }

  public static void correlateMessage(
      String messageName, String correlationKey, Object variables, Duration timeToLive) {
    correlateMessage(
        messageName,
        correlationKey,
        Optional.empty(),
        Optional.of(variables),
        Optional.of(timeToLive));
  }

  public static void correlateMessage(
      String messageName,
      String correlationKey,
      String messageId,
      Object variables,
      Duration timeToLive) {
    correlateMessage(
        messageName,
        correlationKey,
        Optional.of(messageId),
        Optional.of(variables),
        Optional.of(timeToLive));
  }

  private static void correlateMessage(
      String messageName,
      String correlationKey,
      Optional<String> messageId,
      Optional<Object> variables,
      Optional<Duration> timeToLive) {
    waitForIdleState();
    PublishMessageCommandStep3 command =
        client.newPublishMessageCommand().messageName(messageName).correlationKey(correlationKey);
    messageId.ifPresent(command::messageId);
    variables.ifPresent(command::variables);
    timeToLive.ifPresent(command::timeToLive);
    MessageAssert messageAssert = assertThat(command.send().join());
    waitForIdleState();
    messageAssert.hasBeenCorrelated();
  }

  public static DeploymentAssert deploy(String resourceName) {
    DeploymentEvent deploymentEvent =
        client.newDeployResourceCommand().addResourceFromClasspath(resourceName).send().join();
    waitForIdleState();
    return assertThat(deploymentEvent);
  }

  public static void deployment(String... resourceNames) {
    DeployResourceCommandStep1 command = client.newDeployResourceCommand();
    DeployResourceCommandStep2 command2 = null;
    for (String resourceName : resourceNames) {
      if (command2 == null) {
        command2 = command.addResourceFromClasspath(resourceName);
      } else {
        command2 = command2.addResourceFromClasspath(resourceName);
      }
    }
    if (command2 != null) {
      command2.send().join();
      waitForIdleState();
      return;
    }
    throw new AssertionError("Deployment was called with no bpmn files");
  }

  public static ProcessInstanceAssert startProcessWithMessage(
      PublishMessageCommandStep3 publishMessageCommand) {
    PublishMessageResponse event = publishMessageCommand.send().join();
    waitForIdleState();
    return assertThat(event).hasCreatedProcessInstance().extractingProcessInstance();
  }

  public static void completeUserTask(Object variables) {
    completeJob(Protocol.USER_TASK_JOB_TYPE, variables);
  }

  public static <V, R> void completeUserTask(
      Function<V, R> variableMapper, Class<V> variablesAsTypeClass) {
    completeJob(Protocol.USER_TASK_JOB_TYPE, variableMapper, variablesAsTypeClass);
  }

  public static void completeWithError(String jobType, String errorCode, String errorMessage) {
    ActivatedJob job = getJob(jobType);
    client.newThrowErrorCommand(job).errorCode(errorCode).errorMessage(errorMessage).send().join();
    waitForIdleState();
  }

  public static void assertProcessInstanceEnded() {

    getRecordStream(ProcessInstanceRecord.class)
        .filter(record -> record.getValue().getBpmnElementType().equals(BpmnElementType.PROCESS))
        .filter(record -> record.getIntent().equals(ProcessInstanceIntent.ELEMENT_COMPLETED))
        .count();
  }

  private static <T extends RecordValue> Stream<Record<T>> getRecordStream(
      Class<T> recordValueType) {
    return StreamSupport.stream(
            engine.getRecordStreamSource().getRecords().spliterator(), false)
        .filter(record -> recordValueType.isAssignableFrom(record.getValue().getClass()))
        .map(record -> (Record<T>) record);
  }

  public static void waitForIdleState() {
    try {
      engine.waitForIdleState(defaultDuration);
    } catch (InterruptedException | TimeoutException e) {
      throw new RuntimeException(e);
    }
  }
}

I hope this helps

Jonathan

Thanks @jonathan.lukas

1 Like