Hi everyone!
I’m currently working on a small exporter for zeebe that derives a few custom messages from the exported records and publishes them to a kafka topic.
the export function is really just this:
fun export(record: Record<*>) {
processRecord(record, record.value)
controller.updateLastExportedRecordPosition(record.position)
}
where the processRecord
function interprets a few events and produces some kafka messages. When I start zeebe with the exporter this works exactly as I expect and all the correct messages end up in my kafka topic.
The problem: When I restart zeebe, all of the previously exported records are exported again.
The configuration of the exporter is simple (names replaced):
zeebe:
broker:
exporters:
simple-exporter:
className: my.SimpleExporter
jarPath: simple-exporter.jar
Configuration of the kafka producer is done through environment variables only, other than that there is nothing to configure.
The zeebe installation is use for testing is running in docker-compose like this (reduced):
version: "3"
services:
zeebe:
image: camunda/zeebe:${CAMUNDA_PLATFORM_VERSION:-8.3.3}
container_name: zeebe
environment:
- ZEEBE_BROKER_CLUSTER_PARTITIONSCOUNT=6
- ZEEBE_BROKER_CLUSTER_REPLICATIONFACTOR=1
- ZEEBE_BROKER_CLUSTER_CLUSTERSIZE=1
- ZEEBE_BROKER_EXPORTERS_ELASTICSEARCH_CLASSNAME=io.camunda.zeebe.exporter.ElasticsearchExporter
- ZEEBE_BROKER_EXPORTERS_ELASTICSEARCH_ARGS_URL=http://elasticsearch:9200
- ZEEBE_BROKER_EXPORTERS_ELASTICSEARCH_ARGS_BULK_SIZE=1
- SIMPLE_EXPORTER_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
- SIMPLE_EXPORTER_KAFKA_TOPIC=simple_exporter
networks:
- camunda_platform
volumes:
- zeebe:/usr/local/zeebe/data
- './application.yaml:/usr/local/zeebe/config/application.yaml'
- './simple-exporter.jar:/usr/local/zeebe/simple-exporter.jar'
# kafka, elasticsearch and operate serivces omitted
volumes:
zeebe:
driver: local
networks:
camunda_platform:
driver: bridge
I also had a look at zeebe’s raft files to see if I can find information about the simple-exporter and I did find a section that seems to be the last position., but upon restart it still replayed all the previous records.
Thanks in advance for any input!