I have deployed a bpmn process via desktop modeler(file attached).
Process has to start with a message start event.
What I am trying to achieve:
I am trying to model a workflow wherein a message arrives in MQTT broker, and it is processed.
I have written a MQTT client in java. For continuous listening to mqtt broker, it executes a continuous loop. I understand that a camunda task has to complete so that next task can be triggered. Hence, I can’t capture my mqtt client in a service task. So I have planned that on receival of a message in mqtt client, I would create an instance of the process, passing my message contents to it. Going through camunda documents, I found that a message start event is most suitable for such a use-case. So in my process definition(file attached), process has a message start event, followed by a service task to process the message content.
When I try to run the process after providing json message in modeler, I get error:
Command ‘CREATE’ rejected with code ‘INVALID_STATE’: Expected to create instance of process with none start event, but there is no such event [ start-instance-error ].
What am I missing in my setup? Also, is my proposed approach correct one for my use-case?
iot.bpmn (3.3 KB)
Your flow starts with a message start event.That can not start by the Modeler.
You can start the flow by a client code.
Hello @mghildiy ,
If you are using a Message Start Event, you need to start the event by sending a message through the API or Zeebe client.
For example, in the code snippet using the Zeebe client, you can either pass the complete message if its size is less than 4MB (it is recommended to keep the process instance lightweight), or you can store the message in an external database and send only the UUID or any unique key. This code can be placed in your actual message listener.
@PostMapping("/sendmessage")
public PublishMessageResponse sendMessage() {
Map<String, String> vars = new HashMap<>();
vars.put("message", "This is a test message");
vars.put("UUID", UUID.randomUUID().toString());
PublishMessageResponse messageCommand = client.newPublishMessageCommand()
.messageName("iccc_mqtt_event")
.correlationKey("UUID")
.variables(vars)
.timeToLive(Duration.ofMinutes(30))
.send()
.join();
return messageCommand;
}
If the above code is successful then you process instance will start. After than you can handle your business logic though Service Task.
@Service
@Slf4j
public class IoTMqttProcessor {
@JobWorker(name = "ioTMqttProcessor")
public void ioTMqttProcessor(final JobClient client, final ActivatedJob job) {
Map<String, Object> variablesAsMap = job.getVariablesAsMap();
//Example to print message uuid or message
log.info("Message UUID : {}",variablesAsMap.get("UUID"));
log.info("Message Body : {}",variablesAsMap.get("message"));
variablesAsMap.put("Message Processed", true);
//TO DO your message processing logic etc
//Complete the activity and move forward if all as expected
client.newCompleteCommand(job.getKey())
.variables(variablesAsMap)
.send();
}
}
Hope it helps, also read about message correlation and Camunda 8 API for more details.
1 Like
Yup, it helps @Kaveri_Govindasamy. Thanks.