How do I signal failure via a message when using an async external service?

Enrico Hofmann: How should the Model looks like that it get the complete state.

Enrico Hofmann: So each message (taks) receive an “failure” or “success” event (Message in zeebe) with the correlation Key

Enrico Hofmann: How can I model that in an async way?

Josh Wulf: The way you have done it now works, where you have two different messages - one signalling success, one signalling failure.

If you want to send a single message and have a single message catch event with a succeed/fail semantic, then you could use a specific key by convention - for example OPERATION_OK and set it to true or false, then have a conditional gateway.

This will make your model more verbose though.

Josh Wulf: Like this

Josh Wulf:

Josh Wulf: It is basically implementing the machinery of a Future/Promise explicitly in your model, using a convention for the async result messages

Josh Wulf: Is this what you are looking for?

Enrico Hofmann: Ok, this is a possible workaround for me :slightly_smiling_face:

Enrico Hofmann: But not the solution. We use CloudEvents, and use a “type” topic. This also represents something. So I have not the option to aggregate it before it goes back to zeebe

Enrico Hofmann: each function that trigger a function foobar has a callback foobar.success or foobar.failure so one possible solution could be to parse that in our middleware. but it will be better to can model this in the workflow.

Enrico Hofmann: Should something like this also works ?

Enrico Hofmann:

Enrico Hofmann: This is should interrupt the main event flow, if failure is thrown, right?

Josh Wulf: What about using the BPMN Error Boundary Interrupt?

Josh Wulf: This relies on not completing the job in the worker while the possibility of an error still exists

Josh Wulf: Hmmmm…. if you use a message interrupting boundary event, you may need to put the task and async completion message in subprocess

Josh Wulf: Like this:

Josh Wulf: “An operation that signals its failure or success to the broker asynchronously

Josh Wulf: I think the receive event in your model can only be correlated while the job is active. As soon as your worker completes the job, the token moves out of that flow node.

Enrico Hofmann: Ok thx Josh for that Information. The thing is how can I failure in async way with error boundary inputs? I know BPMN, and know this should work in a synchronous way, but what is with an async event driven infrastructure?

Josh Wulf: @Enrico Hofmann you can only throw a BPM error event from a worker in its response to the broker. A message is the only way to communicate into the broker asynchronously. You could use an error semantic on your message, and put a specific task straight after the intermediate message catch. In that task worker, you would examine the payload for the conventional payload key for the error response, and respond with a BPM error throw.

The limitation of this pattern is that the BPM error throw from the client cannot update the payload at the same time. Which means that the BPM error flag remains set in the payload. So if somewhere after this in your flow you have another of these, if the incoming message does not reset that flag to overwrite it, the next instance will throw again.

You could namespace each instance of that “Check and throw BPM error” task with a custom header, but that reduces the generic nature of the “message catch / check and throw task” pattern, and makes each message require a specific key / creates a surface area for bugs.

Josh Wulf: Like this:

Josh Wulf: The challenge comes if you have this:

Josh Wulf: In this case, if the async response message in the failure flow does not reset the BPM error flag, then it will remain in the payload from the previous async response, and will always fail again.

You could use a library function for your async message responses that always sets that flag appropriately.

Josh Wulf: Here is TypeScript code to create helper functions that send the async completion messages with a conventional flag set:

export const sendAsyncSuccessMessageFactory = (zbc: ZBClient) => 
  (key: string, payload = {}) => 
    zbc.publishMessage({
      correlationKey: key,
      name: 'async-response', 
      timeToLive: 0, 
      variables: {...payload, throwBpmError: false}
    })

export const sendAsyncFailureMessageFactory = (zbc: ZBClient) =>
  (key: string, payload = {}) => 
    zbc.publishMessage({
      correlationKey: key,
      name: 'async-response', 
      timeToLive: 0, 
      variables: {...payload, throwBpmError: true}
    })

To use it:


const sendAsyncSuccessMessage = sendAsyncSuccessMessageFactory(zbc)

const sendAsyncFailureMessage = sendAsyncFailureMessageFactory(zbc)

// handles a response from an async external system
export const publishAsyncOperationOutcomeToZeebe = res =>
  res?.success ?
    sendAsyncSuccessMessage(res.key, res.payload) : 
    sendAsyncFailureMessage(res.key, res.payload)

Josh Wulf: And the “Check Error Flag Worker”:

  job.variables.throwBpmError ? 
    complete.error() : 
    complete.success())

Josh Wulf: As long as you control the publishing of the Async Response message, you can use these helpers to ensure correct setting of the conventional flag.

Josh Wulf: Here is a model showing the complete flow:

Josh Wulf: Does this approach accomplish what you want to do?

Note: This post was generated by Slack Archivist from a conversation in the Zeebe Slack, a source of valuable discussions on Zeebe (get an invite). Someone in the Slack thought this was worth sharing!

If this post answered a question for you, hit the Like button - we use that to assess which posts to put into docs.

1 Like