Kafka connect not connecting to zeebe (?)

Hi there! Could there be a bug in the Zeebe Kafka connector (kafka-connect-zeebe)? Can anybody help with this?

Error stacktrace from kafka-connect pod (on GKE):

java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at io.zeebe.kafka.connect.source.ZeebeSourceTaskFetcher.activateJobs(ZeebeSourceTaskFetcher.java:86)
at io.zeebe.kafka.connect.source.ZeebeSourceTaskFetcher.fetchBatch(ZeebeSourceTaskFetcher.java:54)
at io.zeebe.kafka.connect.source.ZeebeSourceTask.lambda$fetchJobs$0(ZeebeSourceTask.java:94)
at io.zeebe.kafka.connect.util.ManagedClient.withClient(ManagedClient.java:53)
at io.zeebe.kafka.connect.source.ZeebeSourceTask.fetchJobs(ZeebeSourceTask.java:94)
at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at io.zeebe.kafka.connect.source.ZeebeSourceTask.poll(ZeebeSourceTask.java:76)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
at io.grpc.Status.asRuntimeException(Status.java:533)
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:460)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
… 3 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:26500
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)

It can’t connect to your Zeebe broker as it says in the exception message:

Connection refused: localhost/127.0.0.1:26500

You may need to configure the connector to point to the correct Zeebe broker (or Camunda Cloud); see the Zeebe contact point setting described at: GitHub - camunda-community-hub/kafka-connect-zeebe: Kafka Connect for Zeebe.io

I have the same problem with Zeebe running on localhost.
I can connect to Zeebe with this example, but with a local Kafka I get the "connection refused" exception. Any idea what is wrong?

Camunda 8 local compose

services:

  # Zeebe broker with its embedded gateway enabled. Ports 26500 (gateway) and
  # 9600 are published to the host; data is kept in the named volume "zeebe".
  zeebe: # https://docs.camunda.io/docs/self-managed/platform-deployment/docker/#zeebe
    image: camunda/zeebe:${CAMUNDA_PLATFORM_VERSION:-8.0.5}
    container_name: zeebe
    ports:
      # Host-side mappings. Containers on the camunda-platform network address
      # the gateway by service name instead (e.g. zeebe:26500).
      - "26500:26500"
      - "9600:9600"
    environment: # https://docs.camunda.io/docs/self-managed/zeebe-deployment/configuration/environment-variables/
      # Export records to the elasticsearch service of this compose file.
      - ZEEBE_BROKER_EXPORTERS_ELASTICSEARCH_CLASSNAME=io.camunda.zeebe.exporter.ElasticsearchExporter
      - ZEEBE_BROKER_EXPORTERS_ELASTICSEARCH_ARGS_URL=http://elasticsearch:9200
      - ZEEBE_BROKER_EXPORTERS_ELASTICSEARCH_ARGS_BULK_SIZE=1
      # allow running with low disk space
      - ZEEBE_BROKER_DATA_DISKUSAGECOMMANDWATERMARK=0.998
      - ZEEBE_BROKER_DATA_DISKUSAGEREPLICATIONWATERMARK=0.999
      # Fixed 512 MB JVM heap (initial and maximum).
      - "JAVA_TOOL_OPTIONS=-Xms512m -Xmx512m"
      # Enable the embedded gateway to start on broker startup.
      - ZEEBE_BROKER_GATEWAY_ENABLE=true
      # Sets the port the embedded gateway binds to.
      - ZEEBE_BROKER_GATEWAY_NETWORK_PORT=26500
      # TLS between clients and the gateway; set to false here, so clients
      # presumably have to connect in plaintext.
      - ZEEBE_BROKER_GATEWAY_SECURITY_ENABLED=false
      # Sets the host the embedded gateway binds to (0.0.0.0 = all interfaces).
      - ZEEBE_BROKER_GATEWAY_NETWORK_HOST=0.0.0.0
      - ZEEBE_BROKER_NETWORK_HOST=0.0.0.0
      # -
      # Commented-out alternative: start the broker without the embedded gateway.
      #- ZEEBE_BROKER_GATEWAY_ENABLE=false
    restart: always
    volumes:
      - zeebe:/usr/local/zeebe/data
    networks:
      - camunda-platform
    depends_on:
      - elasticsearch

  # Operate web application; container port 8080 is published on host port 9081.
  operate: # https://docs.camunda.io/docs/self-managed/platform-deployment/docker/#operate
    image: camunda/operate:${CAMUNDA_PLATFORM_VERSION:-8.0.5}
    container_name: operate
    ports:
      - "9081:8080"
    environment: # https://docs.camunda.io/docs/self-managed/operate-deployment/configuration/
      # Zeebe and Elasticsearch are addressed by service name on the
      # camunda-platform network.
      - CAMUNDA_OPERATE_ZEEBE_GATEWAYADDRESS=zeebe:26500
      - CAMUNDA_OPERATE_ELASTICSEARCH_URL=http://elasticsearch:9200
      - CAMUNDA_OPERATE_ZEEBEELASTICSEARCH_URL=http://elasticsearch:9200
      # For more information regarding configuration with Identity see:
      # https://docs.camunda.io/docs/self-managed/operate-deployment/authentication/#identity
      - SPRING_PROFILES_ACTIVE=identity-auth
      # The issuer URL uses Keycloak's host-published port (localhost:18080),
      # while the backend URL uses the in-network address keycloak:8080.
      - CAMUNDA_OPERATE_IDENTITY_ISSUER_URL=http://localhost:18080/auth/realms/camunda-platform
      - CAMUNDA_OPERATE_IDENTITY_ISSUER_BACKEND_URL=http://keycloak:8080/auth/realms/camunda-platform
      - CAMUNDA_OPERATE_IDENTITY_CLIENTID=operate
      # Same value as KEYCLOAK_INIT_OPERATE_SECRET in the identity service.
      # NOTE(review): hard-coded secret; acceptable only for local development.
      - CAMUNDA_OPERATE_IDENTITY_CLIENTSECRET=XALaRPl5qwTEItdwCMiPS62nVpKs7dL7
      - CAMUNDA_OPERATE_IDENTITY_AUDIENCE=operate-api
      - SPRING_SECURITY_OAUTH2_RESOURCESERVER_JWT_ISSUER_URI=http://localhost:18080/auth/realms/camunda-platform
      - SPRING_SECURITY_OAUTH2_RESOURCESERVER_JWT_JWK_SET_URI=http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/certs
    networks:
      # Joined to both networks: Zeebe/Elasticsearch on one, Identity/Keycloak
      # on the other.
      - camunda-platform
      - identity-network
    depends_on:
      - zeebe
      - identity
      - elasticsearch

  # Tasklist web application; container port 8080 is published on host port 9082.
  tasklist: # https://docs.camunda.io/docs/self-managed/platform-deployment/docker/#tasklist
    image: camunda/tasklist:${CAMUNDA_PLATFORM_VERSION:-8.0.5}
    container_name: tasklist
    ports:
      - "9082:8080"
    environment: # https://docs.camunda.io/docs/self-managed/tasklist-deployment/configuration/
      # Zeebe and Elasticsearch are addressed by service name on the
      # camunda-platform network.
      - CAMUNDA_TASKLIST_ZEEBE_GATEWAYADDRESS=zeebe:26500
      - CAMUNDA_TASKLIST_ELASTICSEARCH_URL=http://elasticsearch:9200
      - CAMUNDA_TASKLIST_ZEEBEELASTICSEARCH_URL=http://elasticsearch:9200
      # For more information regarding configuration with Identity see:
      # https://docs.camunda.io/docs/self-managed/tasklist-deployment/authentication/#identity
      - SPRING_PROFILES_ACTIVE=identity-auth
      # The issuer URL uses Keycloak's host-published port (localhost:18080),
      # while the backend URL uses the in-network address keycloak:8080.
      - CAMUNDA_TASKLIST_IDENTITY_ISSUER_URL=http://localhost:18080/auth/realms/camunda-platform
      - CAMUNDA_TASKLIST_IDENTITY_ISSUER_BACKEND_URL=http://keycloak:8080/auth/realms/camunda-platform
      - CAMUNDA_TASKLIST_IDENTITY_CLIENTID=tasklist
      # Same value as KEYCLOAK_INIT_TASKLIST_SECRET in the identity service.
      # NOTE(review): hard-coded secret; acceptable only for local development.
      - CAMUNDA_TASKLIST_IDENTITY_CLIENTSECRET=XALaRPl5qwTEItdwCMiPS62nVpKs7dL7
      - CAMUNDA_TASKLIST_IDENTITY_AUDIENCE=tasklist-api
      - SPRING_SECURITY_OAUTH2_RESOURCESERVER_JWT_ISSUER_URI=http://localhost:18080/auth/realms/camunda-platform
      - SPRING_SECURITY_OAUTH2_RESOURCESERVER_JWT_JWK_SET_URI=http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/certs
    networks:
      # Joined to both networks: Zeebe/Elasticsearch on one, Identity/Keycloak
      # on the other.
      - camunda-platform
      - identity-network
    depends_on:
      - zeebe
      - identity
      - elasticsearch

  # Optimize web application; container port 8090 is published on host port 9083.
  # Its image tag is controlled by CAMUNDA_OPTIMIZE_VERSION, separately from
  # CAMUNDA_PLATFORM_VERSION used by the other Camunda services.
  optimize: # https://docs.camunda.io/docs/self-managed/platform-deployment/docker/#optimize
    image: camunda/optimize:${CAMUNDA_OPTIMIZE_VERSION:-3.8.5}
    container_name: optimize
    ports:
      - "9083:8090"
    environment: # https://docs.camunda.io/docs/self-managed/optimize-deployment/setup/installation/#available-environment-variables
      # Elasticsearch is addressed by service name on the camunda-platform network.
      - OPTIMIZE_ELASTICSEARCH_HOST=elasticsearch
      - OPTIMIZE_ELASTICSEARCH_HTTP_PORT=9200
      - SPRING_PROFILES_ACTIVE=ccsm
      - CAMUNDA_OPTIMIZE_ZEEBE_ENABLED=true
      - CAMUNDA_OPTIMIZE_ENTERPRISE=false
      # The issuer URL uses Keycloak's host-published port (localhost:18080),
      # while the backend URL uses the in-network address keycloak:8080.
      - CAMUNDA_OPTIMIZE_IDENTITY_ISSUER_URL=http://localhost:18080/auth/realms/camunda-platform
      - CAMUNDA_OPTIMIZE_IDENTITY_ISSUER_BACKEND_URL=http://keycloak:8080/auth/realms/camunda-platform
      - CAMUNDA_OPTIMIZE_IDENTITY_CLIENTID=optimize
      # Same value as KEYCLOAK_INIT_OPTIMIZE_SECRET in the identity service.
      # NOTE(review): hard-coded secret; acceptable only for local development.
      - CAMUNDA_OPTIMIZE_IDENTITY_CLIENTSECRET=XALaRPl5qwTEItdwCMiPS62nVpKs7dL7
      - CAMUNDA_OPTIMIZE_IDENTITY_AUDIENCE=optimize-api
      - CAMUNDA_OPTIMIZE_SECURITY_AUTH_COOKIE_SAME_SITE_ENABLED=false
      - CAMUNDA_OPTIMIZE_UI_LOGOUT_HIDDEN=true
    # Restarted only when the container exits with an error.
    restart: on-failure
    networks:
      - camunda-platform
      - identity-network
    depends_on:
      - identity
      - elasticsearch

  # Identity service; listens on 8084 in the container (SERVER_PORT) and is
  # published on host port 9084. It is attached to identity-network only.
  identity: # https://docs.camunda.io/docs/self-managed/platform-deployment/docker/#identity
    container_name: identity
    image: camunda/identity:${CAMUNDA_PLATFORM_VERSION:-8.0.5}
    ports:
      - "9084:8084"
    environment: # https://docs.camunda.io/docs/self-managed/identity/deployment/configuration-variables/
      SERVER_PORT: 8084
      # Keycloak is addressed by service name on identity-network.
      KEYCLOAK_URL: http://keycloak:8080/auth
      IDENTITY_AUTH_PROVIDER_BACKEND_URL: http://keycloak:8080/auth/realms/camunda-platform
      # Per-application secret and root URL. The secrets equal the
      # *_IDENTITY_CLIENTSECRET values of operate, tasklist and optimize, and
      # the root URLs equal the host ports those services publish.
      # NOTE(review): hard-coded secrets; acceptable only for local development.
      KEYCLOAK_INIT_OPERATE_SECRET: XALaRPl5qwTEItdwCMiPS62nVpKs7dL7
      KEYCLOAK_INIT_OPERATE_ROOT_URL: http://localhost:9081
      KEYCLOAK_INIT_TASKLIST_SECRET: XALaRPl5qwTEItdwCMiPS62nVpKs7dL7
      KEYCLOAK_INIT_TASKLIST_ROOT_URL: http://localhost:9082
      KEYCLOAK_INIT_OPTIMIZE_SECRET: XALaRPl5qwTEItdwCMiPS62nVpKs7dL7
      KEYCLOAK_INIT_OPTIMIZE_ROOT_URL: http://localhost:9083
      # User "demo" (password "demo") with four roles.
      KEYCLOAK_USERS_0_USERNAME: "demo"
      KEYCLOAK_USERS_0_PASSWORD: "demo"
      KEYCLOAK_USERS_0_FIRST_NAME: "demo"
      KEYCLOAK_USERS_0_ROLES_0: "Identity"
      KEYCLOAK_USERS_0_ROLES_1: "Optimize"
      KEYCLOAK_USERS_0_ROLES_2: "Operate"
      KEYCLOAK_USERS_0_ROLES_3: "Tasklist"
    # Restarted only when the container exits with an error.
    restart: on-failure
    networks:
      - identity-network
    depends_on:
      - keycloak

  # Keycloak; container port 8080 is published on host port 18080, which is the
  # port the localhost issuer URLs of operate, tasklist and optimize refer to.
  keycloak: # https://hub.docker.com/r/jboss/keycloak
    container_name: keycloak
    image: jboss/keycloak:${KEYCLOAK_VERSION:-16.1.1}
    ports:
      - "18080:8080"
    environment:
      # NOTE(review): hard-coded admin/admin credentials; acceptable only for
      # local development.
      KEYCLOAK_USER: admin
      KEYCLOAK_PASSWORD: admin
    volumes:
      # Bind-mounts a theme directory from next to this compose file.
      - "./.keycloak/themes/identity:/opt/jboss/keycloak/themes/identity"
    networks:
      - identity-network

  # Single-node Elasticsearch used by zeebe, operate, tasklist and optimize.
  # Ports 9200 and 9300 are published to the host; data is kept in the named
  # volume "elastic".
  elasticsearch: # https://hub.docker.com/_/elasticsearch
    image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTIC_VERSION:-7.17.0}
    container_name: elasticsearch
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      - bootstrap.memory_lock=true
      - discovery.type=single-node
      # allow running with low disk space
      - cluster.routing.allocation.disk.threshold_enabled=false
      # Fixed 512 MB JVM heap (initial and maximum).
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      # Unlimited locked memory, paired with bootstrap.memory_lock=true above.
      memlock:
        soft: -1
        hard: -1
    restart: always
    healthcheck:
      # Healthy only when the _cat/health output contains "green".
      # NOTE(review): the depends_on entries of other services are plain lists,
      # so they presumably do not wait for this check to pass.
      test: [ "CMD-SHELL", "curl -f http://localhost:9200/_cat/health | grep -q green" ]
      interval: 30s
      timeout: 5s
      retries: 3
    volumes:
      - elastic:/usr/share/elasticsearch/data
    networks:
      - camunda-platform

# Named volumes: "zeebe" is mounted at /usr/local/zeebe/data by the zeebe
# service and "elastic" at /usr/share/elasticsearch/data by elasticsearch.
volumes:
  zeebe:
  elastic:

networks:
  # Note there are two bridge networks: One for Camunda Platform and one for Identity.
  # Operate, Tasklist, and Optimize use both; zeebe and elasticsearch join only
  # camunda-platform, while identity and keycloak join only identity-network.
  camunda-platform:
  identity-network:

kafka local compose:

version: '2.1'

services:

  # Single-node Elasticsearch (OSS image 6.8.3); container port 9200 is
  # published on host port 9201.
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.8.3
    ports:
      - "9201:9200"
    environment:
      - discovery.type=single-node
      - cluster.name=elasticsearch
      # Fixed 512 MB JVM heap (initial and maximum).
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"

  # Single ZooKeeper server; kafka and control-center connect to it as
  # zookeeper:2181. Port 2181 is also published to the host.
  zookeeper:
    image: zookeeper:3.4.9
    restart: unless-stopped
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_PORT: 2181
      # One-member ensemble pointing at this container's own hostname.
      ZOO_SERVERS: server.1=zookeeper:2888:3888

  # Single Kafka broker (broker id 1) with two advertised listeners.
  kafka:
    image: confluentinc/cp-enterprise-kafka:5.5.3
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      # LISTENER_DOCKER_INTERNAL (kafka:19092) is the address the other services
      # in this file use. LISTENER_DOCKER_EXTERNAL is advertised on port 9092 at
      # DOCKER_HOST_IP, which falls back to 127.0.0.1 when the variable is unset
      # or empty; set DOCKER_HOST_IP to an address that external clients can
      # actually reach if the default does not work for them.
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      # Both listeners are unencrypted (PLAINTEXT).
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      # Replication factor 1 because there is only this one broker.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # Confluent metrics reporter, pointed at this broker's internal listener.
      KAFKA_METRIC_REPORTERS: "io.confluent.metrics.reporter.ConfluentMetricsReporter"
      KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: "kafka:19092"
      KAFKA_CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: "zookeeper:2181"
    depends_on:
      - zookeeper

  # Confluent Schema Registry; listens on all interfaces at port 8081, which is
  # also published to the host. Other services reach it as schema-registry:8081.
  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    hostname: schema-registry
    ports:
      - "8081:8081"
    environment:
      # Uses the broker's internal listener.
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:19092
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    depends_on:
      - zookeeper
      - kafka

  # Kafka Connect worker; its REST API on port 8083 is published to the host.
  # Connector plugins are loaded from ./connectors, which is mounted at
  # /etc/kafka-connect/jars (one of the CONNECT_PLUGIN_PATH entries).
  #
  # NOTE: connectors run inside this container, so a connector setting such as
  # "localhost:26500" resolves to this container itself, not to the Docker
  # host. Point connectors at an address that is reachable from here (for
  # example the host's IP address).
  connect:
    image: confluentinc/cp-kafka-connect:5.5.3
    hostname: connect
    ports:
      - "8083:8083"
    environment:
      # Uses the broker's internal listener.
      CONNECT_BOOTSTRAP_SERVERS: "kafka:19092"
      CONNECT_REST_PORT: "8083"
      CONNECT_GROUP_ID: "connect-group"
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
      # Worker-level default converters; a connector config may set its own
      # key.converter / value.converter.
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      # TRACE logging for the Zeebe connector package; quieter levels elsewhere.
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR,io.zeebe.kafka.connect=TRACE,io.zeebe.client=WARN"
      # Replication factor 1 because there is only one broker.
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    volumes:
      - ./connectors:/etc/kafka-connect/jars/
    depends_on:
      - schema-registry
      - kafka

  # Confluent Control Center; its UI port 9021 is published to the host.
  control-center:
    image: confluentinc/cp-enterprise-control-center:5.5.3
    hostname: control-center
    ports:
      - "9021:9021"
    environment:
      # All dependencies are addressed by service name inside the compose network.
      CONTROL_CENTER_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CONTROL_CENTER_BOOTSTRAP_SERVERS: "kafka:19092"
      # Replication factor 1 because there is only one broker.
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_CONNECT_CLUSTER: "connect:8083"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
    depends_on:
      - zookeeper
      - schema-registry
      - kafka
      - connect

source connector:

{
  "name": "ping",
  "config": {
    "connector.class": "io.zeebe.kafka.connect.ZeebeSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,

    "zeebe.client.gateway.address": "localhost:26500",
    "zeebe.client.requestTimeout": "10000",
    "zeebe.client.security.plaintext": true,
    
    "zeebe.client.job.worker": "kafka-connector",
    "zeebe.client.worker.maxJobsActive": "100",
    "zeebe.client.job.pollinterval": "2000",
    "zeebe.client.job.timeout": "5000",
    "job.types": "ping",
    "job.header.topics": "topic"
  }
}

The problem was in the kafka compose.
I had to change the ${DOCKER_HOST_IP:-127.0.0.1} part in the following variable to my IPv4 address:

KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
1 Like

Hi @mPatrik

Great that you got it working! Is there somewhere that this should be documented for future reference that would have made it easy for you if it were there already?

Josh

Hi!
Yes, it would have been easier if this part of the documentation had already covered it.

Also, in the source connector, the zeebe.client.gateway.address property must not be localhost; I set it to my IPv4 address too.