Using the RabbitMQ Connectors to make RPC calls

We are investigating Camunda 8 again (we also did this almost two years ago when 8.0 was released), now that a RabbitMQ Producer Connector and RabbitMQ Inbound Connector have been added.

We rely heavily on RPC calls to initiate actions. Simply publishing a message isn’t enough, since we need to wait until the action has finished, or handle failures.

This is based on the RabbitMQ tutorial: Remote procedure call (RPC) — RabbitMQ.

Prerequisite:

  • Make sure there is a camunda8 queue with a binding on camunda.rpc.reply.

It roughly goes like this:

  • Generate a GUID for the correlationId (we still need to find out how best to do this)
  • Use the RabbitMQ Outbound Connector to send a message.
    • The routing key is something like worker.sre.subscriber.create.v1.
    • The body contains a JSON with information about the subscriber to create.
    • The properties contain {"replyTo": "camunda.rpc.reply", "correlationId": guid}
  • Another worker subscribed to the RabbitMQ bus receives messages with routing key worker.sre.subscriber.create.v1 and creates the subscriber in the backend. When it is done (which might take a while), it sends a message to the routing key indicated by the replyTo property to indicate success or failure, relaying the correlationId it received (a minimal sketch of such a worker follows this list).
  • The Camunda process listens for these messages and takes action based on the success or failure.
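
For illustration, here is a minimal sketch of what the responding worker could look like, using the plain RabbitMQ Java client. The exchange name (amq.topic), the host, and the createSubscriber stub are assumptions; note that replyTo is used as a routing key on the same exchange here, matching the camunda.rpc.reply binding described above.

```java
import com.rabbitmq.client.*;

public class SubscriberWorker {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumption: local broker
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Assumed topology: a queue bound to the request routing key on amq.topic.
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue, "amq.topic", "worker.sre.subscriber.create.v1");

        DeliverCallback onRequest = (consumerTag, delivery) -> {
            String replyTo = delivery.getProperties().getReplyTo();            // camunda.rpc.reply
            String correlationId = delivery.getProperties().getCorrelationId();

            // Stand-in for the real (possibly slow) backend call.
            String result = createSubscriber(new String(delivery.getBody(), "UTF-8"));

            AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                    .correlationId(correlationId) // relay the caller's correlationId
                    .build();
            // replyTo is treated as a routing key, so the reply lands in the
            // camunda8 queue via its camunda.rpc.reply binding.
            channel.basicPublish("amq.topic", replyTo, replyProps, result.getBytes("UTF-8"));
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        channel.basicConsume(queue, false, onRequest, consumerTag -> {});
    }

    private static String createSubscriber(String requestJson) {
        return "{\"status\": \"success\"}"; // placeholder result
    }
}
```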

Configuration in the Modeler:

[image]

Send AMQP:

[image]

Wait for done:

[image]

This works, but it’s a bit cumbersome. Is there a better way to handle this? There is also no FEEL function to generate a GUID, unfortunately. (See Built-in function for UUID)
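
On the GUID question: until there is a FEEL function for it, one workaround could be a small job worker that sets the correlationId process variable before the send step. A minimal sketch with the Zeebe Java client, assuming a service task with the made-up type generate-uuid placed in front of the Send AMQP task:

```java
import io.camunda.zeebe.client.ZeebeClient;
import java.util.Map;
import java.util.UUID;

public class UuidWorker {
    public static void main(String[] args) throws Exception {
        ZeebeClient client = ZeebeClient.newClientBuilder()
                .gatewayAddress("localhost:26500") // assumption: local gateway
                .usePlaintext()
                .build();

        // Completes "generate-uuid" jobs with a fresh correlationId variable.
        client.newWorker()
                .jobType("generate-uuid")
                .handler((jobClient, job) -> jobClient
                        .newCompleteCommand(job.getKey())
                        .variables(Map.of("correlationId", UUID.randomUUID().toString()))
                        .send()
                        .join())
                .open();

        Thread.currentThread().join(); // keep the worker running
    }
}
```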

Ideally, there would be an RPC Task definition in the RabbitMQ Connector to combine these things (a sketch of the internals follows the list):

  • Generate a GUID for the correlationId
  • Publish the message
  • Listen for a reply on the specified queue and pick up a reply with the correct correlationId
  • Communicate to Zeebe to continue the process
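
For reference, this is roughly what such an RPC Task would have to do internally, condensed from the linked RabbitMQ tutorial (the exchange and queue names are illustrative). Note that the consumer auto-acks everything it reads from the reply queue, which is exactly the shared-queue problem raised below:

```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class RpcCall {
    // Publishes a request and blocks until a reply with the same correlationId arrives.
    public static String call(Channel channel, String exchange, String routingKey,
                              String replyQueue, String message) throws Exception {
        String correlationId = UUID.randomUUID().toString();

        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .correlationId(correlationId)
                .replyTo(replyQueue)
                .build();
        channel.basicPublish(exchange, routingKey, props, message.getBytes("UTF-8"));

        BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        String consumerTag = channel.basicConsume(replyQueue, true, (tag, delivery) -> {
            // Only hand over the reply that matches our correlationId;
            // anything else is consumed and lost (see the warning below).
            if (correlationId.equals(delivery.getProperties().getCorrelationId())) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, tag -> {});

        String reply = response.take();
        channel.basicCancel(consumerTag);
        return reply;
    }
}
```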

I’ve also seen this warning on the RabbitMQ Connector | Camunda 8 Docs page:

To maintain stable behavior from the RabbitMQ Connector, do not subscribe multiple RabbitMQ Connectors to the same queue.

Successfully consumed messages are removed from the queue, even if they are not correlated.

Does this mean that this setup isn’t even supported? There might be other messages arriving on our single response queue for other processes…

Hi @MichaelArnauts,

Currently there’s no simple out-of-the-box solution for RPC tasks via RabbitMQ Connector. The setup you’ve come up with is definitely a viable option, but as you mentioned, it’s somewhat cumbersome.

To mitigate this complexity, I would recommend one of the following approaches:

  1. You can encapsulate the complex implementation in a subprocess and invoke it using a Call Activity. See more info here. This way you only have to implement the RPC pattern once and can reuse it from other processes.

    Note that you can further simplify usage of the call activity by creating a custom element template that hardcodes the called process ID and other relevant properties. See the examples attached.

    Combined_Process.bpmn (5.3 KB)
    echo_call_activity.bpmn (2.8 KB)

Call activity element template:

```json
{
  "$schema": "https://unpkg.com/@camunda/zeebe-element-templates-json-schema/resources/schema.json",
  "name": "Combined Connector",
  "id": "io.camunda.connectors.Combined",
  "documentationRef": "https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/automation-anywhere/",
  "version": 1,
  "appliesTo": [
    "bpmn:CallActivity"
  ],
  "elementType": {
    "value": "bpmn:CallActivity"
  },
  "groups": [
    {
        "id": "message",
        "label": "Message"
    }
  ],
  "properties": [
    {
      "type": "Hidden",
      "value": "Combined_Process",
      "binding": {
        "type": "zeebe:calledElement",
        "property": "processId"
      }
    },
    {
        "group": "message",
        "type": "Text",
        "label": "Message ID",
        "description": "The message ID to be echoed",
        "constraints": {
            "notEmpty": true
        },
        "binding": {
            "type": "zeebe:input",
            "name": "messageId"
        }
    },
    {
        "type": "Hidden",
        "value": "echoedId",
        "binding": {
            "type": "zeebe:output",
            "source": "=echoedId"
        }
    }
  ]
}
```
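
In short, this template hardcodes Combined_Process as the called process ID via a hidden zeebe:calledElement binding, exposes a single required Message ID field that is passed in as the messageId input, and maps the echoedId result back out, so the RPC subprocess can be dropped into a diagram like any other task.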
  2. If hosting an on-premises connector runtime is an option for you, you can create a custom RabbitMQ connector implementation using the Connector SDK. You can then spin up a self-managed connector runtime instance and either connect it to a SaaS cluster or use it with Camunda 8 Self-Managed.
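
For a rough idea of what that involves, here is a minimal outbound connector skeleton using the Connector SDK (the name, type, and request shape are made up for this sketch; bindVariables assumes a recent SDK version):

```java
import io.camunda.connector.api.annotation.OutboundConnector;
import io.camunda.connector.api.outbound.OutboundConnectorContext;
import io.camunda.connector.api.outbound.OutboundConnectorFunction;

@OutboundConnector(
    name = "RABBITMQ_RPC",                // made-up name for this sketch
    inputVariables = {"routingKey", "replyQueue", "body"},
    type = "io.example:rabbitmq-rpc:1")   // made-up task type
public class RabbitMqRpcFunction implements OutboundConnectorFunction {

    @Override
    public Object execute(OutboundConnectorContext context) throws Exception {
        RpcRequest request = context.bindVariables(RpcRequest.class);
        // Publish the request, block for the correlated reply (as in the RPC
        // sketch above), and return the reply as the connector result.
        return null; // placeholder
    }

    record RpcRequest(String routingKey, String replyQueue, String body) {}
}
```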

Regarding multiple connectors subscribing to the same queue: this is currently not recommended, because every connector in the diagram creates a separate RabbitMQ subscription. As a result, if you run this without additional server-side configuration, consumers will steal messages from each other. While it may technically work if you set the correct activation condition in your connector, unexpected things can happen because messages are constantly requeued after being read by the wrong consumer.

The good news is that we’re considering a way to support more flexible configuration of how connectors are mapped to subscriptions. See a similar issue here. We will share more details when it’s available, but for now we recommend using one queue per connector.


Hi @chillleader,

Thanks for your reply!

I think the 2nd option is best, since this belongs in the RabbitMQ Connector itself.

I’ve opened an issue: Use the RabbitMQ connector to make RPC calls · Issue #1735 · camunda/connectors · GitHub
and made a draft implementation here: Add RPC support to the RabbitMQ Connector by michaelarnauts · Pull Request #1812 · camunda/connectors · GitHub.

I would really appreciate some input, however. Will this even be accepted, or should I just keep the code for myself and build a separate Connector Docker image?

Thanks for opening the issue and preparing a draft implementation!

I will take a detailed look and provide feedback, and then we can decide how to proceed with it.