More code examples on using Zeebe testcontainers please?

HI,

I am evaluating how easy its to write tests for Zeebe. and I am looking into examples of using the zeebe testcontainers, as I get it that its the prefered way of doing it.

But, I am a bit lost in the documentation and also examples, I am looking into different repo like eze, zeebe, zeebe-testcontainer repos to understand how it works.

My main question now is, see this example here

Is this actually starting the process and executing the real tasks in the workers or not?

1 Like

@philipp.ossler any help here? Sorry to directly ping you :confused:

Basically. I want to have my worker task either be mocked or sometimes ran and then I can check what is set inside it

Hey @sherry-ummen

the line you referenced:

 val process = Bpmn.createExecutableProcess("process")
            .startEvent()
            .endEvent()
            .done()

Just creates an executable BPMN process which you can deploy then later. This is done in one of the next lines, it looks like this:

val deployFuture: Future<DeploymentEvent> = client.newDeployCommand()
            .addProcessModel(process, "process.bpmn")
            .send()

Creating a process instance would look like this:

        val processInstance = client.newCreateInstanceCommand()
            .bpmnProcessId("process")
            .latestVersion()
            .variables(mapOf("x" to 1))
            .send()
            .join()

Everything above is just normal client usage. I suggest, if you have problems with that, check out one of our getting started guides.

We added more samples on the eze-sample project, which you can also check out.

Plus: As part of our hack days, we created today our first eze (embedded zeebe engine) release which should simplify how you can write tests. Feel free to check it out.

Hope that helps!

Greets
Chris

2 Likes

Hey thanks Zelldon. Eze project looks pretty cool. Definitely trying that.

Though my original question still. So my process has certain tasks executed in the worker.

Now in the tests of course I am not running any worker but all the worker code exists in the project. So these tests when I run using the same process model does it execute any of the tasks ? If yes then how?

And if not then how can I do it?

1 Like

No. If the workers are not started then the tasks are not executed.

The Zeebe Testcontainer project doesn’t provide a way of mocking tasks.

Depending on your setup and what you want to test, you could start the original workers, or provide special workers for the test environment.

Have a look at:

1 Like

Hmm so yes I am using Springboot and the test looks like this

package fi.workflow

//imports skipped

@SpringBootTest
@Testcontainers
class WorkflowApplicationTests {

    private var client: ZeebeClient? = null

    @Container
    private val zeebeContainer = ZeebeContainer()

    @BeforeEach
    fun setup() {
        System.setProperty("zeebe.client.broker.gateway-address",zeebeContainer.externalGatewayAddress )
        client = ZeebeClient.newClientBuilder()
                .gatewayAddress(zeebeContainer.externalGatewayAddress)
                .usePlaintext()
                .build()
        val process = Bpmn.createExecutableProcess("Process_v1").startEvent().endEvent().done()
        client?.newDeployCommand()?.addProcessModel(process, "basic_algo.bpmn")?.send()?.join()
    }

    @Test
    fun `verify pac worked is called`() {
        val processId = UUID.randomUUID().toString()
        val event = TransformedEvent(
                id = processId,
        )
        val input = mapOf("processId" to processId, "event" to event) // WorkflowInput(processId, event)
        val variables = Klaxon().toJsonString(input)
        val processInstanceResult = client
                ?.newCreateInstanceCommand()
                ?.bpmnProcessId("Process_v1")
                ?.latestVersion()
                ?.variables(variables)
                ?.withResult()
                ?.send()
                ?.join()

        assertThat(processInstanceResult?.variablesAsMap?.containsKey("waitCounter")).isEqualTo(0)
    }
}


The bpmn file resides in the tests/resources folder

The worker is in the same application.

I am a bit confused now, why my worker in not being called?

Is it not possible or something is wrong how I am doing it?

Bpmn: basic_algo.bpmn · GitHub

Worker:

package fi.workflow.worker

// imports skipped

@Component
class PacWorker {

    @ZeebeWorker(type = "pac")
    fun pac(client: JobClient, job: ActivatedJob) {
        val initObject = object {
            val waitCounter: Int = 0;
            val packTicketSuppressed: Boolean = false
        }
        client.newCompleteCommand(job.key)
            .variables(Klaxon().toJsonString(initObject))
            .send().join()
    }
}

It can be that the application is already started before you set the system property for the gateway address. As a result, the application client/worker uses the wrong address and can’t connect to Zeebe inside the test-container.

Please check if the system property is set before the application client/worker is started.

It may help to setup the test-container statically for the test class.

Alright. Thank you. I had that suspicion as well. Now I changed the tests like this

@SpringBootTest
@Testcontainers
class WorkflowApplicationTests {

    companion object{
        @Container
        val zeebeContainer = ZeebeContainer()

        init {
        	zeebeContainer.start()
            System.setProperty("zeebe.client.broker.gateway-address",zeebeContainer.externalGatewayAddress )
        }
    }

    private var client: ZeebeClient? = null

    @BeforeEach
    fun setup() {
        client = ZeebeClient.newClientBuilder()
                .gatewayAddress(zeebeContainer.externalGatewayAddress)
                .usePlaintext()
                .build()
        val process = Bpmn.createExecutableProcess("Process_v1").startEvent().endEvent().done()
        client?.newDeployCommand()?.addProcessModel(process, "basic_algo.bpmn")?.send()?.join()
    }

    @Test
    fun `verify pac worker is called`() {
        val processId = UUID.randomUUID().toString()
        val event = TransformedEvent(
                id = processId,
        )
        val input = mapOf("processId" to processId, "event" to event) // WorkflowInput(processId, event)
        val variables = Klaxon().toJsonString(input)
        val processInstanceResult = client
                ?.newCreateInstanceCommand()
                ?.bpmnProcessId("Process_v1")
                ?.latestVersion()
                ?.variables(variables)
                ?.withResult()
                ?.send()
                ?.join()

        assertThat(processInstanceResult?.variablesAsMap?.containsKey("waitCounter")).isEqualTo(0)
    }

}

Still does not help. But my log now looks so that the assertion fails, and later in the logs I see these kind of warnings

2021-08-17 11:13:30.866  WARN 49901 --- [ult-executor-15] io.camunda.zeebe.client.job.poller       : Failed to activated jobs for worker vnoc-worker and job type pac

io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
	at io.grpc.Status.asRuntimeException(Status.java:535) ~[grpc-api-1.37.0.jar:1.37.0]
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:478) ~[grpc-stub-1.37.0.jar:1.37.0]
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:553) ~[grpc-core-1.37.0.jar:1.37.0]
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:68) ~[grpc-core-1.37.0.jar:1.37.0]
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:739) ~[grpc-core-1.37.0.jar:1.37.0]
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:718) ~[grpc-core-1.37.0.jar:1.37.0]
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.37.0.jar:1.37.0]
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[grpc-core-1.37.0.jar:1.37.0]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/0:0:0:0:0:0:0:1:55038
Caused by: java.net.ConnectException: Connection refused
	at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:na]
	at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779) ~[na:na]
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.58.Final.jar:4.1.58.Final]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

Hmm I am a bit confused what is happening. Is the test now ending soon and the worker are not able to setup ?

or still my setup is broken ? :confused: