Hi I’m trying to add kafka connector as start event for my event subprocess but unfortunately I’m getting below issue
Start event of event subprocess must be of type ‘error’, ‘message’, ‘timer’, ‘signal’, ‘compensation’ or ‘escalation.
Is there a way to add kafka connector as a start event for subprocess?
Hi,
Currently, the Kafka Connector for start events is based on a NoneStart event. You can see this in the corresponding element template:
[...]
"appliesTo": [
"bpmn:StartEvent"
],
[...]
Since NoneStartEvents cannot be used to initiate an event subprocess, you cannot use the Kafka Connector.
However, the Kafka Connector for intermediate events can handle message events. It may be possible to adapt the corresponding element template to work with message start events of event subprocesses. But, be careful: The element template cannot distinguish between message start events of event subprocesses and normal processes, yet it will only work for the former.
Hi, I appreciate your reply. What do I need to use as a start event to listen to my kafka if I want to utilise the kafka intermediate connector for event subprocesses? Whenever a message arrives in Kafka, how token will fall to subprocess
You would need to change the element template, so you can apply it to the subprocess’s start event.
Read more about element templates in the documentation:
I don’t know the details of the connectors’ implementation – so there is no guarantee that it will work.
Hi @prdv6 was this answer helpful? If there are challeneges with the teomplet, maybe you could share what you have so far.
You may also find this helpful: https://github.com/camunda/connectors/tree/main/connectors/kafka/element-templates
Best regards
Rob
Hi @prdv6 and @rob2universe,
As explained above, I’d try to use the template for the intermediate Kafka connector to build a new template that is compatible with event subprocesses. The element template looks like this:
{
"$schema": "https://unpkg.com/@camunda/zeebe-element-templates-json-schema/resources/schema.json",
"name": "kafka-inbound-connector-eventsubprocess",
"id": "00500279-640a-4327-aced-5f67199b8205",
"version": 1,
"description": "Consume Kafka messages",
"documentationRef": "https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/kafka-inbound/",
"category": {
"id": "connectors",
"name": "Connectors"
},
"appliesTo": [
"bpmn:StartEvent"
],
"elementType": {
"value": "bpmn:StartEvent",
"eventDefinition": "bpmn:MessageEventDefinition"
},
"groups": [
{
"id": "authentication",
"label": "Authentication"
},
{
"id": "kafka",
"label": "Kafka"
},
{
"id": "activation",
"label": "Activation"
},
{
"id": "variable-mapping",
"label": "Variable mapping"
}
],
"properties": [
{
"type": "Hidden",
"value": "io.camunda:connector-kafka-inbound:1",
"binding": {
"type": "zeebe:property",
"name": "inbound.type"
}
},
{
"type": "Hidden",
"generatedValue": {
"type": "uuid"
},
"binding": {
"type": "bpmn:Message#property",
"name": "name"
}
},
{
"label": "Authentication type",
"description": "Username/password or custom",
"id": "authenticationType",
"group": "authentication",
"type": "Dropdown",
"value": "credentials",
"choices": [
{
"name": "Credentials",
"value": "credentials"
},
{
"name": "Custom",
"value": "custom"
}
],
"binding": {
"type": "zeebe:property",
"name": "authenticationType"
}
},
{
"label": "Username",
"description": "Provide the username (must have permissions to produce message to the topic)",
"group": "authentication",
"type": "String",
"feel": "optional",
"optional": true,
"binding": {
"type": "zeebe:property",
"name": "authentication.username"
},
"condition": {
"property": "authenticationType",
"equals": "credentials"
}
},
{
"label": "Password",
"description": "Provide a password for the user",
"group": "authentication",
"type": "String",
"feel": "optional",
"optional": true,
"binding": {
"type": "zeebe:property",
"name": "authentication.password"
},
"condition": {
"property": "authenticationType",
"equals": "credentials"
}
},
{
"label": "Bootstrap servers",
"description": "Provide bootstrap server(s), comma-delimited if there are multiple",
"group": "kafka",
"type": "String",
"feel": "optional",
"binding": {
"type": "zeebe:property",
"name": "topic.bootstrapServers"
},
"constraints": {
"notEmpty": true
}
},
{
"label": "Topic",
"description": "Provide the topic name",
"group": "kafka",
"type": "String",
"feel": "optional",
"binding": {
"type": "zeebe:property",
"name": "topic.topicName"
},
"constraints": {
"notEmpty": true
}
},
{
"label": "Consumer Group ID",
"description": "Provide the consumer group ID used by the connector. Leave empty for an automatically generated one",
"group": "kafka",
"type": "String",
"feel": "optional",
"optional": true,
"binding": {
"type": "zeebe:property",
"name": "groupId"
},
"constraints": {
"notEmpty": false,
"maxLength": 250
}
},
{
"label": "Additional properties",
"description": "Provide additional Kafka consumer properties in JSON",
"group": "kafka",
"type": "String",
"optional": true,
"feel": "required",
"binding": {
"type": "zeebe:property",
"name": "additionalProperties"
}
},
{
"label": "Offsets",
"description": "List of offsets, e.g. '10' or '=[10, 23]'. If specified, it has to have the same number of values as the number of partitions",
"group": "kafka",
"type": "String",
"feel": "optional",
"optional": true,
"binding": {
"type": "zeebe:property",
"name": "offsets"
}
},
{
"label": "Auto offset reset",
"description": "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. You should only select none if you specified the offsets",
"id": "autoOffsetReset",
"group": "kafka",
"type": "Dropdown",
"value": "latest",
"choices": [
{
"name": "Latest",
"value": "latest"
},
{
"name": "Earliest",
"value": "earliest"
},
{
"name": "None",
"value": "none"
}
],
"binding": {
"type": "zeebe:property",
"name": "autoOffsetReset"
}
},
{
"label": "Correlation key (process)",
"type": "String",
"group": "activation",
"feel": "required",
"description": "Sets up the correlation key from process variables",
"binding": {
"type": "bpmn:Message#zeebe:subscription#property",
"name": "correlationKey"
},
"constraints": {
"notEmpty": true
}
},
{
"label": "Correlation key (payload)",
"type": "String",
"group": "activation",
"feel": "required",
"binding": {
"type": "zeebe:property",
"name": "correlationKeyExpression"
},
"description": "Extracts the correlation key from the incoming message payload",
"constraints": {
"notEmpty": true
}
},
{
"label": "Activation condition",
"type": "String",
"group": "activation",
"feel": "required",
"optional": true,
"binding": {
"type": "zeebe:property",
"name": "activationCondition"
},
"description": "Condition under which the Connector triggers. Leave empty to catch all events"
},
{
"label": "Result variable",
"type": "String",
"group": "variable-mapping",
"optional": true,
"binding": {
"type": "zeebe:property",
"name": "resultVariable"
},
"description": "Name of variable to store the result of the connector in"
},
{
"label": "Result expression",
"description": "Expression to map the inbound payload to process variables",
"group": "variable-mapping",
"type": "Text",
"feel": "required",
"binding": {
"type": "zeebe:property",
"name": "resultExpression"
}
}
],
"icon": {
"contents": "data:image/svg+xml;utf8,%3Csvg width='18' height='18' viewBox='0 0 256 416' xmlns='http://www.w3.org/2000/svg' preserveAspectRatio='xMidYMid'%3E%3Cpath d='M201.816 230.216c-16.186 0-30.697 7.171-40.634 18.461l-25.463-18.026c2.703-7.442 4.255-15.433 4.255-23.797 0-8.219-1.498-16.076-4.112-23.408l25.406-17.835c9.936 11.233 24.409 18.365 40.548 18.365 29.875 0 54.184-24.305 54.184-54.184 0-29.879-24.309-54.184-54.184-54.184-29.875 0-54.184 24.305-54.184 54.184 0 5.348.808 10.505 2.258 15.389l-25.423 17.844c-10.62-13.175-25.911-22.374-43.333-25.182v-30.64c24.544-5.155 43.037-26.962 43.037-53.019C124.171 24.305 99.862 0 69.987 0 40.112 0 15.803 24.305 15.803 54.184c0 25.708 18.014 47.246 42.067 52.769v31.038C25.044 143.753 0 172.401 0 206.854c0 34.621 25.292 63.374 58.355 68.94v32.774c-24.299 5.341-42.552 27.011-42.552 52.894 0 29.879 24.309 54.184 54.184 54.184 29.875 0 54.184-24.305 54.184-54.184 0-25.883-18.253-47.553-42.552-52.894v-32.775a69.965 69.965 0 0 0 42.6-24.776l25.633 18.143c-1.423 4.84-2.22 9.946-2.22 15.24 0 29.879 24.309 54.184 54.184 54.184 29.875 0 54.184-24.305 54.184-54.184 0-29.879-24.309-54.184-54.184-54.184zm0-126.695c14.487 0 26.27 11.788 26.27 26.271s-11.783 26.27-26.27 26.27-26.27-11.787-26.27-26.27c0-14.483 11.783-26.271 26.27-26.271zm-158.1-49.337c0-14.483 11.784-26.27 26.271-26.27s26.27 11.787 26.27 26.27c0 14.483-11.783 26.27-26.27 26.27s-26.271-11.787-26.271-26.27zm52.541 307.278c0 14.483-11.783 26.27-26.27 26.27s-26.271-11.787-26.271-26.27c0-14.483 11.784-26.27 26.271-26.27s26.27 11.787 26.27 26.27zm-26.272-117.97c-20.205 0-36.642-16.434-36.642-36.638 0-20.205 16.437-36.642 36.642-36.642 20.204 0 36.641 16.437 36.641 36.642 0 20.204-16.437 36.638-36.641 36.638zm131.831 67.179c-14.487 0-26.27-11.788-26.27-26.271s11.783-26.27 26.27-26.27 26.27 11.787 26.27 26.27c0 14.483-11.783 26.271-26.27 26.271z' style='fill:%23231f20'/%3E%3C/svg%3E"
}
}
I can use this template for message for Event Subprocesses and deploy the model. (See example below). However, I did not test whether it works with Kafka.
Here is my minimal process:
kafka-connector-test.bpmn (7.1 KB)
Regards,
Stephan