Message correlation isn't working

Hi all,

I’m trying to correlate a message from one receive task to another receive task, but I’m getting an error even though Asynchronous Before is checked.

  • I have a message receive event, i.e. Document Received, with a RabbitMQ consumer. The Java Delegate of Document Received checks whether the received document is a PDF; if it is, it publishes that document to the image-converter-queue and machined-queue RabbitMQ queues and correlates the message to a Splitter service task.

  • If the received document is NOT a PDF, it publishes that document to the pdf-image-thumbnailing-queue only and correlates the message to the Thumbnailing receive task.

  • These Java Delegates are executing twice.

To correlate the message I’m doing the following steps:

RabbitMQ consumer, which is actually the start receive event:

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = Constant.DOCUMENT_QUEUE, durable = "true"), exchange = @Exchange(value = Constant.EXCHANGE, type = Constant.EXCHANGE_TYPE, durable = "true"), key = "document-queue-routing-key"))
public void startWorkflow(DocumentRequest documentRequest) {
	// creating process workflow instance
	camunda.getRuntimeService().startProcessInstanceByKey(Constant.PROCESS_ENGINE_KEY,
			Variables.putValue(Constant.DOCUMENT_ID, documentRequest.getId())
					.putValue(Constant.IS_PDF, true)
					.putValue(Constant.SKIP_CV, true));

	// correlating message from start receive task
	camunda.getRuntimeService().createMessageCorrelation("document_received")
			.setVariable(Constant.DOCUMENT_ID, documentRequest.getId())
			.setVariable(Constant.IS_PDF, true)
			.setVariable(Constant.SKIP_CV, true).correlateExclusively();
}

Java Delegate of the start receive event, i.e. Document Received:

@Override
public void execute(DelegateExecution execution) throws Exception {
	// DocumentHandler delegate of the Document Received event
	System.out.println("------------------------------------- Document Handler Started ----------------------------------");
	String documentId = (String) execution.getVariable(Constant.DOCUMENT_ID);
	boolean isPdf = (boolean) execution.getVariable(Constant.IS_PDF);

	DocumentRequest documentRequest = new DocumentRequest();
	documentRequest.setId(documentId);

	if (isPdf) {
		rabbitTemplate.convertAndSend(Constant.EXCHANGE, "pdf-to-image-converter-queue-routing-key", documentRequest);
	} else {
		rabbitTemplate.convertAndSend(Constant.EXCHANGE, "image-thumbnailing-queue-routing-key", documentRequest);
	}
	System.out.println("------------------------------------- Document Handler Finished ----------------------------------");
}

While the process is executing, the running activity indicator (the small circle with a number) is not showing in Cockpit.

Any suggestion or comments are welcome.

@Vinit_Kumar when you use a parallel gateway, the fork and join should be configured properly. In your model, you have forked after every activity but you didn’t use a parallel gateway to join.

Hello @Vinit_Kumar,
The message for the thumbnail activity cannot be correlated simply because your workflow does not have a token waiting at this activity for this message; the only token at this point in time sits at the start event and only moves forward AFTER the delegate is executed (the sketch below shows how to check for that waiting token before correlating).
You should morph this task into a service task and use an expression on the sequence flow to let the engine move the token forward to the thumbnail task.
@aravindhrs: Even though proper joining after a fork is generally good practice, it will not solve the problem. This way he ensures that after every task the response is saved.
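
A minimal sketch of what I mean (the message name, the processInstanceId variable and the camunda handle are placeholders taken from your snippets): only correlate once a token of the instance is actually waiting for that message.

    // sketch: check for a waiting message event subscription before correlating
    long waiting = camunda.getRuntimeService().createExecutionQuery()
            .processInstanceId(processInstanceId)
            .messageEventSubscriptionName("thumbnail_message") // placeholder message name
            .count();

    if (waiting > 0) {
        // a token is sitting at the receive task, so the correlation will succeed
        camunda.getRuntimeService().createMessageCorrelation("thumbnail_message")
                .processInstanceId(processInstanceId)
                .correlate();
    }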

Hi, @aravindhrs and @McAlm,

Thank you so much for your valuable response.

Correlation from Image Converter to Thumbnail is working now, but correlation from Thumbnail to Scanner isn’t working.

This wait event is delaying my process, so I don’t want to use it.
Could you please tell me how to execute the Java Delegates only once with a message receive task?

My use case is:

My workflow should be activated when someone posts something on RabbitMQ.

For that, I have created one RabbitMQ consumer. That consumer gives me an id; based on that id I have to send an HTTP request, and the response of that HTTP request I have to publish to RabbitMQ so that the other receive tasks can consume it and do their further processing (roughly as sketched below).
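
Only a rough sketch of what I mean (the HTTP URL, routing key and RestTemplate wiring are placeholders):

    // sketch: consume the id, call the HTTP service, publish the response for the next receive task
    @RabbitListener(queues = Constant.DOCUMENT_QUEUE)
    public void onDocumentId(DocumentRequest documentRequest) {
        // fetch the document details via HTTP using the received id
        String response = restTemplate.getForObject(
                "http://document-service/documents/" + documentRequest.getId(), String.class);

        // publish the HTTP response so the downstream receive task can consume it
        rabbitTemplate.convertAndSend(Constant.EXCHANGE, "image-thumbnailing-queue-routing-key", response);
    }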

Hello @Vinit_Kumar,
A message will only be correlated successfully if your process instance is waiting at the corresponding message receive task (the sketch below shows what happens otherwise). If you send a message from the Thumbnailing task to trigger the Scanner task, it will not work because the token is still sitting at the Thumbnailing task. Within your process model you can simply proceed by using condition expressions on the sequence flows.
JavaDelegates are always executed only once.
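
To make that rule visible, a small sketch (the message name and variables are assumptions): correlate() throws a MismatchingMessageCorrelationException when no execution is waiting for the message.

    // sketch: a correlation attempt simply fails if no token is waiting for the message
    try {
        camunda.getRuntimeService().createMessageCorrelation("scanner_message") // placeholder message name
                .processInstanceId(processInstanceId)
                .correlate();
    } catch (MismatchingMessageCorrelationException e) {
        // no token has reached the Scanner receive task yet
        System.out.println("Correlation failed, nothing is waiting for 'scanner_message': " + e.getMessage());
    }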

Thank you @McAlm,

It is completely clear now. I changed the BPMN file, and I have one RabbitMQ listener; inside that listener I’m trying to create the instance of the workflow as mentioned below:

    // creating process workflow instance
    camunda.getRuntimeService().startProcessInstanceByKey("vini_platform_worflow",
            Variables.putValue(Constant.DOCUMENT_ID, documentRequest.getId())
                    .putValue("name", documentRequest.getName())
                    .putValue(Constant.IS_PDF, false)
                    .putValue(Constant.SKIP_COMPUTER_VISION, false));

    // correlating message from start receive task
    camunda.getRuntimeService().createMessageCorrelation(Constant.START_DOCUMENT_RECEIVED_MESSAGE)
            .setVariable(Constant.DOCUMENT_ID, documentRequest.getId())
            .setVariable(Constant.IS_PDF, false)
            .setVariable(Constant.SKIP_COMPUTER_VISION, false)
            .correlateStartMessage();

And I’m getting the following error in the RabbitMQ listener while creating the instance of the workflow.

org.camunda.bpm.engine.exception.NullValueException: no processes deployed with key 'vini_platform_worflow': processDefinition is null
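
A rough sketch of how the deployed keys could be checked (assuming the camunda handle also exposes the RepositoryService the same way it exposes the RuntimeService):

    // sketch: list the latest deployed process definitions to verify which keys the engine knows
    camunda.getRepositoryService().createProcessDefinitionQuery()
            .latestVersion()
            .list()
            .forEach(pd -> System.out.println(pd.getKey() + " (version " + pd.getVersion() + ")"));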

I’m a bit confused about how to create an instance of a workflow that has multiple pools. I have attached my BPMN file here.

Thanks in advance for your valuable response.

regards,
Vinit

Hello @Vinit_Kumar,
Currently I’m not able to take a closer look at your BPMN file; I can do this on Monday at the earliest.
If you use two pools within one BPMN file, you have to make sure that both of them are flagged as “executable”. Otherwise the second one will be ignored by the engine.
Best, McAlm

Hi @McAlm,

I have made all pools executable in my local BPMN file, but I’m still getting the same error.

org.camunda.bpm.engine.exception.NullValueException: no processes deployed with key 'vini_platform_worflow': processDefinition is null

Will have a look at it on Monday!

Hello @Vinit_Kumar,
Can you reattach your BPMN file here so that I can help you?

Best, Stefan

Hi @McAlm,

It’s working fine now, but I have two issues:

  1. I’m not able to get the small circle that shows the traversal history.
    I want to do the same as shown in the attached screenshot (imageedit_1_7250779490).
  2. My tasks are processed very fast, so how can I save or maintain the history of every task in the database?

Thanks & Regards,
Vinit

Hello @Vinit_Kumar,

  1. Displaying the history in Cockpit is an Enterprise feature only.
  2. Usually this is what users expect from Camunda :wink:
    The engine persists all the history data in the database (according to the configured history level) even in the Community Edition. So if needed you can query the history data on your own, preferably by using the API (for example as sketched below).
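
A small sketch of such a query via the Java API (the camunda handle and processInstanceId are assumed, as in your earlier snippets):

    // sketch: list the historic activity instances of one process instance in execution order
    camunda.getHistoryService().createHistoricActivityInstanceQuery()
            .processInstanceId(processInstanceId)
            .orderByHistoricActivityInstanceStartTime().asc()
            .list()
            .forEach(hai -> System.out.println(hai.getActivityName() + " finished at " + hai.getEndTime()));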

@McAlm thanks for the response,

I was trying to get the historical details using the process instance id, but the request below fails.

localhost/8080/history/detail?processInstanceId=90c1618c-ce0d-11e9-ad3f-a2b6ef732aeb 

Or else, could you please tell me the name of the table where all the history data is stored?

Regards,
Vinit

Hi @Vinit_Kumar

If you’re using the Spring Boot distro, the REST endpoint is

localhost:8080/rest/

If you’re using another distro, the endpoint is:

localhost:8080/engine-rest/

So you should try the rest call in postman with
http://localhost:8080/engine-rest/history/detail
or
http://localhost:8080/rest/history/detail
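
Or, if you prefer Java over Postman, a rough sketch using Spring’s RestTemplate (adjust the context path to your distro; the id is just the one from your earlier post):

    // sketch: call the history REST endpoint; /rest vs. /engine-rest depends on the distro
    RestTemplate rest = new RestTemplate();
    String url = "http://localhost:8080/rest/history/detail?processInstanceId={id}";
    String json = rest.getForObject(url, String.class, "90c1618c-ce0d-11e9-ad3f-a2b6ef732aeb");
    System.out.println(json);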

Hope this helps.

Regards
Michael

Hello @MichiDahm,

Thank you so much for your quick response.

I’m using the Spring Boot distro, so it’s working for me with the REST endpoint

localhost:8080/rest/

@MichiDahm is this fetching the history from the ACT_HI_DETAIL table or from some other table?

Regards,
Vinit Kumar