Safe signaling of message events when consuming from a queue

Hello,

we are struggling to test processes which contain message events. Right after we signal an event, the process does not seem to advance immediately.

We are trying to find the possible cause:

  1. Are message events signaled asynchronously by default?
  2. Has it got something to do with H2 in-memory configuration?

To get our tests to work, we have been forced to add a number of Thread.sleep calls here and there, and the time our tests take has exploded.

What’s the right solution for this issue?

Am I right to assume that this process is being signaled through the Camunda event engine? In other words, is it passing through the job executor?

That’s what I thought when you said “doesn’t move immediately”.

You might want to take a look at Camunda’s event infrastructure to see which best applies. Alternate event infrastructure includes CDI, JMS integration, etc… Each has their own pros/cons.

I typically weigh the following (naming just a few):

  • require transacted Camunda flow navigation
  • does it need to happen “now!”
  • planning on clustered production infrastructure
  • reliability requirements (fire/forget, or external guaranteed delivery)

We are using processEngine.getRuntimeService().messageEventReceived. Are there alternatives?

The topics you listed are obviously worth taking into consideration, and as we are taking our first steps with the Process Engine, we would appreciate it if you could provide links or details :slight_smile:

Thanks

For event testing (JMS, etc.), I use Apache Camel’s extended JUnit framework. It does require some background in Camel, though.

What’s nice about Camel’s test-extension is that it’s capable of concurrent, parallel execution and monitoring. Though the new JUnit does this… Camel is built-to-purpose around testing event-management.

My test case, for example:

  1. Launch multiple event-emitters
  2. While emitting events (in parallel), start up receivers and measure: duration, count, etc.

Direct thread APIs not required…

I’ll try to follow-up later. But, here’s a very rough link to a basic test-case (maven project for Camunda with some testing). However, it’s ONLY an example - and a rough one at that… was planning on beefing up the example.

You raised a point that shows the problem is not with the tests, but with the process itself.

Take this example: I have a thread consuming events from a queue and signaling events on a process. When two events arrive in quick succession, the second event tries to signal the second message, but if the transition triggered by the first event has not completed, the second signal fails.

How can I make transitions fully synchronous when signaling message catch events?

Assuming I understand the problem domain - paraphrasing a bit:

From a logical view, we rough-in these top-level systems:

1) business event stream: events originating here. These events have their own management services. The event stream needs to handle arrival (initial arrival and/or return), interpretation, and routing. There’s also a system-service, if needed, to help with misc infrastructure (trans, HA, DR).

2) A specialized-to-purpose event listener/processor providing some form of mediation between the business-event-stream and BPM. For lack of a better name… event-processor. This event-processor quickly reacts to the following key conditions: event-type AND event-payload. The above event stream (#1) is typically responsible for routing events. So, this specific event-processor is only bothered when needed.

3) BPM-engine hosting various BPMN/process-flow types. The BPM-engine responds to API calls from above #2, event-processor. The BPM-engine also has its own, internal complete set of specialized BPM-event management sub-systems.

As events stream in, they are (1) routed to the processor, (2) managed and delivered to BPM, and (3) either a new BPM process is invoked or a pre-existing one is correlated and updated.

The key requirement is that streaming events either start new BPM process instances or are aggregated into pre-existing ones. Additionally, some events represent duplicates… (and need to be handled that way).

Given the logical architecture, we’re experiencing a problem whereby we are:

a) starting too many process instances (i.e. creating duplicates, competing for the same resources)

- or -

b) not responding well to requests from the #2 processor for aggregation.

What I’ve done to solve this issue is to create a specialized correlation service. This correlation-service uses a transaction manager (I’ve used the DBMS).

Event types and payloads are then handled in the following manner:

start transaction:
a) look-up: have we already seen this exact event type and payload?
b1) if found: aggregate. Associate new event and payload to pre-existing
- or -
b2) if not-found: create new and add payload.
end transaction:

In case-management terms: create a new case or add information to pre-existing.
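To make the look-up/aggregate/create steps above a bit more concrete, here is a minimal in-memory sketch. It is not the service I actually built: the class name CorrelationService, the correlationKey parameter and the use of ConcurrentHashMap.compute as a stand-in for the DBMS transaction are all assumptions made for illustration. In a real setup the same decision would sit inside one database transaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the correlation service.
// ConcurrentHashMap.compute is atomic per key, so it plays the role of
// "start transaction ... end transaction" for a single case.
class CorrelationService {

    /** A "case": every payload seen so far for one event type + correlation key. */
    static final class Case {
        final String key;
        final List<String> payloads = new ArrayList<>();
        Case(String key) { this.key = key; }
    }

    private final Map<String, Case> cases = new ConcurrentHashMap<>();

    /**
     * Returns true if a new case was created (b2), false if the payload was
     * aggregated into a pre-existing case (b1).
     */
    boolean correlate(String eventType, String correlationKey, String payload) {
        String key = eventType + ":" + correlationKey;
        boolean[] created = {false};
        cases.compute(key, (k, existing) -> {
            Case c = existing;
            if (c == null) {            // a) look-up found nothing: create new
                c = new Case(k);
                created[0] = true;
            }
            c.payloads.add(payload);    // add payload to the new or existing case
            return c;
        });
        return created[0];
    }

    /** Number of payloads aggregated for the given case, 0 if unknown. */
    int payloadCount(String eventType, String correlationKey) {
        Case c = cases.get(eventType + ":" + correlationKey);
        return c == null ? 0 : c.payloads.size();
    }
}
```

The caller can use the boolean result to decide between starting a new process instance (true) and signaling the in-flight one (false), so two events that arrive at the same moment never both take the "create" branch.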

The next problem is signalling an in-flight, process instance when new event-payloads become associated (i.e. added to this existing case).

This requirement sounds like you need a BPM-event interceptor. The tricky part is when, and at which point, does the in-flight process receive and handle events (requests for aggregation)?

… and this needs some illustration.

Is this explanation on-track? - It’s Monday morning, and I require additional coffee. I want to share an illustrated example later - if this all makes sense.

The explanation is correct, although I do not understand why we need such a complex solution. Isn’t the BPM engine synchronous and transactional?

Hey,

I think we are getting a bit off track. I’ll try to clarify the engine’s behavior. RuntimeService#messageEventReceived is going to synchronously execute the BPMN process until the next wait state is reached (e.g. user task, catching event, asynchronous continuation). In case of an exception raised during that processing, it rolls back to the state before the event was received.
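For the queue-consumer scenario mentioned earlier in the thread, a rough sketch of what this synchronous behavior allows: a single consumer thread takes messages off a queue and hands them over strictly one at a time, so the second signal is only attempted after the first call has returned. The class SerializedSignaler, the poison-pill value and the Consumer<String> callback are hypothetical; the callback stands in for the blocking engine call and is not Camunda API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Sketch: one consumer thread delivers queued messages strictly in order.
class SerializedSignaler implements Runnable {
    private static final String STOP = "__stop__"; // hypothetical poison pill

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Consumer<String> signal; // stand-in for the blocking engine call

    SerializedSignaler(Consumer<String> signal) {
        this.signal = signal;
    }

    void publish(String messageName) { queue.add(messageName); }

    void stop() { queue.add(STOP); }

    @Override
    public void run() {
        try {
            while (true) {
                String messageName = queue.take();
                if (STOP.equals(messageName)) {
                    return;
                }
                // Assumption: the real call here is synchronous, i.e. it only
                // returns once the process has reached its next wait state.
                signal.accept(messageName);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }
}
```

If several threads signal the same process instance concurrently instead, the calls are no longer serialized and the second one can run before the first has been committed.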

If you have the impression that the engine does not behave this way, can you please provide a (simplified) process model and steps to reproduce your problem or even a test case that fails (based on the unit testing template)?

Thanks,
Thorben

Addressing both complexity and capability, as @thorben points out, Camunda has built-in features meeting these requirements.

Aggregating all our system characteristics into one domain significantly reduces complexity.

The question is then seeking balance on the following points:

  1. latency, or duration between event genesis and reaction. Responsiveness defined as a business or non-functional requirement
  2. Managing capacity: system impact and effects resulting from business-event loading into targeted destination.
  3. Managing inter-system operations: impact to services, within service infrastructure (transaction: ACID, I/O, etc). Same as #2 above, but with an angled view on system-level services.
  4. Maintenance and administration: general fit within the enterprise architecture and associated long-term goals - with a focus on operations/maintenance (complexity, etc).

Apologies for restating: from a testing perspective, the next step is to come up with a good automated test-case for above items 1 and 2.

Just sharing some ideas…
I was thinking about driving Camunda with an increasing event-msg load, monitoring both internal resource consumption and responsiveness. My existing tests submit a static load - I originally didn’t intend on charting profile characteristics.
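Roughly what I have in mind for the increasing load, as a sketch only: each step sends more messages than the previous one and records the mean time per call, which gives a simple profile to chart. The LoadRamp class and the send hook are hypothetical; send would wrap whatever delivers one event message to the engine. Resource consumption is not measured here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Sketch of a stepped load ramp with a per-step latency figure.
class LoadRamp {

    static final class Step {
        final int messages;       // messages sent in this step
        final double meanMicros;  // mean wall-clock time per send call
        Step(int messages, double meanMicros) {
            this.messages = messages;
            this.meanMicros = meanMicros;
        }
    }

    /** startLoad must be > 0; the load grows by increment after every step. */
    static List<Step> run(int startLoad, int increment, int steps, IntConsumer send) {
        List<Step> profile = new ArrayList<>();
        int load = startLoad;
        for (int s = 0; s < steps; s++) {
            long begin = System.nanoTime();
            for (int i = 0; i < load; i++) {
                send.accept(i); // hypothetical hook: deliver one event message
            }
            long elapsedNanos = System.nanoTime() - begin;
            profile.add(new Step(load, elapsedNanos / 1_000.0 / load));
            load += increment;
        }
        return profile;
    }
}
```

A call such as LoadRamp.run(100, 100, 10, i -> deliver(i)), with deliver being your own method, would produce ten data points from 100 to 1000 messages per step.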

Camunda’s new v7.6 alpha release(s) provide fantastic built-in metrics/performance reporting. I was thinking about marrying this with HawtIO measurement plug-ins. Goal is to monitor both internal and external interaction (system-edge).