I’m trying to decouple my external tasks from the process engine using a message broker (currently SNS/SQS). My goal is to send a message to the broker after the process engine commits the external task to the database, so that the external task worker can read the message and process the task. I have implemented a plugin following the tutorials provided by Camunda, and almost everything works except for the notify method of the execution listener.
public class ProgressLoggingExecutionListener implements ExecutionListener {
    private final Logger LOGGER = Logger.getLogger(this.getClass().getName());
    private final SnsClient snsClient;
    private final ObjectMapper objectMapper;
    public String requestsTopicTopicArn;
    private String topicName;

    // constructor with extension property value as parameter
    public ProgressLoggingExecutionListener(String topicName, String requestsTopicTopicArn) {
        this.topicName = topicName;
        this.objectMapper = new ObjectMapper();
        this.snsClient = SnsClient.create();
        this.requestsTopicTopicArn = requestsTopicTopicArn;
    }

    // notify method is executed when Execution Listener is called
    @Override
    public void notify(DelegateExecution execution) throws Exception {
        LOGGER.info("EXTERNAL TASK TOPIC RUNNING: " + topicName);
        Context.getCommandContext().getTransactionContext()
            .addTransactionListener(TransactionState.COMMITTING, commandContext -> {
                ExternalTask externalTask = execution.getProcessEngineServices()
                    .getExternalTaskService()
                    .createExternalTaskQuery()
                    .executionId(execution.getId())
                    .singleResult();
                if (externalTask != null) {
                    try {
                        sendMessage(externalTask, execution.getVariables());
                    } catch (Exception e) {
                        LOGGER.log(Level.SEVERE, "Error sending SNS message", e);
                    }
                } else {
                    LOGGER.info("External task for model {" + execution.getBpmnModelElementInstance().getName() + "} not found");
                }
            });
        LOGGER.info("EXTERNAL TASK TOPIC COMPLETED: " + topicName);
    }

    private void sendMessage(ExternalTask externalTask, Map<String, Object> variables) throws Exception {
        LOGGER.info("Sending SNS message for topic: {" + topicName + "}");
        Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
        messageAttributes.put(Constants.ATTRIBUTE_ROUTING_KEY, MessageAttributeValue.builder()
            .stringValue(topicName)
            .dataType("String")
            .build());
        String businessKey = externalTask.getBusinessKey();
        String workflow = externalTask.getProcessDefinitionKey();
        // note: from here on this local variable shadows the topicName field
        String topicName = externalTask.getTopicName();
        String externalTaskId = externalTask.getId();
        WorkflowMessage workflowMessage = WorkflowMessage.builder()
            .businessKey(businessKey)
            .workflow(workflow)
            .externalTaskId(externalTaskId)
            .externalTaskName("")
            .responseQueueUrl(topicName + "-queue-responses")
            .variables(variables)
            .build();
        snsClient.publish(PublishRequest.builder()
            .topicArn(requestsTopicTopicArn)
            .message(objectMapper.writeValueAsString(workflowMessage))
            .messageAttributes(messageAttributes)
            .messageGroupId(topicName)
            .build());
    }
}
Following is the parse listener of the plugin:
public class ProgressLoggingSupportParseListener extends AbstractBpmnParseListener {
    public final String requestsTopicTopicArn;

    public ProgressLoggingSupportParseListener(@Value("${aws.requests-topic:none}") String requestsTopicTopicArn) {
        this.requestsTopicTopicArn = requestsTopicTopicArn;
    }

    // parse given service task to get the attributes of the property extension elements
    @Override
    public void parseServiceTask(Element serviceTaskElement, ScopeImpl scope, ActivityImpl activity) {
        ActivityBehavior activityBehavior = activity.getActivityBehavior();
        if (activityBehavior instanceof ExternalTaskActivityBehavior) {
            //ExternalTaskActivityBehavior externalTaskActivityBehavior = (ExternalTaskActivityBehavior) activityBehavior;
            String topicName = serviceTaskElement.attributeNS(BpmnParse.CAMUNDA_BPMN_EXTENSIONS_NS,
                BpmnParse.PROPERTYNAME_EXTERNAL_TASK_TOPIC);
            activity.addListener(ExecutionListener.EVENTNAME_START, new ProgressLoggingExecutionListener(topicName,
                this.requestsTopicTopicArn));
        }
    }
}
The code above throws the following exception when I trigger the service task corresponding to the external task:
2022-02-21 07:23:47.818 WARN 2276 --- [nio-8989-exec-2] org.camunda.bpm.engine.rest.exception : ENGINE-REST-HTTP500 java.lang.NullPointerException
at org.camunda.bpm.engine.impl.persistence.entity.ExecutionEntity.getProcessEngineServices(ExecutionEntity.java:1952)
at br.com.ordermanagement.ProgressLoggingExecutionListener.lambda$0(ProgressLoggingExecutionListener.java:48)
at org.camunda.bpm.engine.spring.SpringTransactionContext$2.beforeCommit(SpringTransactionContext.java:80)
at org.springframework.transaction.support.TransactionSynchronizationUtils.triggerBeforeCommit(TransactionSynchronizationUtils.java:97)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerBeforeCommit(AbstractPlatformTransactionManager.java:916)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:727)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
at org.camunda.bpm.engine.spring.SpringTransactionInterceptor.execute(SpringTransactionInterceptor.java:70)
at org.camunda.bpm.engine.impl.interceptor.ProcessApplicationContextInterceptor.execute(ProcessApplicationContextInterceptor.java:70)
at org.camunda.bpm.engine.impl.interceptor.CommandCounterInterceptor.execute(CommandCounterInterceptor.java:35)
at org.camunda.bpm.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:33)
at org.camunda.bpm.engine.impl.FormServiceImpl.submitStartForm(FormServiceImpl.java:83)
at org.camunda.bpm.engine.rest.sub.repository.impl.ProcessDefinitionResourceImpl.submitForm(ProcessDefinitionResourceImpl.java:190)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167)
at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:219)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:475)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:397)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81)
at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:255)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.camunda.bpm.webapp.impl.engine.ProcessEnginesFilter.applyFilter(ProcessEnginesFilter.java:145)
at org.camunda.bpm.webapp.impl.filter.AbstractTemplateFilter.doFilter(AbstractTemplateFilter.java:58)
at org.camunda.bpm.spring.boot.starter.webapp.filter.LazyDelegateFilter.doFilter(LazyDelegateFilter.java:60)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.camunda.bpm.webapp.impl.security.filter.headersec.HttpHeaderSecurityFilter.doFilter(HttpHeaderSecurityFilter.java:89)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.camunda.bpm.webapp.impl.security.filter.CsrfPreventionFilter.doFilter(CsrfPreventionFilter.java:177)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.camunda.bpm.webapp.impl.security.filter.SecurityFilter.doFilterSecure(SecurityFilter.java:73)
at org.camunda.bpm.webapp.impl.security.filter.SecurityFilter.doFilter(SecurityFilter.java:57)
at org.camunda.bpm.spring.boot.starter.webapp.filter.LazyDelegateFilter.doFilter(LazyDelegateFilter.java:60)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.camunda.bpm.webapp.impl.security.auth.AuthenticationFilter$1.execute(AuthenticationFilter.java:62)
at org.camunda.bpm.webapp.impl.security.auth.AuthenticationFilter$1.execute(AuthenticationFilter.java:60)
at org.camunda.bpm.webapp.impl.security.SecurityActions.runWithAuthentications(SecurityActions.java:44)
at org.camunda.bpm.webapp.impl.security.auth.AuthenticationFilter.doFilter(AuthenticationFilter.java:60)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:382)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1726)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Unknown Source)
I believe the problem is related to the state of the transaction context inside the code below:
Context.getCommandContext().getTransactionContext()
    .addTransactionListener(TransactionState.COMMITTING, commandContext -> {
        ExternalTask externalTask = execution.getProcessEngineServices()
            .getExternalTaskService()
            .createExternalTaskQuery()
            .executionId(execution.getId())
            .singleResult();
        if (externalTask != null) {
            try {
                sendMessage(externalTask, execution.getVariables());
            } catch (Exception e) {
                LOGGER.log(Level.SEVERE, "Error sending SNS message", e);
            }
        } else {
            LOGGER.info("External task for model {" + execution.getBpmnModelElementInstance().getName() + "} not found");
        }
    });
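My current guess, which I have not confirmed, is that by the time the COMMITTING listener runs, the engine's thread-bound context has already been cleared, so anything that reads from execution lazily inside the callback fails. Below is a minimal, library-free sketch of that idea. FakeExecution, current, registerLazy and registerEager are hypothetical stand-ins I made up for illustration and are not Camunda API; the point is only that values captured while the context is active stay usable, while values read inside the deferred callback do not.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DeferredPublishSketch {

    // Hypothetical stand-in for DelegateExecution (not Camunda API).
    static final class FakeExecution {
        final String id;
        final Map<String, Object> variables;

        FakeExecution(String id, Map<String, Object> variables) {
            this.id = id;
            this.variables = variables;
        }
    }

    // Stand-in for the thread-bound engine context; null once the command has finished.
    static FakeExecution current;

    static final List<Runnable> commitListeners = new ArrayList<>();
    static final List<String> published = new ArrayList<>();
    static int failures;

    // Lazy variant: the callback dereferences the context only when it runs.
    static void registerLazy(String topicName) {
        commitListeners.add(() -> published.add(topicName + ":" + current.id));
    }

    // Eager variant: copy what is needed now, use only the copies later.
    static void registerEager(String topicName) {
        final String executionId = current.id;
        final Map<String, Object> variables = new HashMap<>(current.variables);
        commitListeners.add(() ->
                published.add(topicName + ":" + executionId + ":" + variables.get("orderId")));
    }

    static void run() {
        commitListeners.clear();
        published.clear();
        failures = 0;

        Map<String, Object> variables = new HashMap<>();
        variables.put("orderId", 42);
        current = new FakeExecution("exec-1", variables);

        registerLazy("orders");
        registerEager("orders");

        // Simulate the context being cleared before the commit callbacks fire.
        current = null;

        for (Runnable listener : commitListeners) {
            try {
                listener.run();
            } catch (NullPointerException e) {
                failures++; // the lazy listener ends up here
            }
        }
    }

    public static void main(String[] args) {
        run();
        System.out.println(published + ", failures=" + failures);
    }
}
```

If this is indeed the cause, the equivalent change in my listener would be to read execution.getId() and execution.getVariables() inside notify and pass those values to the lambda, but I am not sure how to obtain the ExternalTaskService safely at that point.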
Could someone help me with this?