How to complete a task later asynchronously

Hey guys.

I currently want to implement a simple demo which sends a message via Kafka and waits for an response message on Kafka before to move on. As we do not yet have receiving message capabilities in the broker I thought to simply use a task handler to send the message and then do nothing. The response message will trigger the completion of the task later.

In code I do something like this:

  @ZeebeTaskListener(taskType = "payment")
  public void handleTaskA(final TasksClient client, final TaskEvent task) {
    // send message via Kafka (or basically Spring Cloud Streams, could also be Rabbit or sth similar)
    messageSender.send( //
        new Message<RetrievePaymentCommandPayload>(...));
    // do NOTHING, so the workflow instance keeps waiting there
   }

Now we can listen on the messages on Kafka, with Spring CLoud Streams e.g.:

  @StreamListener(target = Sink.INPUT)
  public void messageReceived(String messageJson) {
    Message<PaymentReceivedEvent> message = new ObjectMapper().readValue(messageJson, new TypeReference<Message<PaymentReceivedEvent>>(){});
    PaymentReceivedEvent evt = message.getPayload();    
    
    // THIS IS NOT POSSIBLE:
    zeebeClients.tasks().complete(evt.getTaskId());

I need to have a TaskEvent in order to complete the task. Why is this? Is there a reason that prevents completing the task with some id only? Or probably a combination of 2 or informations?
Is there any way to reconstruct this myself. What is necessary to set on this event and what is not needed?
Or would it be doable to stringify the event and reconstruct it from there?

Happy to take any thoughts - as this is currently the only way I see to implement waiting for any event (message, timer, …) from the outside.

Thanks
Bernd

2 Likes

Philipp just pushed me into the right direction via Slack:

  • Remember the position of the event.
  • Combined with the topic this is a unique identifier
  • You can re-read the event from Zeebe later on in this case

open a topic subscription to find the given with the stored position https://github.com/zeebe-io/zeebe-simple-monitor/blob/master/src/main/java/io/zeebe/zeebemonitor/rest/WorkflowInstanceResource.java#L86

Thanks a lot Philipp!!

I can post some code here as soon as I have it working.

Here my current code (working): https://github.com/flowing/flowing-retail/blob/zeebe/kafka/java/order-zeebe/src/main/java/io/flowing/retail/kafka/order/adapter/ZeebeWorkarounds.java

Hi Bernd,

I think this will be a better fit once you will have bpmn message send / catch events.

You will send a message to a kafka topic and consume the response and correlate it back to zeebe from another one.

Makes sense?

(Now whether that will be a single service task or a sequence of message send / catch (receive) is another story.)

Best,
Daniel

Also: https://gist.github.com/meyerdan/6f9dd94192652977870346d09d310b20

Definitely make sense for this use case - once we have that. But I think we should still have a way of completing a task with an ID only.

And there is a related discussion around if send command/receive event should always be two activities (send & receive task) or if that can be combined into one.We haven’t yet talked about how we see this in Zeebe…

Too bad you cannot comment in gists :wink: Thanks for the proposal - let me comment on some key points:

ZeebeClient client = ...;
ZeebeTopicClient orderTopic = client.topic("order-processing");

Probably a good idea to abstract the topic at the very beginning - this also makes it easy to inject it and the code using it doesn’t have to know about the topic name at all.

I still would love to have

ZeebeTopicClient orderTopic = client.topic(DEFAULT); 

This is confusing - listTopics get you a Map of partitions?

Map<String, PartitionInfo> partitions = client.listTopics();
PartitionInfo partitionInfo = partitions.get("order-processing");

I thought a bit about it, but I think I like this way of making it clear that you send commands. It is very in-sync with the conepts behind.

orderTopic.send(new StartWorkflowInstanceCommand("orderProcess").payload(...));

The downside is definitely that it will be harder for “workflow” people to understand it.
Also we should think about how a user will know about the possible commands - as you cannot simply use code completion with that. Probably some kind of Factory or static helper could help with this?

orderTopic.send(Commands.startWorkflowInstanceCommand("orderProcess").payload(...));

I definitely do not like the design that you have to store the task event yourself for completing it later asynchronously (https://gist.github.com/meyerdan/6f9dd94192652977870346d09d310b20#file-taskworkerasync-java). Why is that? What kind of information do you need beyond the task id? We should at least provide a simple strinigified identifier that can be used to restore the event with sufficient data to complete. Actually I still would prefer an API where you can simply complete by task id. What is the show stopper of doing that?

And last comment:

new TaskHandler()  {
  public void handle(TaskEvent taskEvent) {
    topicClient.send(new CompleteTaskCommand(taskEvent).payload(...));

Where does the topicClient comes from in https://gist.github.com/meyerdan/6f9dd94192652977870346d09d310b20#file-taskworkersync-java-L15?
And: While it is consistent, at this point sending a Command feels a bit wired compared to the current API.

A task worker in Zeebe, you have to look at it as stream processor: it reacts to an event “Task Locked and Available for processing” calculates it’s logic, potentially updating it’s state and then submits a command “Compte Task”.
This is a different perspective from “Call a service”. We need to synchronize on this, make it clearer and document it properly.

In the example you created: do you

  1. use kafka as asynchronous transport for “calling” services
  2. use kafka as an event bus to which events are published and from which events are consumed to which the workflow should react.

Case 1) does not make sense in my opinion.
Case 2) makes sense, but here, if you build an integration with Zeebe, than that integration will be stateful and will have to do the correlation before Zeebe can do it itself

In the example it would be option 2 - as I will wait not for the ACK of the command but a real event that is issued from another component.
But despite that I do not see why option 1 should not make sense? Probably Kafka is misused as messaging system in this case - but I see that happening at quite some customers. And then you send a command and wait for the ack - why not?

In both cases building a stateful integration is a show stopper of using Zeebe for this use case in my perspective. If I have to store state in order to use a state machine that is really wired (and hard to do at scale). But I know that this is an intermediate stage until we can really do it ourselves.

But what I still do not understand: Where is the problem in providing a completeTask(taskId) method? Is there a technical show stopper? Why do you want to avoid that use case by all means?

And I think we could/should still discuss if one service task in a BPMN process could serve as send AND receive in one node. We discuss this every time we have some kind of async communication (messaging, Kafka) with Camunda BPM. But that might be an optimization later on and it is not that important at the moment.

1 Like

It strikes me that we’re attempting to fit a square peg in a round hole here… and I agree with Daniel that this use case will work much more effectively once we have events - especially message events - in Zeebe.

I think this argument is very similar to the argument of whether it should be possible to implement an activity in a Camunda BPM BPMN diagram using ActivityBehavior, and that one’s essentially private (in an impl package) because there’s a better way to accomplish that in Camunda BPMN… message events.

For now, in demos, it’s possible to hold a lock for a very long time and thus implement this functionality, but it isn’t perfect. Then again, demos don’t have to be perfect. :blush:

I do agree that we should be able to complete a task using a task id; that just makes sense.

Big picture, I think we need to let Zeebe evolve a bit more and wait until it has functionality such as message events before we spend a lot of time discussing the API.

-Ryan

I discussed with Sebastian earlier on Skype and he explained me a bit more why we cannot complete a task based on its task id at the moment.

When we complete a task, the event that we hand in will be written to the log directly. That means it has to contain all relevant information. We do not look up the event or attributes of it, as this would require to read back the event log - which is not very performant to do.

We will have to tackle questions like this with added features around e.g. message correlation. Let’s wait for that, maybe that also can help with the complete(taskId) - as this is basically also only a correlation :slight_smile:

It’s like CQRS if the client has to store the event locally or find it in the topic. For convenience, this kind of functionality should be integrated into the clients provided by Camunda. Ultimately, what users will work with is the client API. What happens internally is hidden like Camunda BPM’s RuntimeServiceImpl.

If one decides to always look up the event like Philipp showed, does that come with a performance penalty?

I come back one year later with a similar question : for asynchronous tasks implying messaging systems , should i always create a send task & a receive catch message with a corelationId or is there a way for an external service to mark a job as finish and send its result in Zeebe?

Thanks

Yes, messaging. See: https://zeebe.io/blog/2018/12/writing-an-apache-kafka-connector-for-zeebe/

Hi , I was looking for Microservices orchestration using different solutions and came across Zeebe.
I tried the starter examples provided in the source code of spring-zeebe project with a sample workflow as shown in screenshot below, and i observed that by default all the Service tasks were getting invoked sequentially by default.

I was particularly looking for calling few of my Miroservices in a workflow asynchronously, for e.g. a JobWorker for Task1 need not be complete inorder to Task 2 and Task 3 to be invoked.
While searching for this solution i happened to land into these conversations in this thread which I am aware that they are quite old now, but the problem looked same.

All i wanted to know was do i really need a messaging infrastructure like Kafka or any other MQ to achieve async service task completion? What if i dont really care about the response of the Service Task 1 (from example i described above) and want the workflow execution to continue.
Is there any solution where in Java client I mention the Job Worker to mark Job as complete without waiting from the Microservice i invoked inside the handler?

1 Like

Hey abhiraj747 - nice to see you in our forums - happy that you found Zeebe!

Like everything in life, the answer depends - so it might be good to share some more tech/archirtecture details of what you are doing.

But my first approach would actually be to model the way you want this to happen. For example:

Now you can do Task1, 2 & 3 really in parallel - and not waiting for the result. But you wait for 2 & before doing 4. This model is exectuable in Zeebe, see Worker.java · GitHub

2 Likes

Thanks a lot berndrueker!!

The parallel gateway solution works fine for this. I was missing the number of threads while initiating the ClientBuilder , added the property in application.yaml in spring boot clinet example and it worked fine !!

Great - good to hear! We already discussed to give this config option more visibility in our examples and docs.

Hi @berndruecker,

even though this is a very old topic, I would like to revisit it.

Camunda Cloud has matured considerably in the meantime and there are now numerous examples of workers, exporters, etc., that can be used as orientation. Including good documentation :slight_smile:

However, the question “How to complete a task later asynchronously” is not finally answered for me. Especially if I interpret “later” as really significantly later like “after several days or weeks”.
To go into more detail:

  • I have the possibility to write a worker for the job type “io.camunda.zeebe:userTask”.
  • I now want to use a CQRS pattern
  • The goal is to first save information about these job instances along with some other business information which eventually is not known to the process engine (this can be described as simply listening to “task has been activated” events), and then at some point - and really at some point whenever in the future - complete jobs.
  • the GraphQL API of the tasklist is well designed, but for particular custom use cases it doesn’t always offer what I need. Using the tasklist component might then be overhead. Especially if I go into the direction like described.

Sure I can use asynchronous / reactive patterns in workers and I can complete a job asynchronously by using its key.

Nevertheless, workers are designed in such a way that they typically process their jobs in a more timely manner. And the process engine assigns the job instance to another worker instance after a timeout. Well, I have to take care of idempotency anyway. But still the question remains: is a worker the right concept if I want to do real CQRS like described above? Isn’t a single worker instance configured with an amount of jobs it likes to handle concurrently? What do I do with that? What else do I need to consider in that case?

Are exporters the better concept? Certainly if I only want to listen to events from the process. But then the possibility to complete a task is missing. What would you do? Mix these concepts?

I am curious about your answer. Thanks in advance.

Gunnar

Good morning Gunnar - nice to see you here :slight_smile:

Let me quickly recap in my words: You want to push a task to some external system, which eventually completes it, but this might take days or weeks, right?

Let’s quickly recap options (basically thinking out loud):

  1. You use a normal service task, but simply set a high job timeout. The timeout is a int64 field, which means you can set quite high numbers (like months or years), even if they need to be expressed in milliseconds.

  2. You could use a Send-Task to push the field to your system and the following receive task to wait for a response

  3. You can use a Human Task and the Task API

  4. You could use exporters in some of the above scenarios to avoid querying systems.

Quick assessment:

  • I would try to avoid exporters, as they will not easily work in a SaaS scenario (might not be a problem for you, but it is a general limitation)

  • For the service task vs. send/receive Task combo you can also look at the discussion in Service Integration Patterns With BPMN And Camunda Cloud | by Bernd RĂĽcker | berndruecker. In general, I am a bit skeptical about keeping internal ids (like the job key) in external systems for long time. Again: Might not lead to problems in your case, but I feel more confident with then using some artificial id you have under your control. This means I would lean towards send/receive task (but this might get challenged if you use that all over the place in a model).

  • The task API should also work, but if you keep tasks in your external system, the same logic about internal IDs applies.

So my architect’s brain thinks, you should look at send/receive tasks. But my business analyst’s brain says, this might get too cluttered and you should probably just use a long task timeout.

Does this make sense?
Best
Bernd

PS: Quick side note: I think it is easier if you start new threads for new questions (and potentially link to old ones that are related).

2 Likes