Set timeout for Kafka event in a workflow

In a workflow, I send a message (command) to Kafka using Zeebe Kafka Connect and then wait for a message (the command response) from Kafka for correlation. How do I set a timeout for how long my workflow will wait for the command result?

For example, in https://github.com/camunda-community-hub/kafka-connect-zeebe/tree/master/examples/microservices-orchestration, let's say I want to set a timeout for how long the process waits for the Order Paid event from the Payment service. Please help with the modelling.

Hi @ankit_joinwal and welcome to the forums :wave:

Typically, when waiting for either a message or a timer, we use an event-based gateway. It is a gateway that you connect to two or more events, and the process waits at the gateway until one of those events triggers; the path of whichever event fires first is taken.
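As a rough sketch of what that could look like in the BPMN XML for the Order Paid case: the element IDs, the 5-minute duration, the message name `OrderPaid` and the correlation key `=orderId` are all assumptions for illustration (the correlation key syntax also depends on your Zeebe version), and the sequence flows and surrounding `definitions`/`process` elements are left out.

```xml
<!-- Assumed IDs and values throughout; adapt to your own model. -->
<bpmn:eventBasedGateway id="Gateway_WaitForPayment">
  <bpmn:outgoing>Flow_ToOrderPaid</bpmn:outgoing>
  <bpmn:outgoing>Flow_ToTimeout</bpmn:outgoing>
</bpmn:eventBasedGateway>

<!-- Path 1: the Order Paid message arrives from Kafka via the sink connector -->
<bpmn:intermediateCatchEvent id="Event_OrderPaid" name="Order Paid">
  <bpmn:incoming>Flow_ToOrderPaid</bpmn:incoming>
  <bpmn:messageEventDefinition messageRef="Message_OrderPaid" />
</bpmn:intermediateCatchEvent>

<!-- Path 2: the timeout; PT5M (5 minutes) is an arbitrary example duration -->
<bpmn:intermediateCatchEvent id="Event_PaymentTimeout" name="Payment timed out">
  <bpmn:incoming>Flow_ToTimeout</bpmn:incoming>
  <bpmn:timerEventDefinition>
    <bpmn:timeDuration xsi:type="bpmn:tFormalExpression">PT5M</bpmn:timeDuration>
  </bpmn:timerEventDefinition>
</bpmn:intermediateCatchEvent>

<!-- Message definition, declared at the definitions level of the file -->
<bpmn:message id="Message_OrderPaid" name="OrderPaid">
  <bpmn:extensionElements>
    <zeebe:subscription correlationKey="=orderId" />
  </bpmn:extensionElements>
</bpmn:message>
```

If the timer fires first, the message subscription is cancelled and the process continues along the timeout path, where you can model whatever compensation or error handling you need.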

Hope it helps!


Thanks. I am trying the kafka-connect-zeebe/examples/microservices-orchestration example from camunda-community-hub/kafka-connect-zeebe as is. However, when I send the message to the payment-confirm topic, I can see the message in Kafka but the workflow instance does not progress. I am using the source and sink connector files as is, only updated with my Camunda Cloud connection settings. FYI, the source connector is working fine. Any suggestions as to what could be wrong with the sink connector?

I am using the following to send the message to Kafka:

./kafka-console-producer.sh --topic payment-confirm --bootstrap-server localhost:9092

{"eventType": "OrderPaid", "orderId": 9, "amount": 4000}

I see the following error in the log:
connect_1 | org.apache.kafka.connect.errors.ConnectException: com.jayway.jsonpath.PathNotFoundException: Expected to find an object with property ['orderId'] in path $ but found 'java.lang.String'. This is not a json object according to the JsonProvider: 'com.jayway.jsonpath.spi.json.JsonSmartJsonProvider'

I resolved it by using JsonConverter. I have a separate question: following the Zeebe Kafka Connect documentation on GitHub, I am not able to filter the variables being sent to Kafka.

I have added a header named variablesToSendToKafka whose value is a comma-separated list of variable names, but all variables are still being sent to Kafka.