Camunda Spring boot rabbitmq multiple participants

Hello i am still struggeling finding a solution to perform communication (send, receive tasks) within loops in an asychronious way. (Correlation errors because msg arrives to fast)
To solve this i came to the idea to use a message broker inbetween which is called RabbitMQ.

I used this project as a starting point and what i did was to simple transform the whole project from groovey to java code. ( StephenOTT/camunda-spring-boot-rabbitmq-messaging: Example project of a Camunda Spring Boot project that uses RabbitMQ to queue messages (github.com))

After i mixed it up with my java code which i already had before.
The structure looks like this:

The only class which i added was the class which the red arrow pointing to (SendMessageRest)
This class get called as a delegation expression on send tasks from bpmn - the code looks like this:

package com.example.workflow;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;

import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.JavaDelegate;
import org.camunda.bpm.model.bpmn.BpmnModelInstance;
import org.camunda.bpm.model.bpmn.instance.FlowNode;
import org.joda.time.DateTime;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

@Component()
public class SendMessageRest implements JavaDelegate {
	final
    RuntimeService runtimeService;

    public SendMessageRest(RuntimeService runtimeService){
        this.runtimeService= runtimeService;
    }
    
	private static final Logger LOGGER = LoggerFactory.getLogger(SendMessageRest.class);

	@Override
	public void execute(DelegateExecution execution) throws Exception {
		String correlationKey = (String) execution.getVariable("correlationKey");
        String messageName = "msg_" + execution.getProcessEngineServices().getRepositoryService().getProcessDefinition(execution.getProcessDefinitionId()).getKey();
        BpmnModelInstance modelInstance = execution.getBpmnModelInstance();
        FlowNode sendTask = modelInstance.getModelElementById(execution.getCurrentActivityId());
        String receiveID = sendTask.getDocumentations().iterator().next().getTextContent();
        
        System.out.println("receiveID: " + receiveID);
        // String messageID = (String) execution.getVariable("messageName");
        String targetParticipant = execution.getCurrentActivityName().substring(execution.getCurrentActivityName().indexOf("Send(")+5, execution.getCurrentActivityName().indexOf(")"));
       // {c} P3
        boolean decisionVariable = false;
        String decision = "";
        if(targetParticipant.contains("{") && targetParticipant.contains("}")) {
        	decisionVariable = true;
        	decision = targetParticipant.substring(targetParticipant.indexOf("{")+1, targetParticipant.indexOf("}"));
        	targetParticipant = targetParticipant.substring(targetParticipant.lastIndexOf("}")+2);
        }
        
        messageName = "msg_" + receiveID;
        
        DateTime time = new DateTime();
        System.out.println("\n\033[33;1mSending out message\033[0m " + time.getHourOfDay() + ":" + time.getMinuteOfHour() + ":" + time.getSecondOfMinute());  
        System.out.println("\033[32;1mmessageName: \033[0m"+ messageName + " \033[32;1mtargetParticipant: \033[0m" + targetParticipant);
        
        /*{
        	"messageName":"msg_Process_1kaxj2c_P1",
        	"correlationKeys":{
        			"correlationKey":{
        				"value":"correlation0.0331547946214632"
        			}
        	}
        }*/
        HttpEntity<String> entity2;
        
		JSONObject obj = new JSONObject();
		JSONObject var = new JSONObject();
		JSONObject cKey = new JSONObject();
		
		JSONObject decisionJson = new JSONObject();
		/*"processVariables" : {
		"aVariable" : {"value" : "aNewValue"}
	  }*/
		
		if(decision.length() == 0 && !decisionVariable) {
			obj.put("messageName", messageName);
		} else {
			obj.put("messageName", messageName);
			decisionJson.put("value", execution.getVariable(decision).toString());
			var.put(decision, decisionJson);
			obj.put("processVariables", var);
		}
		//cKey.put("value", correlationKey);
		//var.put("correlationKey", cKey);
		
		//obj.put("correlationKeys", var);
        
		System.out.println(obj.toString());
		
		String businessKey = execution.getProcessBusinessKey();
		System.out.println("\033[32;1mBusinessKey:\033[0m "+ businessKey + "\n");
		
		// find out who is target e.g P1, P2 ...
		// find port => P1 = 8081, P2 = 8082, P3 = 8083 ...
		//String targetParticipant = messageID.substring(messageID.indexOf("(")+1, messageID.indexOf(")"));
		
		String host = "";
		String port = "8091";
		if(targetParticipant.equals("P1")) {
			port = "8091";
		} else if(targetParticipant.equals("P2")) {
			port = "8092";
		} else if(targetParticipant.equals("P3")) {
			port = "8093";
		} else if(targetParticipant.equals("P4")) {
			port = "8094";
		}
			boolean gotIt = false;
			try {
				InetAddress.getByName(targetParticipant);
				gotIt = true;
			} catch (UnknownHostException e) {
				// just don't die
			}
			if (gotIt) {
				host = targetParticipant;
			} else {
				host = "localhost";
			}
			
		//host = "192.168.1.116";
		final String REST_ENDPOINT = "http://" + host + ":" + port + "/rabbit/message";  ///rest/message
		
		System.out.println("\033[32;1mSend msg to: \033[0m"+ targetParticipant + " \033[32;1mwith port: \033[0m" + port + " " + REST_ENDPOINT);

		HttpHeaders headers = new HttpHeaders();
		headers.setContentType(MediaType.APPLICATION_JSON);
		
		entity2 = new HttpEntity<String>(obj.toString(), headers);
		
		/*HttpEntity<RestMessageDto> entity = new HttpEntity<RestMessageDto>(//
				new RestMessageDto(messageName, businessKey), // here is the JSON body
				headers);*/
		boolean successful = false;
		while(!successful) {
			ResponseEntity<Object> result = null;
			try {
				result = new RestTemplate().postForEntity(REST_ENDPOINT, entity2, null);
			} catch(Exception e) {
				System.out.println("\n\033[33;1mERROR send out1\033[0m\n");
			}
			if(result != null) {
				if (!result.getStatusCode().is2xxSuccessful()) {
					Thread.sleep(2000);
					System.out.println("\n\033[33;1mERROR send out2\033[0m\n");
					//throw new RuntimeException(result.getStatusCode().toString() + ": " + result.getBody());
					
				} else {
					successful = true;
					System.out.println("\n\033[33;1mSending out message done\033[0m\n");
				}
			} else {
				Thread.sleep(2000);
				System.out.println("\n\033[33;1mERROR send out3\033[0m\n");
			}
			
			
		}
		
	}
}

What it does is simple parsing a messageName which is attached to the send task as documentation and it finds out to which participant (8091, 8092, 8093) it is sending a message over rabbitmq broker.

Send task looks like this which has a documentaion attached with that he knows to the message name which gets sended out msg_ + docuName:

As i have 3 different participants which will be communicating i simply copied the whole project two times (my-project, my-project2, my-project3).
Each of them gets his own bpmn process looking like this:

Model my-project = P1
Model my-project2 = P2
Model my-project3 = P3

I know it looks tricky but actually it isnt.
I have P1 which starts manually and then sends a message waking up P3.
P3 also send a message for waking up P2. (marked in light green)

After that every Participant goes into the loop body and they are communicating with each other so they simply pass the control flow as expected. But as you see in red borders: P2 is sending something to P1 and P1 is so fast that it directly jumps into next loop iteration and it is on send task marked with red borders and it sends a message to P2 but P2 is not ready yet so i get a correlation error on the receive task of P2 marked in red border.

So, the actual problem is that the receiver is not yet on the receive task but the sender has already sended the message for him to correlate.

That was the inital point where i decided to use a message broker inbetween in order to be sure that the receiving side simply takes out a message from the Queue (rabbitMQ) when he is ready and not before.

But it does not work… i am also facing a correlation mismatch.

I have deployed three processes (8091, 8092, 8093)

As you see on the top-left window here is P1 sending a message to P2 at 18:15:32 (msg_task10)
P2 is below P1 which faces the correlation error.
P3 is on the right side and he also sends a message to P2 at 18:15:32 (msg_task21)

To not lose the point i marked the receive task with 21 and 10 in red that you can see which receive tasks are meant to be:

When i think of rabbitmq queue it is FIFO so P1 is sending at 16:8:29 and P3 is sending at 16:8:31 then the wrong message is on top of rabbitmq stack or why does i get a correlation error??

RabbitExchange files of P1 (my-project):

package com.example.workflow.rabbitmq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CamundaMessageExchange {

    public static final String topicExchangeName = "camunda-exchange";
    static final String queueName = "messages";

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("camunda.bpmn.message");
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}

The only thing i have changes is the queueNames for each participant and the topicExchangeName as you can see here the code from P3 (my-project3):

package com.example.workflow.rabbitmq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CamundaMessageExchange {

    public static final String topicExchangeName = "camunda-exchange3";
    static final String queueName = "messages3";

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("camunda.bpmn.message3");
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}

Here is the whole error message;

org.camunda.bpm.engine.MismatchingMessageCorrelationException: Cannot correlate message 'msg_task10': No process definition or execution matches the parameters
        at org.camunda.bpm.engine.impl.cmd.CorrelateMessageCmd.execute(CorrelateMessageCmd.java:88) ~[camunda-engine-7.18.0.jar:7.18.0]
        at org.camunda.bpm.engine.impl.cmd.CorrelateMessageCmd.execute(CorrelateMessageCmd.java:42) ~[camunda-engine-7.18.0.jar:7.18.0]
        at org.camunda.bpm.engine.impl.interceptor.CommandExecutorImpl.execute(CommandExecutorImpl.java:28) ~[camunda-engine-7.18.0.jar:7.18.0]
        at org.camunda.bpm.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:110) ~[camunda-engine-7.18.0.jar:7.18.0]
        at org.camunda.bpm.engine.spring.SpringTransactionInterceptor.lambda$execute$0(SpringTransactionInterceptor.java:71) ~[camunda-engine-spring-7.18.0.jar:7.18.0]
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.3.22.jar:5.3.22]
        at org.camunda.bpm.engine.spring.SpringTransactionInterceptor.execute(SpringTransactionInterceptor.java:71) ~[camunda-engine-spring-7.18.0.jar:7.18.0]
        at org.camunda.bpm.engine.impl.interceptor.ProcessApplicationContextInterceptor.execute(ProcessApplicationContextInterceptor.java:70) ~[camunda-engine-7.18.0.jar:7.18.0]
        at org.camunda.bpm.engine.impl.interceptor.CommandCounterInterceptor.execute(CommandCounterInterceptor.java:35) ~[camunda-engine-7.18.0.jar:7.18.0]
        at org.camunda.bpm.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:33) ~[camunda-engine-7.18.0.jar:7.18.0]
        at org.camunda.bpm.engine.impl.interceptor.ExceptionCodeInterceptor.execute(ExceptionCodeInterceptor.java:55) ~[camunda-engine-7.18.0.jar:7.18.0]
        at org.camunda.bpm.engine.impl.MessageCorrelationBuilderImpl.execute(MessageCorrelationBuilderImpl.java:322) ~[camunda-engine-7.18.0.jar:7.18.0]
        at org.camunda.bpm.engine.impl.MessageCorrelationBuilderImpl.correlateWithResult(MessageCorrelationBuilderImpl.java:233) ~[camunda-engine-7.18.0.jar:7.18.0]
        at org.camunda.bpm.engine.rest.impl.MessageRestServiceImpl.correlate(MessageRestServiceImpl.java:83) ~[camunda-engine-rest-core-7.18.0.jar:7.18.0]
        at org.camunda.bpm.engine.rest.impl.MessageRestServiceImpl.deliverMessage(MessageRestServiceImpl.java:66) ~[camunda-engine-rest-core-7.18.0.jar:7.18.0]
        at com.example.workflow.rabbitmq.CamundaMessageProcessor.processMessage(CamundaMessageProcessor.java:35) ~[classes/:na]
        at com.example.workflow.rabbitmq.Receiver.receiveMessage(Receiver.java:19) ~[classes/:na]
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
        at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
        at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:283) ~[spring-core-5.3.22.jar:5.3.22]
        at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:366) ~[spring-rabbit-2.4.6.jar:2.4.6]
        at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:293) ~[spring-rabbit-2.4.6.jar:2.4.6]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1670) ~[spring-rabbit-2.4.6.jar:2.4.6]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1589) ~[spring-rabbit-2.4.6.jar:2.4.6]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1577) ~[spring-rabbit-2.4.6.jar:2.4.6]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1568) ~[spring-rabbit-2.4.6.jar:2.4.6]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1512) ~[spring-rabbit-2.4.6.jar:2.4.6]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:993) ~[spring-rabbit-2.4.6.jar:2.4.6]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:940) ~[spring-rabbit-2.4.6.jar:2.4.6]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:84) ~[spring-rabbit-2.4.6.jar:2.4.6]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1317) ~[spring-rabbit-2.4.6.jar:2.4.6]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1223) ~[spring-rabbit-2.4.6.jar:2.4.6]
        at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

org.camunda.bpm.engine.MismatchingMessageCorrelationException: Cannot correlate message 'msg_task10': No process definition or execution matches the parameters

Sry for this big explanation but i dont know who to properly explain my problem.

I thought i could use a message broker so that message will correlate when its time and not directly when receiving.

What is also not clear is that i do not know if i can run one rabbitmq instance creating 3 queues and that is good to go or do i need to start 3 separate instances of rabbitmq on 3 different ports so for every process there is one queue? Actually in a real life scenario i have 3 different PC’s and so everyone is starting his own rabbitmq instance but now on testing purposes i only use one pc as you can see on the terminal screens…

I tried it with async before and after and both but also correlation error.

Repo:
SebastianAT/camunda_springboot_rabbitmq (github.com)

Please, can anybody help me out.