Integrating AMQP with Camunda

Hi Everyone,
I am working on implementing AMQP(KAFKA) with Camunda, where I would be getting huge number of messages from KAFKA and I would use camunda to process each message and do some process automation on each message body.
Earlier i have implemented the above scenario with a different approach, in this approach, there are two bpmn’s.
The first Bpmn connects to the KAFKA Topic based on the configuration that’s provided within a thread, and this process doesn’t end.


Inside the Fetch Service task, a thread is created and connection to KAFKA is established, and that thread is responsible to send the data to a child Process using createMessageCorrelation, where I have Message Start task.

 runtimeService
          .createMessageCorrelation("listeningToTopic")
          .setVariable("rootId", delegateExecution.getId())
          .setVariable("assetIdMfi", id.toString())
          .setVariable("anomalyIndex", anomalyIndex)
          .setVariable("anomalyValue", anomalyValue)
          .correlateExclusively();

Now this approach isnt feasible if I am having 500/1000 messages from my TOPIC, Camunda is taking some time to process the 500 messages. While I was checking out existing examples, to get this implemented, I came across one of the example by Bernd Ruecker,

In this example of Bernd, When the execution arrives at ShipGoods, we are ending the process as soon as the 5 minutes timer is over. My question here is,

  1. Is the connection to AMQP still alive?
  2. How are we going to correlate with Receive task for the second time if the connection to AMQP is still alive, when the new data from the Queue comes in, since the process has ended, and the previous process instance no longer exists, how can we corelate with the receive task.
  3. Will this approach work for large set of data coming every minute.

Thanks
-Warm Regards

Hi Maka.

Inside the Fetch Service task, a thread is created and connection to KAFKA is established, and that thread is responsible to send the data to a child Process using createMessageCorrelation, where I have Message Start task.

If I understand this correctly you opened up the connection from within a delegate attached to the BPMN process? This does not make sense from my perspective, you want to have the receiving part (independant of if it is Kafka or AMQP) as its own listener build with Java/Spring mechanisms. When receiving one message (or Kafka record) you might correlate that to a waiting process instance (or start a new one).

In this example of Bernd, When the execution arrives at ShipGoods, we are ending the process as soon as the 5 minutes timer is over.

To be precise, the 5 minutes timer is AFTER the “goods shipped” message arrived and was correlated. It is simply there to show off the capability of the workflow engine and how to test it.

  • Is the connection to AMQP still alive?

This is completly independant of the workflow engine and the process instance, so: yes. You can see this here: camunda-7-springboot-amqp-microservice-cloud-example/src/main/java/com/camunda/demo/springboot/adapter/AmqpReceiver.java at master · berndruecker/camunda-7-springboot-amqp-microservice-cloud-example · GitHub. This code could also simply do a sysout on a new message, so it is not dependant on Camunda at all.

  • How are we going to correlate with Receive task for the second time if the connection to AMQP is still alive, when the new data from the Queue comes in, since the process has ended, and the previous process instance no longer exists, how can we corelate with the receive task.

You can’t. The message was already correlated, so the process is no longer waiting for it. But why do you get multiple message instances for one process instance?
Do you ship the goods for the same order multiple times? No.
You need to look at an instance level (I know this can be confusing - I hope this super short explanation helps a bit)

  • Will this approach work for large set of data coming every minute.

Always depends :slight_smile: What is large? What do you need to do with these messages? Can you scale your environment? …
We always recommend to do a proper load test with a realistic load in an environment close to where you will later on operate.
So far, we always got it to work :slight_smile:

Best
Bernd

I guess this answers my question, I will try it out Bernd.

Hey Bernd,
I am using Kafka instead of Rabbit, and instead of starting my task via REST API, I am starting the process via TaskList. My Workflow looks like this:


After starting the task:

When I start it:
The execution moves to Receive Task, I don’t have a send task, like ShipGoods, in your case, i am using command line, to push data into a kafka topic, and Kafka is listening to the topic, and inside the listener method i am trying to corelate to the receive task topic.

  @Service
  public class KafkaConsumer {
 @Autowired
  private ProcessEngine camunda;
 private final Logger LOG = LoggerFactory
  .getLogger(KafkaConsumer.class);

@KafkaListener(topics = "anomaly-topic", groupId = "group_id")
void listener(String message) {
LOG.info("Listener [{}]", message);
corelateMessage(message);

 }
public void corelateMessage(String message)
{camunda.getRuntimeService().createMessageCorrelation("AnomalyTopicMessage").setVariable("mymessage", message).correlateWithResult();
}
}

When I am publishing to kafka topic, i am getting the following exception, can you tell me where I am going wrong.

  020-10-23 13:40:59.067 ERROR 16456 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = anomaly-topic, partition = 0, leaderEpoch = 0, offset = 22, CreateTime = 1603440657902, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'void com.hcc.hpi.services.kafka.KafkaConsumer.listener(java.lang.String)' threw exception; nested exception is org.camunda.bpm.engine.MismatchingMessageCorrelationException: Cannot correlate message 'AnomalyTopicMessage': No process definition or execution matches the parameters; nested exception is org.camunda.bpm.engine.MismatchingMessageCorrelationException: Cannot correlate message 'AnomalyTopicMessage': No process definition or execution matches the parameters
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1716) [spring-kafka-2.3.4.RELEASE.jar:2.3.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1708) [spring-kafka-2.3.4.RELEASE.jar:2.3.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1621) [spring-kafka-2.3.4.RELEASE.jar:2.3.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1549) [spring-kafka-2.3.4.RELEASE.jar:2.3.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1457) [spring-kafka-2.3.4.RELEASE.jar:2.3.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1207) [spring-kafka-2.3.4.RELEASE.jar:2.3.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:966) [spring-kafka-2.3.4.RELEASE.jar:2.3.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:901) [spring-kafka-2.3.4.RELEASE.jar:2.3.4.RELEASE]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151] 

Regards
-Maka