Working example of basic message aggregation in java

Hi,

Could someone please point me to a simple working example of message aggregation in Zeebe like the one mentioned on Zeebe and Operate Alpha Releases: December 2019 Edition - Camunda

I believe I need to only to deploy one instance of my workflow and publish multiple messages with the same name and correlation key so I can aggregate all of them together, correct? Currently, every time I publish a message, a new instance is created, which is not prevents me from aggregating one message with other messages.

Any help and examples are appreciated.

Hi @Nima_Namjouyan :wave:

Regarding your question:

Correct. Make sure that the messages are published with the same correlation key.

If the problem persists then please share your code that publishes the messages.

The pattern is documented here: https://docs.camunda.io/docs/product-manuals/concepts/messages#message-aggregator

Best regards,
Philipp

Thanks @philipp.ossler .

It is a bit difficult to show all my code. The zeebe code is embedded in a custom Nifi processor written in java, but essentially, this is what is happening with zeebee:

private ZeebeClient client;

client = ZeebeClient.newClientBuilder()

                                .gatewayAddress(zeebeConnUrl)

                                .usePlaintext()

                                .build();

                

                        logger.debug(String.format("Workflow path: %s", workFlowPathString));

                        logger.debug(String.format("Zeebe connection url: %s", zeebeConnUrl));

                        

                        final DeploymentEvent deployment = client.newDeployCommand()

                                .addResourceFile(workFlowPathString)

                                .send()

                                .join();

                        

                        int workflowVersion = deployment.getWorkflows().get(0).getVersion();

                        logger.debug(String.format("Deployed workflow version: %d", workflowVersion));

                        

                        // logger.info(String.format("Successfully created zeebe workflow instance and data variable name: %s", dataVariableName));

                }

        

                final Map<String, Object> data = new HashMap<>();

                data.put(dataVariableName, jsonData);

                

                client.newPublishMessageCommand()

                        .messageName("hello")

                        .correlationKey("hello")

                        .timeToLive(Duration.ofSeconds(0))

                        .variables(data).send().join();

So, essentially, I am deploying a workflow with a given url and then publish a message with a variable string called data.
My issue is now that when I set timeToLive > 0, I get multiple instances created, which I think is not what I want right? Because it does not let me aggregate multiple messages together as they are have their own seperate instances.
When I let timeToLive = 0, only one instance is deployed, but only one message goes through out of multiple messages.

Also, the message start event bpmn icon only has message Name field, but no correlation field.

Any idea why this is happening?

This sounds like you are publishing the “Start” Message, rather than the message for the “Message Catch” event.

@jwulf
I would like to publish to start message event, that is correct. I would like to publish multiple messages to the same instance using the message start event. Is it not possible to do so?
So What I am trying to do is to buffer messages at the message start event to go through the same instance. There is a message catch in the middle with its correlation key updating to catch messages with the same ID.
Am I doing this incorrectly? I am just trying to implement this:

Is there a working example of this anywhere?

A message start event should have a different message definition from an intermediate catch event. Otherwise the intention is ambiguous. A start event cause a new process instance to be created.

You need to use a different message to correlate.

@jwulf

Thank you. Are there any examples of this is Java? Particularly in the context of message aggregation pattern? I would appreciate it if you could please elaborate a bit more? Because, I am having some difficulty finding examples of this.

Just change the intermediate message from A to B.

There is nothing Java-specific about this.

If you give me details of your specific domain, I can give you an example using that.

@jwulf

I have a situation where I am receiving a number of json files of parents and children. Often, a parent only has one child. But sometimes, a parent can have twins or triplets and so on. The issue is that I will be still receiving the json files of parents and children one by one and in any order, but I need to aggregate the connected parent and children together and add to their json file that they are related (i.e. together). Can Zeebe and aggregation help with this case and keep track of the related parents and children in this case or there needs to be something external to zeebe to keep track of states of which parent and related child has been processed?

OK, so you have a single event type from NiFi that can contain a parent or one of its children?

If you wire that directly to Zeebe, you can only do this with a single process instance that basically runs forever.

Otherwise, you need an adaptor that routes NiFi messages containing parents to the correct Zeebe message type to start a process instance, and a NiFi message containing children to the Zeebe message type for the intermediate catch event.

In Zeebe, messages are routed on their message name (message type) and a correlation on content, not simply on their content.

@jwulf

Yes. And they contain ID to link a child to a parent if they are related.

Ahhhh, that makes sense.

If you wire that directly to Zeebe, you can only do this with a single process instance that basically runs forever.

Are you saying that I can make one instance and keep putting new data through the same instance? Currently, every time I send a new json file, a new instance is created. So, I am not sure how I can send new data through the same instance every time.

Otherwise, you need an adaptor that routes NiFi messages containing parents to the correct Zeebe message type to start a process instance, and a NiFi message containing children to the Zeebe message type for the intermediate catch event.

In Zeebe, messages are routed on their message name (message type) and a correlation on content, not simply on their content.

This makes perfect sense.

You start a process instance programatically with a start event, and all your NiFi messages are correlated to an intermediate catch event in it.

I wouldn’t recommend this, though, because a long running process like this will cause unbounded memory usage.

@jwulf

That makes very good sense. Thank you so much for you patience and clarifying the concepts for me. I do appreciate it