Message ID uniqueness only during super short buffering time?

A user is trying to leverage the unique message feature of Zeebe - but found that messages are delivered regardless of whether the message ID is already known. See "Zeebe grpc duplicate message error is not triggered" (Issue #17 · berndruecker/kafka-camunda-spring-simple · GitHub).

I quickly tried this myself in this project: berndruecker/zeebe-message-id-investigation on GitHub (a playground to check message ids). I can confirm that no messages are rejected as duplicates.

I think this is in line with the documentation: “A message is rejected and not correlated if a message with the same name, the same correlation key, and the same id is already buffered” - as the messages are properly correlated, they might not count as buffered any more.

BUT this is not in line with the intuitive understanding of idempotent message delivery. I also understood the feature to mean that I can make sure messages are only processed once. This is also how the feature was originally designed: "A message can be published idempotent" (Issue #1012 · camunda/zeebe · GitHub).

Any thoughts from the Zeebe team?

Thanks in advance!
Bernd

Hi Bernd. :wave:

Thank you for reporting. This behavior is not expected.

The message should be rejected as long as the original message has not been deleted. A message is deleted once its TTL is reached. This holds regardless of whether the message has been correlated or not.

I tried to reproduce the behavior using your demo process, but it’s working for me.

Which Zeebe version do you use?
What TTL do you set for the message?

In your demo application, the message is published without a TTL.
As a result, the message is not buffered and the next message is not rejected.

Best regards,
Philipp

Hi @Philipp_Ossler

I am the person who initially raised this issue with Bernd. In my example I actually do use a TTL, and I tested this with both a long and a short TTL as a sanity check.

Here is the link to the issue I raised on GitHub:
https://github.com/berndruecker/kafka-camunda-spring-simple/issues/17#issuecomment-1357450369

Here is the link to my example, which is also mentioned in the issue:
https://gitlab.com/ohmycaptainnemo/zeebe-message-publisher-api/-/blob/dev/zeebe-spring-api/src/main/java/com/ohmycaptainnemo/zeebespringapi/message/MessageDaoService.java

Just to clarify, I have used Zeebe before, and I have received the duplicate message error before when doing a test very similar to the one in the link above. The difference is that I used Python with pyzeebe, or the plain Java client for Zeebe, which should not really make any difference. Perhaps this issue is somehow related to the integration of Spring and Zeebe?

Thanks

Hi @BerndRuecker ,

Whenever you get the chance, could you please do an independent test based on what Philipp mentioned and confirm whether you get the same behaviour? I just want to get to the bottom of this and find out whether there is some sort of config on my end or your end that could be causing this.

Thanks

Hi @BerndRuecker ,

Any luck or news on trying what @Philipp_Ossler mentioned and seeing if you get the same or different outcome?

I had a quick look at the demo application again - and I just realized that the “logging” was pretty bad - as the message is actually properly rejected - even without an explicit TTL (which should lead to some default TTL):

Send in message...
2023-01-11 10:16:37.497  INFO 28356 --- [pool-2-thread-1] i.c.z.spring.example.ExampleApplication  : started instance for workflowKey='2251799813686381', bpmnProcessId='demoProcess', version='1' with workflowInstanceKey='2251799814842076'
...completed
...could NOT be sent: io.grpc.StatusRuntimeException: ALREADY_EXISTS: Command 'PUBLISH' rejected with code 'ALREADY_EXISTS': Expected to publish a new message with id '121212', but a message with that id was already published
Send in message...
2023-01-11 10:16:42.489  INFO 28356 --- [pool-2-thread-1] i.c.z.spring.example.ExampleApplication  : started instance for workflowKey='2251799813686381', bpmnProcessId='demoProcess', version='1' with workflowInstanceKey='4503599628527317'
...completed
...could NOT be sent: io.grpc.StatusRuntimeException: ALREADY_EXISTS: Command 'PUBLISH' rejected with code 'ALREADY_EXISTS': Expected to publish a new message with id '121212', but a message with that id was already published

Could you be running into the same problem, @Nima_Namjouyan - that you simply do not pick up the exception properly?

Hi @BerndRuecker,

Ahh I see. You are indeed correct about the logs. Thank you, and thanks also to @Philipp_Ossler.

However, do you see how the code block for publishing the message has the exceptionally part at the end? If the message fails to publish because it already exists, shouldn't it be possible to throw an exception there if desired? Technically the message will not be published as it already exists, so I was thinking the exceptionally part should catch that.
Please let me know if I am mistaken, but the exception is not being thrown even when the message already exists.

        zeebe.newPublishMessageCommand().messageName("yes")
                .correlationKey("yes")
                .messageId("121212")
                .timeToLive(Duration.ofSeconds(10000))
                .variables(message.getVariables())
                .send()
                .exceptionally(t -> {
                    // Runs asynchronously; keep the original throwable as the cause
                    throw new RuntimeException("Could not hand over record to Zeebe, check nested exception for details: " + t.getMessage(), t);
                });

Thank you

You can very well throw an exception - but since the callback is executed asynchronously, the caller thread is no longer available and thus does not get the exception. The beauty of asynchronous programming - it is super powerful and much more efficient, but also harder to get right sometimes :wink:

You could make the call blocking by doing send().join(), and you should then get an exception in the call stack - but that also limits scalability at that point (I wrote a bit about it here: Writing Good Workers For Camunda Cloud | by Bernd Rücker | berndruecker)
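To illustrate the difference, here is a minimal sketch that uses a plain java.util.concurrent.CompletableFuture as a stand-in for the future returned by send(). The class AsyncExceptionDemo and the method publish() are hypothetical names for this illustration only, and the exception types the real Zeebe client throws from join() may differ from the CompletionException shown here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class AsyncExceptionDemo {

    // Hypothetical stand-in for send(): a future that fails on another thread,
    // similar to a publish command rejected with ALREADY_EXISTS.
    static CompletableFuture<String> publish() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("ALREADY_EXISTS");
        });
    }

    // Non-blocking variant: the exception thrown inside exceptionally(...)
    // only fails the returned future; it never reaches this try/catch.
    static boolean callerSeesExceptionWithoutJoin() {
        try {
            publish().exceptionally(t -> {
                throw new RuntimeException("Could not hand over record", t);
            });
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    // Blocking variant: join() waits for the result and rethrows the failure
    // in the caller thread.
    static boolean callerSeesExceptionWithJoin() {
        try {
            publish().join();
            return false;
        } catch (CompletionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("without join: " + callerSeesExceptionWithoutJoin());
        System.out.println("with join:    " + callerSeesExceptionWithJoin());
    }
}
```

In the non-blocking variant the failure is still recorded on the future returned by exceptionally(...), so logging or otherwise handling the error inside the callback is one way to avoid blocking while still noticing the rejection.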

Very nice. Thank you for the explanation.

@BerndRuecker ,

Sorry, just one last question. In your code here:
https://github.com/berndruecker/kafka-camunda-spring-simple/blob/dcdfc5c4080de6586559573c8df3f54e0b2df3c1/src/main/java/io/berndruecker/example/camunda/kafka/FromKafkaToCamunda.java#L40

I think you are using the same syntax and async approach for publishing a message, correct? Does that mean that in your case the exception you throw may run into the same issue as mine?

I guess so - yes. As you can see, you might stumble upon it even if you theoretically know about it :wink: Happy to take a PR for it - otherwise I will try to fix it some time in the future.
