Camunda 8 web modeler interaction with NATS custom inbound connector

We are trying to create a pub/sub connector with NATS. Our issue arises with the sub side (pub is working): when we deploy the template.json for the inbound connector and use a start event for modeling, it doesn't listen to the topic and hence stays dormant.

import nats  # NATS Python client for subscribing to messages from NATS
import asyncio
from python_camunda_sdk import InboundConnector
from python_camunda_sdk.connectors import ConnectorConfig
from pydantic import Field
import json


class NATSInboundConnector(InboundConnector):
    """NATS Inbound Connector to subscribe and receive messages from a NATS topic.

    The connector connects to ``nats_url``, subscribes to ``topic`` and
    waits until the first non-empty, valid JSON message arrives. That message
    is then returned from :meth:`run`.

    NOTE(review): presumably the python_camunda_sdk runtime forwards the value
    returned by ``run`` to the process instance -- confirm against the SDK
    documentation.
    """

    # Metadata for the connector
    class ConnectorConfig:
        name = "NATS Inbound Connector"  # Connector name shown in Camunda Modeler
        type = "nats_inbound"            # Connector type used internally

    # Define configuration fields for the connector
    nats_url: str = Field(
        default="nats://ec2-54-163-138-108.compute-1.amazonaws.com:4222",
        description="The URL of the NATS server.",
        group="config")  # NATS URL field in configuration

    topic: str = Field(
        description="The topic to subscribe to.",
        group="config")  # Topic field in configuration

    # Kept as a string for template compatibility; parsed to int inside run().
    polling_interval: str = Field(
        default="10",
        description="The polling interval in seconds.",
        group="config")  # Polling interval field in configuration

    async def run(self) -> dict:
        """Wait for one valid JSON message on the configured topic.

        Returns:
            A dict with ``"received_message"`` holding the decoded JSON
            payload of the first valid message, or ``"error"`` holding a
            description when connecting, subscribing or parsing the
            configuration failed.

        The previous implementation looped forever after subscribing, so a
        received message was stored but never returned. This version waits on
        an event that the message handler sets, and returns as soon as a
        valid message has been captured.
        """
        result = {}
        client = None
        # Set by the handler once a valid message has been stored in `result`.
        message_received = asyncio.Event()
        try:
            # Parse once up front; a non-numeric value raises ValueError and
            # is reported through the generic error path below.
            interval = int(self.polling_interval)
            if interval <= 0:
                # A zero/negative timeout would turn the wait loop into a
                # busy loop.
                raise ValueError(
                    "polling_interval must be a positive number of seconds")

            # Log the connection details
            print(
                f"Connecting to NATS server at: {self.nats_url} for topic: {self.topic}...")
            client = await nats.connect(self.nats_url)

            # Define the message handler
            async def message_handler(msg):
                if message_received.is_set():
                    # A valid message was already captured; do not let later
                    # messages overwrite the result while we shut down.
                    return

                message_data = msg.data.decode()
                print(f"Received message: {message_data}")

                if not message_data.strip():
                    print("Error: Received an empty message.")
                    result["error"] = "Received an empty message"
                    return

                # Parse the incoming message as JSON
                try:
                    message_json = json.loads(message_data)
                except json.JSONDecodeError as e:
                    print(f"Error decoding message: {e}")
                    result["error"] = "Invalid message format"
                    return

                result["received_message"] = message_json
                # Drop an error left over from an earlier malformed message so
                # the returned dict does not report failure and success at once.
                result.pop("error", None)
                print(f"Message received: {result['received_message']}")
                message_received.set()

            # Subscribe to the NATS topic
            await client.subscribe(self.topic, cb=message_handler)
            # Logged only after the subscription actually exists.
            print(
                f"Connected to NATS server. Subscribed to topic: {self.topic}")

            # Wait for the handler to signal; wake up every `interval`
            # seconds only to emit a heartbeat log line.
            while not message_received.is_set():
                print(f"Polling every {self.polling_interval} seconds...")
                try:
                    await asyncio.wait_for(
                        message_received.wait(), timeout=interval)
                except asyncio.TimeoutError:
                    continue
        except Exception as e:
            print(f"Error occurred while subscribing: {e}")
            result["error"] = str(e)
        finally:
            print("Shutting down subscriber...")
            if client and client.is_connected:
                await client.close()
            print("NATS connection closed.")

        return result

And its corresponding JSON template is:

{
  "$schema": "https://unpkg.com/@camunda/zeebe-element-templates-json-schema@0.9.0/resources/schema.json",
  "name": "NATS Inbound Connector",
  "id": "9fe6053e-cc29-467e-9d9a-319d53340179",
  "appliesTo": [
    "bpmn:ServiceTask"
  ],
  "properties": [
    {
      "binding": {
        "type": "zeebe:input",
        "name": "nats_url"
      },
      "label": "The URL of the NATS server.",
      "type": "String",
      "group": "input",
      "feel": "optional"
    },
    {
      "binding": {
        "type": "zeebe:input",
        "name": "topic"
      },
      "label": "The topic to subscribe to.",
      "type": "String",
      "group": "input",
      "feel": "optional"
    },
    {
      "binding": {
        "type": "zeebe:input",
        "name": "polling_interval"
      },
      "label": "The polling interval in seconds.",
      "type": "String",
      "group": "input",
      "feel": "optional"
    },
    {
      "binding": {
        "type": "zeebe:taskHeader",
        "key": "resultVariable"
      },
      "label": "Result variable",
      "type": "String",
      "group": "output"
    },
    {
      "binding": {
        "type": "zeebe:taskDefinition:type"
      },
      "type": "Hidden",
      "value": "nats_inbound"
    },
    {
      "binding": {
        "type": "zeebe:input",
        "name": "correlation_key"
      },
      "label": "Correlation key",
      "type": "String",
      "group": "config",
      "feel": "optional"
    },
    {
      "binding": {
        "type": "zeebe:input",
        "name": "message_name"
      },
      "label": "Message name",
      "type": "String",
      "group": "config",
      "feel": "optional"
    }
  ],
  "groups": [
    {
      "id": "input",
      "label": "Input"
    },
    {
      "id": "output",
      "label": "Output"
    },
    {
      "id": "config",
      "label": "Configuration"
    }
  ]
}

Hi @Muhammad_Arsalan , let’s see if I can help!

Here are a few troubleshooting ideas:

  • Are you seeing any logs that contain “Connecting to NATS server at:”? This would help confirm the Connector is being deployed correctly.
  • Can you check the NATS server to see if your Connector is subscribed as expected?
  • I notice your template JSON uses a deprecated binding:
"binding": {
        "type": "zeebe:taskDefinition:type"
}

This should be replaced with:

"binding": {
    "type": "zeebe:taskDefinition",
    "property": "type"
}

Let us know if any of these ideas help you find the issue!

2 Likes

Hi, thanks for your reply.

I am getting the following error. I have attached my JSON file; kindly let me know if you have used a similar connector for inbound subscriptions with NATS.

Command ‘CREATE’ rejected with code ‘INVALID_ARGUMENT’: Expected to deploy new resources, but encountered the following errors:
‘test_nats_inbound.bpmn’: - Element: Event_1qhmocz > extensionElements > taskDefinition
- ERROR: Expected expression but not found.
[ deploy-error ] 27/9/2024 17:18:14

"properties": [
    {
      "binding": {
        "type": "zeebe:input",
        "name": "nats_url"
      },
      "label": "The URL of the NATS server.",
      "type": "String",
      "group": "input",
      "feel": "optional"
    },
    {
      "binding": {
        "type": "zeebe:input",
        "name": "topic"
      },
      "label": "The topic to subscribe to.",
      "type": "String",
      "group": "input",
      "feel": "optional"
    },
    {
      "binding": {
        "type": "zeebe:input",
        "name": "polling_interval"
      },
      "label": "The polling interval in seconds.",
      "type": "String",
      "group": "input",
      "feel": "optional"
    },
    {
      "binding": {
        "type": "zeebe:taskHeader",
        "key": "resultVariable"
      },
      "label": "Result variable",
      "type": "String",
      "group": "output"
    },
    {
          "type": "Hidden",
          "binding": {
            "type": "zeebe:taskDefinition",
            "property": "type"
          }
    },
    {
      "binding": {
        "type": "zeebe:input",
        "name": "correlation_key"
      },
      "label": "Correlation key",
      "type": "String",
      "group": "config",
      "feel": "optional"
    },
    {
      "binding": {
        "type": "zeebe:input",
        "name": "message_name"
      },
      "label": "Message name",
      "type": "String",
      "group": "config",
      "feel": "optional"
    }
  ],
  "groups": [
    {
      "id": "input",
      "label": "Input"
    },
    {
      "id": "output",
      "label": "Output"
    },
    {
      "id": "config",
      "label": "Configuration"
    }
 ]

It looks like

{
          "type": "Hidden",
          "binding": {
            "type": "zeebe:taskDefinition",
            "property": "type"
          }
},

is missing the "value": "nats_inbound" entry the original template had. That might explain the “Expected expression but not found.” error.

I don’t have experience with NATS or NATS Connectors, but I’ll try to help troubleshoot! :smile:

It's deployed successfully, but the issue of the subscriber not triggering the flow remains. Can you review my XML?

<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_1" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Web Modeler" exporterVersion="24b1c55" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.5.0">
  <bpmn:process id="Process_1eyrze2" name="test_nats_inbound" isExecutable="true">
    <bpmn:endEvent id="Event_1chms07" name="end">
      <bpmn:incoming>Flow_1r5x731</bpmn:incoming>
    </bpmn:endEvent>
    <bpmn:startEvent id="Event_0ddyuiw" name="start" zeebe:modelerTemplate="79ad73ed-78e7-44fd-8d32-a7bc29ef0d0c" zeebe:modelerTemplateVersion="1727440835276">
      <bpmn:extensionElements>
        <zeebe:taskDefinition type="nats_inbound" />
        <zeebe:ioMapping>
          <zeebe:input source="&#34;natsurl.com&#34;" target="nats_url" />
          <zeebe:input source="&#34;natslistner&#34;" target="topic" />
          <zeebe:input source="&#34;5&#34;" target="polling_interval" />
          <zeebe:input source="test" target="correlation_key" />
          <zeebe:input source="test" target="message_name" />
        </zeebe:ioMapping>
        <zeebe:taskHeaders>
          <zeebe:header key="resultVariable" value="result" />
        </zeebe:taskHeaders>
      </bpmn:extensionElements>
      <bpmn:outgoing>Flow_1r5x731</bpmn:outgoing>
    </bpmn:startEvent>
    <bpmn:sequenceFlow id="Flow_1r5x731" sourceRef="Event_0ddyuiw" targetRef="Event_1chms07" />
  </bpmn:process>
  <bpmndi:BPMNDiagram id="BPMNDiagram_1">
    <bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_1eyrze2">
      <bpmndi:BPMNShape id="Event_1chms07_di" bpmnElement="Event_1chms07">
        <dc:Bounds x="592" y="242" width="36" height="36" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="601" y="285" width="19" height="14" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="Event_0ddyuiw_di" bpmnElement="Event_0ddyuiw">
        <dc:Bounds x="392" y="282" width="36" height="36" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="399" y="325" width="22" height="14" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge id="Flow_1r5x731_di" bpmnElement="Flow_1r5x731">
        <di:waypoint x="428" y="300" />
        <di:waypoint x="510" y="300" />
        <di:waypoint x="510" y="260" />
        <di:waypoint x="592" y="260" />
      </bpmndi:BPMNEdge>
    </bpmndi:BPMNPlane>
  </bpmndi:BPMNDiagram>
</bpmn:definitions>