Seeking Assistance with Token Update for Pyzeebe Worker in Camunda 8 Self-Managed Setup

Hello everyone,

I’ve set up Camunda 8 in a self-managed environment using Helm charts and have configured an ingress for all the services. My current project involves getting the Pyzeebe worker to operate over a secure channel. So far, I’ve managed to ensure it works within the token’s retention lifetime. However, I’m aiming to have it run continuously without manual token renewal. Does anyone here have experience or a solution for automatically updating the token? Below is the code I’m working with:

...
import asyncio
import requests
import grpc

token = get_access_token()
# grpc stuff
token_credential = grpc.access_token_call_credentials(token)
ssl_credentials = grpc.ssl_channel_credentials(root_certificates=root_certificates)
composite_credentials = grpc.composite_channel_credentials(ssl_credentials, token_credential)
channel = create_secure_channel(hostname="my-zeebe-gateway.mydomain.com", port=443, channel_credentials=composite_credentials) # Create grpc channel

# now create the worker
worker = ZeebeWorker(channel)



async def on_error(exception: Exception, job: Job):
    """
    on_error will be called when the task fails
    """
    logger.error(exception)
    await job.set_error_status(f"Failed to handle job {job}. Error: {str(exception)}")

@worker.task(task_type="authorisatie_mail_sturen", exception_handler=on_error)
def example_task(job: Job,  *args, **kwargs) -> dict:

    return {"output": f"It works!"}

loop = asyncio.get_event_loop()
loop.run_until_complete(worker.work())

Any guidance or suggestions would be greatly appreciated. Thank you in advance for your help!

For what it’s worth, I discovered my own solution, drawing inspiration from some obscure parts of a GitHub post for guidance. Accordingly, I’ve adjusted my code in the following way:

# Custom Auth Metadata Plugin for checking and updating the token before each RPC call
class CustomAuthMetadataPlugin(grpc.AuthMetadataPlugin):
    def __init__(self, token_refresh_callback):
        self.token_refresh_callback = token_refresh_callback
        self.token = None
        self.expiry = 0

    def __call__(self, context, callback):
        # Check if the token is expired
        current_time = time.time()
        if self.token is None or current_time >= self.expiry:
            logger.debug("Refreshing access token")
            self.token = self.token_refresh_callback()
            # Assuming token expiry is 300 seconds from now. Adjust if the expiry logic is different.
            self.expiry = current_time + 300 - 30  # Refresh 30 seconds before actual expiry

        # Add authentication header with the token
        metadata = (('authorization', f'Bearer {self.token}'),)
        callback(metadata, None)

# token = get_access_token()
# token_credential = grpc.access_token_call_credentials(token)
# ssl_credentials = grpc.ssl_channel_credentials(root_certificates=root_certificates)
# composite_credentials = grpc.composite_channel_credentials(ssl_credentials, token_credential)

# Your existing code for getting access token and setting up SSL
token_refresh_callback = refresh_access_token
auth_metadata_plugin = CustomAuthMetadataPlugin(token_refresh_callback)

# Now, instead of directly using access token credentials, use the plugin with call credentials
call_credentials = grpc.metadata_call_credentials(auth_metadata_plugin)
ssl_credentials = grpc.ssl_channel_credentials(root_certificates=root_certificates)
composite_credentials = grpc.composite_channel_credentials(ssl_credentials, call_credentials)

#stays the same
channel = create_secure_channel(hostname="my-zeebe-gateway.mydomain.com", port=443, channel_credentials=composite_credentials)
worker = ZeebeWorker(channel) # Create a zeebe worker

Now it works as desired.

1 Like

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.