Kafka inbound connector message correlation

STT-parent.bpmn (12.7 KB)
Hi - I have a sample process (attached) with Kafka start and intermediate event elements.

I post the following messages to the configured topic:

  1. {"action":"STT","orderId":"o100"} - this starts a new process instance, which waits for the next event

  2. {"action":"STT_crossing","orderId":"o100"} - this message is supposed to activate the second, waiting element and resume the process through to its end. Instead, the connector throws the following error:

2025-02-28 00:07:50 java.lang.RuntimeException: Message cannot be processed: InvalidInput
2025-02-28 00:07:50     at io.camunda.connector.kafka.inbound.KafkaConnectorConsumer.handleCorrelationResult(KafkaConnectorConsumer.java:180)
2025-02-28 00:07:50     at io.camunda.connector.kafka.inbound.KafkaConnectorConsumer.handleMessage(KafkaConnectorConsumer.java:169)
2025-02-28 00:07:50     at io.camunda.connector.kafka.inbound.KafkaConnectorConsumer.pollAndPublish(KafkaConnectorConsumer.java:153)
2025-02-28 00:07:50     at io.camunda.connector.kafka.inbound.KafkaConnectorConsumer.consume(KafkaConnectorConsumer.java:139)
2025-02-28 00:07:50     at io.camunda.connector.kafka.inbound.KafkaConnectorConsumer.lambda$startConsumer$0(KafkaConnectorConsumer.java:95)

Looking at the source code, I think this is where the exception is thrown, out of the four possibilities (I wish it logged the cause along with the exception class).

InboundCorrelationHandler

@chillleader @sbuettner can you please take a look and point out what I am missing in the connector configuration? Thanks!

Could you enable debug logging (logging.level.io.camunda.connector=DEBUG)? This should hopefully make this logging statement visible and give more clarity.

Thanks @chillleader, yes, I turned the debug log on and found that my activation conditions weren't mutually exclusive for the two elements, so the same event was activating both.
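For readers hitting the same error, the overlap can be illustrated with a minimal Python sketch. This is a toy model, not connector code: the real activation conditions are FEEL expressions configured on each element, and the original (overlapping) conditions are not shown in this thread, so the `overlapping` predicates below are hypothetical. Only the `action` values come from the sample messages.

```python
from typing import Callable, Dict, List

Message = Dict[str, str]
Condition = Callable[[Message], bool]


def activated_elements(conditions: Dict[str, Condition], message: Message) -> List[str]:
    """Return the names of all elements whose activation condition accepts the message."""
    return [name for name, accepts in conditions.items() if accepts(message)]


# Hypothetical overlapping conditions: both accept any action starting with "STT".
overlapping = {
    "start_event": lambda m: m["action"].startswith("STT"),
    "intermediate_event": lambda m: m["action"].startswith("STT"),
}

# Mutually exclusive conditions: each element accepts exactly one action value.
exclusive = {
    "start_event": lambda m: m["action"] == "STT",
    "intermediate_event": lambda m: m["action"] == "STT_crossing",
}

crossing = {"action": "STT_crossing", "orderId": "o100"}
print(activated_elements(overlapping, crossing))  # both elements match
print(activated_elements(exclusive, crossing))    # only the intermediate event matches
```

With the exclusive conditions, each incoming message matches at most one element, which is the state the fix above arrives at.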

After fixing the activation condition, it works fine, except that I would also like to set correlation on the start message itself.

Can you please help me understand how that can be done, since the documentation says start messages shouldn't have correlation…
Essentially, I don't want to start another process instance for the same correlation key at all.

There is an option to enable correlation in the connector configuration. Let me know if you have this on your Kafka message start event - you need to select “Correlation required” in the dropdown. This is normally used for subprocesses as the hint suggests, but I think it also applies in your case.

Yes, I saw that option on the start element.

When I set correlation required on both elements, using the same correlation key, this is the behavior I see, and I don't think it's right:

  1. The start message and element behave correctly - no new process instance is started while one is already running with the same correlation key.
  2. The intermediate element and message do not behave correctly - the message completes the waiting process instance but also starts a brand new instance, which is unexpected. The new instance ends up in the wait state again, as if it had been triggered by the start message. I don't understand how that could happen and activate the first element when the activation conditions are different for the two messages.

I have a feeling that it's not consuming all messages from the Kafka topic, even though I have “consume unmatched events” checked. Those unconsumed messages might be starting new processes.

@chillleader any idea why it might not be committing the offset when consuming unmatched events? I couldn’t find that property in the connector code. Can you please help me locate it?

What do your new activation conditions look like? To me it vaguely looks like something that might be caused by message buffering (see this table - row 3 might be your case). But still, I’m not sure where the duplicated message could be coming from.

Can you do the test again and capture the logs of the connector runtime after you send a Kafka message? The runtime should log every time it submits a new message to Zeebe (you might need to increase logging level up to TRACE to get the full picture).

As for unmatched events, the logic that decides whether to commit offsets is located here. The runtime returns the handling strategy based on the state of the checkbox in the connectors config (if “consume unmatched events” is true, then it will be Ignored, otherwise - ForwardErrorToUpstream).
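The rule described above can be sketched as a small Python model. The strategy names `Ignored` and `ForwardErrorToUpstream` come from the explanation above; everything else is illustrative. In particular, `should_commit_offset` encodes an assumption (a matched event is committed, an unmatched one only when it is ignored) and is not taken from the connector source.

```python
from enum import Enum


class HandlingStrategy(Enum):
    IGNORED = "Ignored"
    FORWARD_ERROR_TO_UPSTREAM = "ForwardErrorToUpstream"


def strategy_for_unmatched(consume_unmatched_events: bool) -> HandlingStrategy:
    """Mirror the rule described above: the checkbox decides the strategy."""
    if consume_unmatched_events:
        return HandlingStrategy.IGNORED
    return HandlingStrategy.FORWARD_ERROR_TO_UPSTREAM


def should_commit_offset(matched: bool, consume_unmatched_events: bool) -> bool:
    """Assumption: matched events are committed; unmatched ones only if ignored."""
    if matched:
        return True
    return strategy_for_unmatched(consume_unmatched_events) is HandlingStrategy.IGNORED


print(strategy_for_unmatched(True).value)
print(strategy_for_unmatched(False).value)
```

Under this model, an unmatched event with the checkbox unchecked is not committed, so it would be seen again on the next poll.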

@chillleader so I tried this again with both Kafka and a Webhook connector.
But this time I set a message id for deduplication and that seems to do the trick.

I get the desired behavior now, except for one issue: when the process instance has finished (I assume it's no longer active) and I send a start message with the same correlation key as before, I expect a new process instance to be created, since the previous one has already finished. Instead, no new process instance is created unless I change the correlation key to a brand new value. This is unexpected.

I have attached both the sample bpmn files, can you please take a look?

Also, I don't really understand what message TTL does. Does the engine maintain an internal queue/buffer for all received messages? What’s the purpose of that? I didn’t set any TTL, so I assume it defaulted to 0.

I also don’t understand why a second message id field is needed for deduplication and why correlation key alone cannot guarantee uniqueness?

KafkaTest.bpmn (12.9 KB)
WebhookTest.bpmn (9.7 KB)

Sorry for the delay. What you describe makes me think that when you publish a Kafka message (or trigger the webhook endpoint), the request is somehow duplicated which results in multiple messages reaching Zeebe for a single incoming connector event. Do you have a single instance of the connector runtime, or are you running multiple?

Re. message TTL: yes, there is a buffer for all queued messages. The primary use case is to store a message for as long as it can’t be correlated immediately (for intermediate events, because the message subscription is not yet open in the process instance; for start events, because another message has already triggered a process instance with the same correlation key). The default TTL is 1 hour; if you want to disable message buffering, you can set it to 0 explicitly.
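The buffering behavior for intermediate events can be pictured with a toy Python model. It is a deliberately simplified sketch, not the engine's implementation: time is passed in explicitly, the class and method names are hypothetical, and the start-event case is left out.

```python
from dataclasses import dataclass, field
from typing import List, Set, Tuple


@dataclass
class BufferedMessage:
    name: str
    correlation_key: str
    expires_at: float  # simulated clock time at which the message is dropped


@dataclass
class MessageBuffer:
    """Toy model of message buffering with a TTL."""
    open_subscriptions: Set[Tuple[str, str]] = field(default_factory=set)
    buffered: List[BufferedMessage] = field(default_factory=list)

    def publish(self, name: str, correlation_key: str, ttl_seconds: float, now: float) -> str:
        if (name, correlation_key) in self.open_subscriptions:
            self.open_subscriptions.discard((name, correlation_key))
            return "correlated"
        if ttl_seconds <= 0:
            return "discarded"  # nothing is waiting and buffering is disabled
        self.buffered.append(BufferedMessage(name, correlation_key, now + ttl_seconds))
        return "buffered"

    def open_subscription(self, name: str, correlation_key: str, now: float) -> str:
        # Drop expired messages first, then look for a buffered match.
        self.buffered = [m for m in self.buffered if m.expires_at > now]
        for message in self.buffered:
            if (message.name, message.correlation_key) == (name, correlation_key):
                self.buffered.remove(message)
                return "correlated from buffer"
        self.open_subscriptions.add((name, correlation_key))
        return "waiting"


buffer = MessageBuffer()
print(buffer.publish("crossing", "o100", ttl_seconds=3600, now=0))
print(buffer.open_subscription("crossing", "o100", now=10))
print(MessageBuffer().publish("crossing", "o100", ttl_seconds=0, now=0))
```

In this model, a message with a positive TTL that arrives before the subscription opens is kept and correlated later, while a message with TTL 0 is dropped unless something is already waiting for it.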

I also don’t understand why a second message id field is needed for deduplication and why correlation key alone cannot guarantee uniqueness?

Correlation key is not intended for uniqueness - for example, you can have different message types (like, payment_started and payment_completed) that should be correlated to your process instance. They will likely have the same correlation keys but must not be considered equal.
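A small Python sketch of why the message ID is a separate field. In this toy model a message counts as a duplicate only when its name, correlation key, and ID all match a message already seen. The class is hypothetical, and it simplifies by never expiring entries, whereas the real buffer only holds messages until their TTL runs out.

```python
from typing import Optional, Set, Tuple


class DedupBuffer:
    """Toy model: duplicates are detected on (name, correlation key, message ID)."""

    def __init__(self) -> None:
        self._seen: Set[Tuple[str, str, str]] = set()

    def publish(self, name: str, correlation_key: str, message_id: Optional[str] = None) -> bool:
        """Return True if the message is accepted, False if rejected as a duplicate."""
        if message_id is None:
            return True  # no ID given, so no deduplication is attempted
        identity = (name, correlation_key, message_id)
        if identity in self._seen:
            return False
        self._seen.add(identity)
        return True


buffer = DedupBuffer()
# Same correlation key, different message names: both are accepted.
print(buffer.publish("payment_started", "o100", message_id="evt-1"))
print(buffer.publish("payment_completed", "o100", message_id="evt-1"))
# Same name, key, and ID again: rejected as a duplicate.
print(buffer.publish("payment_started", "o100", message_id="evt-1"))
```

The correlation key alone cannot play this role, because `payment_started` and `payment_completed` share it and must both get through.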

@chillleader I was running only one connector runtime at the time of this test. I will try setting TTL to 0 explicitly and see if that makes any difference.

I do have two runtimes running now for a different use case that I am trying out. The other runtime does not have the out-of-the-box (OOTB) connector bundle; it only has the runtime and a custom connector, which I believe should be OK. The runtime should have no knowledge of message subscriptions in the connector, right?

Regarding correlation, if I understand correctly - correlation key is used to find an existing matching process instance to resume execution and message id is just to maintain unique messages in the internal buffer?

Can there be a composite correlation key also - i.e. using two fields from the payload and hashing them together?

I can also recommend increasing the logging level (logging.level.io.camunda.connector=DEBUG) and looking for the following messages: “Message correlated successfully” from the Kafka connector and “Published message with key: {}” from InboundCorrelationHandler. This way you will see whether any duplicate messages are produced.

In your case, the second runtime should not have any effect as long as it doesn’t have the Webhook and Kafka connectors registered.

You are correct in your understanding of the correlation keys and message IDs. You can absolutely have composite correlation keys - just set the correlation key (process) and correlation key (payload) fields accordingly as FEEL expressions that combine multiple variables/fields. The only thing to keep in mind is the visibility scopes of each expression: the payload expression can only reference fields from the incoming connector event, and the process expression can only access process variables.
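As an illustration of the composite-key idea, here is a minimal Python sketch. It only models the principle that both sides must build the identical string from their own scope; in the connector the two keys are FEEL expressions, not Python. The `region` field and the `:` separator are hypothetical, chosen for the example. Plain concatenation is used here rather than hashing, on the assumption that a readable key is sufficient.

```python
from typing import Mapping, Sequence


def composite_key(source: Mapping[str, str], fields: Sequence[str], separator: str = ":") -> str:
    """Join the chosen fields, in order, into one correlation key string."""
    return separator.join(str(source[name]) for name in fields)


# Payload side: only fields of the incoming connector event are visible.
payload = {"action": "STT_crossing", "orderId": "o100", "region": "eu"}  # "region" is hypothetical

# Process side: only process variables are visible.
process_variables = {"orderId": "o100", "region": "eu"}

payload_key = composite_key(payload, ["orderId", "region"])
process_key = composite_key(process_variables, ["orderId", "region"])

print(payload_key)
print(payload_key == process_key)
```

The field order and separator must be the same in both expressions, otherwise the keys will never match.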

Thanks @chillleader

I have debug logging on and I can see those messages when a message is correlated successfully.

I tried with TTL set to PT0S and I still don't see the desired behavior. I don't know what I am missing.

With TTL set to 0, I couldn’t trigger the intermediate event to resume the existing process.