We are trying to create a pub/sub connector with NATS. Our issue arises with the subscriber (the publisher is working): when we deploy the template.json for the inbound connector and use a start event for modeling, it doesn't listen to the topic and hence stays dormant.
import nats # NATS Python client for subscribing to messages from NATS
import asyncio
from python_camunda_sdk import InboundConnector
from python_camunda_sdk.connectors import ConnectorConfig
from pydantic import Field
import json
class NATSInboundConnector(InboundConnector):
    """NATS inbound connector that waits for one JSON message on a subject.

    ``run`` connects to the configured NATS server, subscribes to ``topic``
    and returns as soon as the first valid JSON message has been received.

    NOTE(review): presumably the SDK only publishes the correlated Camunda
    message after ``run`` returns -- confirm against the python_camunda_sdk
    documentation. The previous implementation looped forever after
    subscribing, so ``run`` never returned a result.
    """

    # Metadata for the connector
    class ConnectorConfig:
        name = "NATS Inbound Connector"  # Connector name shown in Camunda Modeler
        type = "nats_inbound"  # Connector type used internally

    # Configuration fields for the connector
    nats_url: str = Field(
        default="nats://ec2-54-163-138-108.compute-1.amazonaws.com:4222",
        description="The URL of the NATS server.",
        group="config",
    )
    topic: str = Field(
        description="The topic to subscribe to.",
        group="config",
    )
    # Kept as a string for template compatibility; converted to int in run().
    polling_interval: str = Field(
        default="10",
        description="The polling interval in seconds.",
        group="config",
    )

    async def run(self) -> dict:
        """Subscribe to ``topic`` and return the first valid JSON message.

        Returns:
            A dict with ``received_message`` holding the decoded JSON payload,
            or ``error`` holding the exception text if connecting, subscribing
            or waiting failed.
        """
        result = {}
        client = None
        # Set by the handler once a usable message has been stored in result.
        received = asyncio.Event()
        try:
            print(
                f"Connecting to NATS server at: {self.nats_url} for topic: {self.topic}...")
            client = await nats.connect(self.nats_url)

            async def message_handler(msg):
                # Only the first valid message is reported; ignore later ones
                # that arrive before the connection is closed.
                if received.is_set():
                    return
                message_data = msg.data.decode()
                print(f"Received message: {message_data}")
                if not message_data.strip():
                    # Skip empty payloads and keep listening.
                    print("Error: Received an empty message.")
                    return
                try:
                    message_json = json.loads(message_data)
                except json.JSONDecodeError as e:
                    # Skip malformed payloads and keep listening.
                    print(f"Error decoding message: {e}")
                    return
                result["received_message"] = message_json
                print(f"Message received: {result['received_message']}")
                received.set()

            # Subscribe first, then report success.
            await client.subscribe(self.topic, cb=message_handler)
            print(
                f"Connected to NATS server. Subscribed to topic: {self.topic}")

            # Clamp to at least one second so a zero interval cannot busy-loop.
            interval = max(1, int(self.polling_interval))
            while not received.is_set():
                print(f"Polling every {self.polling_interval} seconds...")
                try:
                    await asyncio.wait_for(received.wait(), timeout=interval)
                except asyncio.TimeoutError:
                    # Nothing arrived in this interval; keep waiting.
                    continue
        except Exception as e:
            print(f"Error occurred while subscribing: {e}")
            result["error"] = str(e)
        finally:
            print("Shutting down subscriber...")
            if client and client.is_connected:
                await client.close()
                print("NATS connection closed.")
        return result
And its corresponding element template JSON is:
{
"$schema": "https://unpkg.com/@camunda/zeebe-element-templates-json-schema@0.9.0/resources/schema.json",
"name": "NATS Inbound Connector",
"id": "9fe6053e-cc29-467e-9d9a-319d53340179",
"appliesTo": [
"bpmn:ServiceTask"
],
"properties": [
{
"binding": {
"type": "zeebe:input",
"name": "nats_url"
},
"label": "The URL of the NATS server.",
"type": "String",
"group": "input",
"feel": "optional"
},
{
"binding": {
"type": "zeebe:input",
"name": "topic"
},
"label": "The topic to subscribe to.",
"type": "String",
"group": "input",
"feel": "optional"
},
{
"binding": {
"type": "zeebe:input",
"name": "polling_interval"
},
"label": "The polling interval in seconds.",
"type": "String",
"group": "input",
"feel": "optional"
},
{
"binding": {
"type": "zeebe:taskHeader",
"key": "resultVariable"
},
"label": "Result variable",
"type": "String",
"group": "output"
},
{
"binding": {
"type": "zeebe:taskDefinition:type"
},
"type": "Hidden",
"value": "nats_inbound"
},
{
"binding": {
"type": "zeebe:input",
"name": "correlation_key"
},
"label": "Correlation key",
"type": "String",
"group": "config",
"feel": "optional"
},
{
"binding": {
"type": "zeebe:input",
"name": "message_name"
},
"label": "Message name",
"type": "String",
"group": "config",
"feel": "optional"
}
],
"groups": [
{
"id": "input",
"label": "Input"
},
{
"id": "output",
"label": "Output"
},
{
"id": "config",
"label": "Configuration"
}
]
}