Why doesn’t Zeebe support an API to query for active instances?

Stephen Colebourne: Looking at the reference docs here - https://docs.camunda.io/docs/reference/grpc - there is no call to get active workflow instances. We were wondering why that is?

korthout: Zeebe does not offer a query API because of its architecture. The state is distributed across partitions, so a query request would need to collect data from the entire cluster to return a sound answer. This does not scale well. Instead, Zeebe offers exporters that separate tools can use to aggregate data from the cluster. For example, have a look at the ZeeQS project (https://github.com/zeebe-io/zeeqs), which imports the data via Hazelcast, aggregates it and provides a GraphQL API to query it.
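To make the exporter approach concrete, here is a minimal sketch of the aggregation idea (not ZeeQS code): fold exported process-instance records into a per-instance view, then answer "which instances are active?" from that view instead of asking the cluster. The record shape is an assumption modeled on the fields that the Elasticsearch queries later in this thread read; `buildReadModel` and `activeInstances` are hypothetical names.

```javascript
// Hypothetical record shape, modeled on the fields the Elasticsearch queries
// later in this thread read from exported records:
//   { intent, value: { bpmnElementType, processInstanceKey } }
const STATUS_BY_INTENT = new Map([
  ['ELEMENT_ACTIVATING', 'ACTIVE'],
  ['ELEMENT_COMPLETED', 'COMPLETED'],
  ['ELEMENT_TERMINATED', 'TERMINATED'],
]);

// Fold a stream of exported records into a Map of processInstanceKey -> status.
// Assumes the records of one instance arrive in the order they were written.
function buildReadModel(records) {
  const instances = new Map();
  for (const record of records) {
    // Only records for the process element itself change the instance status;
    // records for tasks, events, etc. inside the process are skipped.
    if (record.value.bpmnElementType !== 'PROCESS') continue;
    const status = STATUS_BY_INTENT.get(record.intent);
    if (status) instances.set(String(record.value.processInstanceKey), status);
  }
  return instances;
}

// The "list active instances" query, answered from the local view.
function activeInstances(instances) {
  return [...instances]
    .filter(([, status]) => status === 'ACTIVE')
    .map(([key]) => key);
}

const model = buildReadModel([
  { intent: 'ELEMENT_ACTIVATING', value: { bpmnElementType: 'PROCESS', processInstanceKey: 1 } },
  { intent: 'ELEMENT_ACTIVATING', value: { bpmnElementType: 'SERVICE_TASK', processInstanceKey: 1 } },
  { intent: 'ELEMENT_ACTIVATING', value: { bpmnElementType: 'PROCESS', processInstanceKey: 2 } },
  { intent: 'ELEMENT_COMPLETED', value: { bpmnElementType: 'PROCESS', processInstanceKey: 2 } },
]);
console.log(activeInstances(model)); // [ '1' ]
console.log(model.get('2'));         // COMPLETED
```

The same view also answers a lookup of a single instance by key (`model.get(key)`), which is the kind of query the rest of the thread asks about.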

Note: This post was generated by Slack Archivist from a conversation in the Zeebe Slack, a source of valuable discussions on Zeebe. Someone in the Slack thought this was worth sharing!


Stephen Colebourne: Makes sense

Josh Wulf: Operate also does this, and is an option if paid support is a requirement for the solution.

Stephen Colebourne: What about getting data about a single active workflow instance?

korthout: We want to avoid providing a duplicate query API. Since CQRS pushes us to move querying to tools outside of Zeebe (like Elasticsearch, Operate or ZeeQS) that already offer ways to query the data, we don't want to reimplement it in Zeebe as well.

Stephen Colebourne: I can see the architectural choice, especially for general search, but not being able to query the state by ID feels very odd for an API.

korthout: Thanks for your feedback.

I was interested in what was written in the past about this topic. This forum post (https://forum.camunda.io/t/discussion-on-removal-of-list-and-get-workflows-queries-in-zeebe-0-18-0/488) discusses the removal of the query APIs from Zeebe in 0.18.0, back in 2019. There it is mentioned that:
> At the moment the architecture of Zeebe enables users to query state by exporting the event stream to their preferred data stores which already have clients and query APIs in place. This removes a lot of complexity from the initial Zeebe scope. From my perspective, a usable implementation of a query API on top of Zeebe requires handling partitioned data sets and pagination/filtering support. I don’t want to say that this will never be implemented but at the moment is out of scope for the first GA.
My assumption is that this opinion still holds. We’re currently working towards Camunda Cloud GA with Zeebe 1.0.0.
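The quoted point about "handling partitioned data sets and pagination" can be illustrated with a small sketch. It is not Zeebe code: partitions are modeled as hypothetical in-memory arrays of `{ key }` objects, and `queryPartition`/`queryCluster` are illustrative names. Even for a plain "next page, ordered by key" query, every partition has to return up to a full page of candidates, which are then merged and mostly thrown away.

```javascript
// Toy model: each partition is an array of { key } objects. In a real cluster
// each partition lives on a (possibly different) broker, so every call below
// would be a network round trip.
function queryPartition(partition, afterKey, limit) {
  return partition
    .filter((instance) => instance.key > afterKey)
    .sort((a, b) => a.key - b.key)
    .slice(0, limit);
}

// One page of a cluster-wide "list instances ordered by key" query.
function queryCluster(partitions, afterKey, limit) {
  // Any partition may hold some of the next `limit` results, so each must
  // return up to `limit` candidates (P partitions -> up to P * limit rows).
  const candidates = partitions.flatMap((p) => queryPartition(p, afterKey, limit));
  // Merge, then cut back to a single page; the rest was fetched for nothing.
  const page = candidates.sort((a, b) => a.key - b.key).slice(0, limit);
  // Cursor-based pagination: the next request continues after the last key.
  const nextCursor = page.length === limit ? page[page.length - 1].key : null;
  return { page, nextCursor };
}

const partitions = [
  [{ key: 1 }, { key: 4 }, { key: 7 }],
  [{ key: 2 }, { key: 5 }],
  [{ key: 3 }, { key: 6 }],
];
const first = queryCluster(partitions, 0, 3);
const second = queryCluster(partitions, first.nextCursor, 3);
console.log(first.page.map((i) => i.key), first.nextCursor);   // [ 1, 2, 3 ] 3
console.log(second.page.map((i) => i.key), second.nextCursor); // [ 4, 5, 6 ] 6
```

Filtering (for example "only active instances") or ordering by anything other than the key makes this harder still: each partition needs a suitable index, and offset-based paging would force every partition to return offset + limit rows per request.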

Stephen Colebourne: I do think being able to get the workflow instance and workflow by ID should be there.

FYI, part of the problem is that the exporter/elastic stuff seems undocumented. It is mentioned, but I have no idea what needs to be done - https://docs.camunda.io/docs/product-manuals/
I have seen some threads suggesting that running Elasticsearch requires particular setups and Ops-type knowledge of CPUs, memory, etc. As Camunda Cloud is a hosted service, it's not great to have to start worrying about these kinds of things. Is there something on the roadmap to get an exporter to (say) AWS Elasticsearch set up more easily?

korthout: > FYI, part of the problem is that the exporter/elastic stuff seems undocumented. It is mentioned, but I have no idea what needs to be done - https://docs.camunda.io/docs/product-manuals/
@Josh Wulf and <@U01DN8Z0678>, just a quick ping to make you aware of this documentation improvement request.

> As Camunda Cloud is a hosted service, it's not great to have to start worrying about these kinds of things. Is there something on the roadmap to get an exporter to (say) AWS Elasticsearch set up more easily?
<@U9CLPB6MT>, can you shed some light on this?

Here is a simple implementation of the Elasticsearch queries, adapted to 1.0.0-alpha3, that checks the process status (ZeebeProcessStatus), the process variables (getZeebeProcessVariables) and the current step (getZeebeLastJob):

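// Elasticsearch client; written against the 7.x client, whose responses wrap the result in { body }
const { Client: ElasticClient } = require('@elastic/elasticsearch');
// Sentinel: if the ElasticUrl environment variable is not set, requests are answered with an error
const errorUrl = 'http://localhost:9200';
const elasticUrl = process.env.ElasticUrl || errorUrl;
const elastic = new ElasticClient({ node: elasticUrl });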

// Count the records with the given intent for the PROCESS element of one process instance
async function countProcessRecords (processId, intent)
{
  var response = await elastic.search({
    index: 'zeebe-record-process-instance',
    body: {
      'query': {
        'bool': {
          'filter': [
            {
              'match_phrase': {'value.bpmnElementType': 'PROCESS'}
            },
            {
              'match_phrase': {'intent': intent}
            },
            {
              'match_phrase': {'value.processInstanceKey': processId}
            },
          ],
        }
      },
    }
  });
  return response.body.hits.total.value;
}

// Report whether a process instance is ACTIVE or COMPLETED, plus its current step and selected variables
async function ZeebeProcessStatus (res, logger, params, sequenceId, callback)
{
  if (elasticUrl == errorUrl) {
    callback (res, {error: {error: 'No Zeebe data available', id: sequenceId}});
    return;
  }

  var beginreq = Date.now();
  // processId is the process instance key
  var processId = params.processId;
  // Optional comma-separated list of variable names to include in the result
  var outVars = [];
  if (params.variableList) {
    outVars = params.variableList.split (',');
  }
  var result = {processId: processId, status: 'ACTIVE', id: sequenceId };
  var error = 'No Zeebe data available for process ' + processId;

  try {
    // Check if processId is valid: the instance must have been activated exactly once
    if (await countProcessRecords (processId, 'ELEMENT_ACTIVATING') !== 1) {
      logger.log({level: 'info', message: {type: 'RESPONSE', method: 'Process.Status', error: error, responsetime: Date.now() - beginreq, sequenceId: sequenceId}});
      callback (res, {error: {error: error, id: sequenceId}});
      return;
    }

    // Check if process is completed; otherwise report the element of the most recent job
    if (await countProcessRecords (processId, 'ELEMENT_COMPLETED') == 1) {
      result.status = 'COMPLETED';
    } else {
      result['step'] = await getZeebeLastJob (processId);
    }

    // Return only the requested variables
    if (outVars.length > 0) {
      var processVars = await getZeebeProcessVariables (processId);
      var obj = {};
      for (var i = 0; i < processVars.length; i++) {
        if (outVars.includes (processVars[i].name)) {
          obj[processVars[i].name] = processVars[i].value;
        }
      }
      result['variables'] = obj;
    }

    logger.log({level: 'info', message: {type: 'RESPONSE', method: 'Process.Status', result: result, responsetime: Date.now() - beginreq, sequenceId: sequenceId}});
    callback (res, {result: result});
  }
  catch (err) {
    // Log the underlying failure instead of silently discarding it
    logger.log({level: 'info', message: {type: 'RESPONSE', method: 'Process.Status', error: error, detail: String(err), responsetime: Date.now() - beginreq, sequenceId: sequenceId}});
    callback (res, {error: {error: error, id: sequenceId}});
  }
}

// Latest value of each variable of a process instance
async function getZeebeProcessVariables (processId) {
  var response = await elastic.search({
    index: 'zeebe-record-variable',
    body: {
      // Elasticsearch returns only 10 hits by default, which silently drops variable records.
      // 1000 is an arbitrary limit (from + size is capped at 10,000 by default); paginate beyond that.
      'size': 1000,
      'query': {
        'bool': {
          'filter': [
            {
              'match_phrase': {'value.processInstanceKey': processId}
            },
          ],
        }
      },
    }
  });
  var data = response.body.hits.hits;

  // Keep the last value of each variable by timestamp
  var latest = new Map();
  for (var i = 0; i < data.length; i++) {
    var source = data[i]._source;
    var value = source.value.value;
    // Variable values are exported as JSON text, so parse them back into JS values
    if (typeof value == 'string') {
      value = JSON.parse(value);
    }
    var prev = latest.get(source.value.name);
    if (!prev || source.timestamp > prev.timestamp) {
      latest.set(source.value.name, {name: source.value.name, value: value, timestamp: source.timestamp});
    }
  }
  return Array.from(latest.values());
}

// elementId of the most recently created job of a process instance ('' if there is none)
async function getZeebeLastJob (processId) {
  var response = await elastic.search({
    index: 'zeebe-record-job',
    body: {
      // Let Elasticsearch return only the newest CREATED job, instead of scanning
      // the default first 10 hits client-side (which can miss the latest one)
      'size': 1,
      'sort': [{'timestamp': {'order': 'desc'}}],
      'query': {
        'bool': {
          'filter': [
            {
              'match_phrase': {'value.processInstanceKey': processId}
            },
            {
              'match_phrase': {'intent': 'CREATED'}
            },
          ],
        }
      },
    }
  });
  var data = response.body.hits.hits;
  return data.length > 0 ? data[0]._source.value.elementId : '';
}

module.exports = {ZeebeProcessStatus};
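One gap in the code above: any instance without an ELEMENT_COMPLETED record is reported as ACTIVE, so an instance that was cancelled would presumably show up as ACTIVE too. Below is a minimal sketch of an alternative that fetches all lifecycle records of the PROCESS element in one search (instead of two) and derives the status from the intents found. It assumes that cancellation is exported as an ELEMENT_TERMINATED record for the PROCESS element and that `intent` is mapped as a keyword field; `processLifecycleQuery`, `deriveProcessStatus` and the `UNKNOWN`/`TERMINATED` statuses are hypothetical, not part of the original post.

```javascript
// Assumption: a cancelled instance ends with an ELEMENT_TERMINATED record
// for its PROCESS element, analogous to ELEMENT_COMPLETED for a finished one.
const LIFECYCLE_INTENTS = ['ELEMENT_ACTIVATING', 'ELEMENT_COMPLETED', 'ELEMENT_TERMINATED'];

// Search body that fetches the lifecycle records of one process instance's
// PROCESS element in a single request. Assumption: `intent` is a keyword
// field, so the exact-match `terms` filter applies to it.
function processLifecycleQuery(processInstanceKey) {
  return {
    query: {
      bool: {
        filter: [
          { match_phrase: { 'value.bpmnElementType': 'PROCESS' } },
          { match_phrase: { 'value.processInstanceKey': processInstanceKey } },
          { terms: { intent: LIFECYCLE_INTENTS } },
        ],
      },
    },
  };
}

// Map the intents found for the PROCESS element to a status.
function deriveProcessStatus(intents) {
  const seen = new Set(intents);
  if (!seen.has('ELEMENT_ACTIVATING')) return 'UNKNOWN'; // nothing exported for this key
  if (seen.has('ELEMENT_COMPLETED')) return 'COMPLETED';
  if (seen.has('ELEMENT_TERMINATED')) return 'TERMINATED';
  return 'ACTIVE';
}

// Usage with the 7.x client and the `elastic` instance from the post (not run here):
//   const response = await elastic.search({
//     index: 'zeebe-record-process-instance',
//     body: processLifecycleQuery(processId),
//   });
//   const status = deriveProcessStatus(response.body.hits.hits.map((hit) => hit._source.intent));

console.log(deriveProcessStatus(['ELEMENT_ACTIVATING']));                       // ACTIVE
console.log(deriveProcessStatus(['ELEMENT_ACTIVATING', 'ELEMENT_TERMINATED'])); // TERMINATED
```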