Camunda - correlation problem and scalability

Dear Members,

I hope to get some valuable guidance/suggestions from you on this topic, as always!

My Camunda Installation Details:

Process Engine Version: 7.14.0 (Community Edition)
Process Engine Deployment Mode: Spring Boot Microservice (MS) running on k8s cluster with 4 pods (Replica:4)
Database: MySQL (5.7.25-enterprise-commercial-advanced-log)
MS Tomcat Starting Args: args: ["cd /opt/uidapp/; java -Dlog4j2.formatMsgNoLookups=true -jar <jar_name>.jar --spring.config.location=/config/application.properties"]
resources:
  limits:
    cpu: "6000m"
    memory: "10Gi"
  requests:
    cpu: "2000m"
    memory: "8Gi"

Use Case:

  • I’m processing 100,000 applications per day (with an end goal of 1M applications per day)
  • Each application spawns 2 workflow instances of two separate process definitions (BPMN attached)
  • All workflow instances have unique business keys
  • I’m using Camunda workflow as an application orchestrator
  • Camunda is using Kafka to interact with other microservices in a command-acknowledge pattern
  • One of my process instances (let’s call it the parent) gets invoked after consuming a Kafka message (initiator)
  • Once the parent process instance has started, there is a series of service tasks (Java delegates) and receive tasks through which the command (service task) and acknowledgement (receive task) take place (you may refer to the attached BPMN)
  • Each receive task is triggered by a Kafka message. The message correlation is done from the Kafka consumer method annotated with @KafkaListener
  • History cleanup strategy as below (we are maintaining FULL history level - historyLevel:3 in ACT_GE_PROPERTY) -
    camunda.bpm.generic-properties.properties.history-cleanup-strategy=removalTimeBased
    camunda.bpm.generic-properties.properties.history-cleanup-batch-window-start-time=01:00
    camunda.bpm.generic-properties.properties.history-cleanup-batch-window-end-time=00:00
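Note that removal-time-based cleanup only removes history for which a removal time could be computed, which requires a history time to live to be configured. A global default can be set via the same generic-properties mechanism; the seven-day value below is purely an example, not taken from the setup above:

```properties
# Example only - pick a TTL (ISO-8601 duration) that matches your audit/retention requirements
camunda.bpm.generic-properties.properties.history-time-to-live=P7D
```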

==============================================================================================
Typical delegate code:

@Component
@Slf4j
public class SomeClass implements JavaDelegate {

	@Autowired
	private KafkaProducerService kafkaProducerService;
	//Autowired declarations.......

	@Override
	public void execute(DelegateExecution execution) throws Exception {
		try {
			Packet pt = new Packet();
			//all setters
			pt.setRefId(execution.getVariable("refId").toString());
			//........
			kafkaProducerService.publishKafkaEvent(pt, <kafka_message_name>);
		} catch (Exception e) {
			log.error("Failed to publish Kafka event", e);
		}
	}
}
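For context, the `KafkaProducerService` injected into the delegate is not shown in the post. A minimal sketch of what it might look like, assuming Spring Kafka's `KafkaTemplate` and Jackson for serialization, follows; the topic-naming convention (`topicFor`) is a hypothetical invention, not from the original code:

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical sketch only - the real service and its topic layout are not shown in the post.
@Component
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    private final ObjectMapper objectMapper = new ObjectMapper();

    // Hypothetical convention: one command topic per Camunda message name
    static String topicFor(String messageName) {
        return "cmd." + messageName.toLowerCase();
    }

    public void publishKafkaEvent(Object payload, String messageName) throws JsonProcessingException {
        // Serialize the command packet and publish it; the acknowledgement
        // for the receive task is expected on a separate reply topic
        kafkaTemplate.send(topicFor(messageName), objectMapper.writeValueAsString(payload));
    }
}
```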

Typical Receive task code (Kafka Consumer)

@Component
@Slf4j
public class SomeClass {

	@Autowired
	private ProcessEngine processEngine;

	@KafkaListener(topics = <some_topic>)
	public void onMessage(ConsumerRecord<Integer, String> consumerRecord, Acknowledgment acknowledgment)
			throws JsonProcessingException {
		try {
			//.......
			//Construct the unique business key from the kafka message payload that will be used for message correlation
			//......
			boolean isParentSubscriptionReady = false;
			List<ProcessInstance> procInsList = processEngine.getRuntimeService().createProcessInstanceQuery()
					.processInstanceBusinessKey(parentBusinessKey)
					.active()
					.list();
			for (ProcessInstance ins : procInsList) {
				log.info("Running process instance for parent business key: {} >>>> {}",
						parentBusinessKey, ins.getId());
				List<EventSubscription> subscriptionList = processEngine.getRuntimeService()
						.createEventSubscriptionQuery()
						.processInstanceId(ins.getId())
						.eventType("message")
						.list();
				for (EventSubscription subscription : subscriptionList) {
					log.info("Subscription name: {}, subscription execution id: {}",
							subscription.getEventName(), subscription.getExecutionId());
					if (subscription.getEventName().equalsIgnoreCase(<receive_task_message_name>)) {
						isParentSubscriptionReady = true;
						log.info("Parent process is ready for correlation...");
					}
				}
			}
			//........
			if (isParentSubscriptionReady) {
				log.info("Message is ready for correlation");
				MessageCorrelationResult messageCorrelationResultParent = processEngine.getRuntimeService()
						.createMessageCorrelation(<receive_task_message_name>)
						.processInstanceBusinessKey(parentBusinessKey)
						.setVariable(<var_name>, <value>)
						.correlateWithResult();
				if (messageCorrelationResultParent != null) {
					log.info("Message successfully correlated");
				}
			} else {
				log.info("No parent workflow with business key: " + parentBusinessKey
						+ " is running and hence could not be correlated...retrying...");
			}
			//................
		} catch (Exception e) {
			log.error("Message correlation failed", e);
		}
	}
}
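The construction of `parentBusinessKey` is elided in the listener above. Purely as an illustration of the idea, assuming a hypothetical delimited payload such as `"APP123:KYC"` (the real payload format, helper class, and key prefix are invented, not from the post), it might look like:

```java
// Hypothetical illustration only: the actual Kafka payload format is not shown in the post.
public class BusinessKeyBuilder {

    public static String parentBusinessKey(String payload) {
        // Assumes the payload carries "<applicationId>:<stage>", e.g. "APP123:KYC"
        String[] parts = payload.split(":");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Unexpected payload: " + payload);
        }
        // Derive a key that is unique per application for the parent definition
        return "PARENT-" + parts[0];
    }
}
```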

Issues observed:

  1. Frequent correlation failures - isParentSubscriptionReady is false and the log message in the else block is printed, meaning the token is not yet at the receive task at the time of correlation, according to the subscription query
  2. Fetching the subscriptions of a running process instance takes significant time (2-5 seconds)
  3. The correlation itself takes 2-5 seconds - the time between the log entries before and after the correlation call shows this
  4. Due to this lag on the Camunda side, Kafka is currently under-utilized (Camunda cannot match Kafka's throughput); we are seeing Kafka consumer lag and occasional rebalancing
  5. On a different note, ending a running process instance also takes at least 2 seconds

Questions:

  1. Is the Community Edition the best choice for such a workload? Is there any benchmarking available for the Community Edition in terms of scalability?
  2. Is there anything fundamentally wrong with the above use-case design? Any glaring shortcoming that, when corrected, may help address scalability?
  3. Any other suggestions/guidance?

Thank you very much,
Soumik Ray

Child.bpmn (17.1 KB)
Parent.bpmn (10.4 KB)

Hi @mtq0205

Can you please format this post correctly so that it’s easier to read? 🙂
Thanks.



Did you manage to solve the root cause of the issue?

Hi @mtq0205,

Below post might be of help to you