Is it possible to lock an external task by its ID? Currently, in the Java API, I only see the option to fetch and lock a list of tasks or to run an ordinary query.
The ordinary query would be sufficient, but I do not see how a worker can then lock the task it found.
The ticket you are referring to is not implemented yet. You can vote for it to increase the chances of it being planned for one of the future releases.
For now you would have to use the existing Fetch and Lock API.
The solution proposed on JIRA is labeled as a workaround, but it is not actually possible. Even if you have the ID of the task (obtained through an external task REST query), there is no “lock on single ID” in the REST API.
I thought about a different solution that involves setting the topic dynamically (for example topic = “anytopic.specific_identifier”).
Whatever specific_identifier is, it must be enough to identify only one external task for a certain worker, so the topic itself acts as a filter that finds at most one task.
I’m not sure yet how to decide what the specific_identifier could be, though.
This solution isn’t neat, and it involves registering a worker on the fly with a different topic each time.
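The dynamic-topic idea can be illustrated with a small, self-contained sketch. It does not use the Camunda API: `DynamicTopicSketch`, `Task`, `topicFor`, and `fetchByTopic` are hypothetical names, and the in-memory list only stands in for the engine's task table. It shows that a topic built from a prefix plus a unique identifier matches at most one task, provided the identifier really is unique per task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration only: none of these types belong to Camunda.
class DynamicTopicSketch {

    // Minimal stand-in for an external task: an id plus the topic it was created with.
    static final class Task {
        final String id;
        final String topic;

        Task(String id, String topic) {
            this.id = id;
            this.topic = topic;
        }
    }

    // Builds a topic such as "anytopic.order-2" from a fixed prefix and a per-task identifier.
    static String topicFor(String prefix, String specificIdentifier) {
        if (prefix == null || prefix.isEmpty()
                || specificIdentifier == null || specificIdentifier.isEmpty()) {
            throw new IllegalArgumentException("prefix and identifier must be non-empty");
        }
        return prefix + "." + specificIdentifier;
    }

    // Simulates a fetch that filters by exact topic name.
    static List<Task> fetchByTopic(List<Task> tasks, String topic) {
        return tasks.stream()
                .filter(t -> t.topic.equals(topic))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Task> tasks = new ArrayList<>();
        tasks.add(new Task("task-1", topicFor("anytopic", "order-1")));
        tasks.add(new Task("task-2", topicFor("anytopic", "order-2")));

        // The worker subscribes to exactly one dynamic topic and gets at most one task.
        List<Task> match = fetchByTopic(tasks, topicFor("anytopic", "order-2"));
        System.out.println(match.size() + " task(s) matched");
    }
}
```

In a real setup the topic would have to be set on the process model side (for example through an expression) and the worker would pass the same string to fetch and lock; both of those steps are outside this sketch.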
In search of a solution to the exact same problem that you describe, we implemented a custom CommandService that executes a LockExternalTaskById command.
The service bean looks something like this:
package custom.process.manager.service;

import org.camunda.bpm.engine.ProcessEngine;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.camunda.bpm.engine.impl.interceptor.Command;
import org.camunda.bpm.engine.impl.interceptor.CommandExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class CustomCommandService {

    private final CommandExecutor executor;

    @Autowired
    public CustomCommandService(ProcessEngine processEngine) {
        // Uses engine-internal (impl) API to obtain the transactional command executor
        executor = ((ProcessEngineConfigurationImpl) processEngine.getProcessEngineConfiguration())
                .getCommandExecutorTxRequired();
    }

    /**
     * Executes a command that implements the {@link Command} interface.
     *
     * @param command the command to execute
     * @return the result of type T
     */
    public <T> T execute(Command<T> command) {
        return executor.execute(command);
    }
}
and the LockExternalTaskByIdCmd is implemented similarly to Camunda’s FetchExternalTasksCmd:
package custom.process.manager.service.command;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.camunda.bpm.engine.externaltask.LockedExternalTask;
import org.camunda.bpm.engine.impl.db.DbEntity;
import org.camunda.bpm.engine.impl.db.entitymanager.OptimisticLockingListener;
import org.camunda.bpm.engine.impl.db.entitymanager.operation.DbEntityOperation;
import org.camunda.bpm.engine.impl.db.entitymanager.operation.DbOperation;
import org.camunda.bpm.engine.impl.externaltask.LockedExternalTaskImpl;
import org.camunda.bpm.engine.impl.interceptor.Command;
import org.camunda.bpm.engine.impl.interceptor.CommandContext;
import org.camunda.bpm.engine.impl.persistence.entity.ExternalTaskEntity;
import org.camunda.bpm.engine.impl.util.ClockUtil;

import custom.process.manager.configuration.Logger;
import custom.process.manager.service.CustomProcessService;

public class LockExternalTaskByIdCmd implements Command<List<LockedExternalTask>> {

    private List<String> ids;
    private String workerId;
    private long lockDuration;
    private List<String> variablesToFetch;
    private Logger logger = Logger.COMMAND_LOGGER_INSTANCE;

    public LockExternalTaskByIdCmd() {
        variablesToFetch = Arrays.asList(CustomProcessService.BO_ID_SYMBOL);
    }

    @Override
    public List<LockedExternalTask> execute(CommandContext commandContext) {
        List<LockedExternalTask> list = new ArrayList<>();

        // validate input
        if (!validateInput()) {
            throw new IllegalArgumentException("Invalid parameters for locking tasks");
        }

        // lock each task from the list
        for (String id : ids) {
            ExternalTaskEntity entity = commandContext.getExternalTaskManager().findExternalTaskById(id);

            // skip ids that do not resolve to an existing task
            if (entity == null) {
                continue;
            }

            // lock only if the task is unlocked or its lock has expired
            if (entity.getLockExpirationTime() == null
                    || entity.getLockExpirationTime().before(ClockUtil.getCurrentTime())) {
                entity.lock(workerId, lockDuration);
                // add task to list
                list.add(LockedExternalTaskImpl.fromEntity(entity, variablesToFetch, false));
            }
        }

        // register an optimistic locking listener so that tasks whose update fails are dropped from the result
        filterOnOptimisticLockingFailure(commandContext, list);
        return list;
    }

    /**
     * Registers an optimistic locking listener that removes tasks from the
     * result list when their update fails on flush.
     *
     * @param commandContext
     *            the command context given by the executor
     * @param tasks
     *            the task list
     */
    private void filterOnOptimisticLockingFailure(CommandContext commandContext, final List<LockedExternalTask> tasks) {
        commandContext.getDbEntityManager().registerOptimisticLockingListener(new OptimisticLockingListener() {

            @Override
            public Class<? extends DbEntity> getEntityType() {
                return ExternalTaskEntity.class;
            }

            @Override
            public void failedOperation(DbOperation operation) {
                if (operation instanceof DbEntityOperation) {
                    DbEntityOperation dbEntityOperation = (DbEntityOperation) operation;
                    DbEntity dbEntity = dbEntityOperation.getEntity();
                    boolean failedOperationEntityInList = false;

                    Iterator<LockedExternalTask> it = tasks.iterator();
                    while (it.hasNext()) {
                        LockedExternalTask resultTask = it.next();
                        if (resultTask.getId().equals(dbEntity.getId())) {
                            it.remove();
                            failedOperationEntityInList = true;
                            break;
                        }
                    }

                    // a failed operation on a task that is not in the result list is unexpected
                    if (!failedOperationEntityInList) {
                        logger.error("" + operation);
                    }
                }
            }
        });
    }

    public LockExternalTaskByIdCmd setIds(List<String> ids) {
        this.ids = ids;
        return this;
    }

    public LockExternalTaskByIdCmd setWorkerId(String workerId) {
        this.workerId = workerId;
        return this;
    }

    public LockExternalTaskByIdCmd setLockDuration(Integer lockDuration) {
        this.lockDuration = lockDuration;
        return this;
    }

    public boolean validateInput() {
        // at least one non-empty id, a worker id, and a positive lock duration are required
        return ids != null && ids.stream().anyMatch(StringUtils::isNotEmpty)
                && StringUtils.isNotEmpty(workerId)
                && lockDuration > 0;
    }
}
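The lock condition inside execute() can be tried out in isolation with the following sketch. It is a simplified in-memory model, not Camunda code: `LockByIdSketch`, `Task`, `isLockable`, and `tryLock` are hypothetical names, and the expiration is assumed to be computed as the current time plus the lock duration.

```java
import java.util.Date;

// Hypothetical in-memory model of the "lock if free or expired" rule.
class LockByIdSketch {

    // Stand-in for an external task entity.
    static final class Task {
        final String id;
        String workerId;
        Date lockExpirationTime;

        Task(String id) {
            this.id = id;
        }
    }

    // A task is lockable if it was never locked or its lock has already expired.
    static boolean isLockable(Task task, Date now) {
        return task.lockExpirationTime == null || task.lockExpirationTime.before(now);
    }

    // Locks the task for the worker and returns true, or returns false if another lock is still active.
    static boolean tryLock(Task task, String workerId, long lockDurationMillis, Date now) {
        if (!isLockable(task, now)) {
            return false;
        }
        task.workerId = workerId;
        task.lockExpirationTime = new Date(now.getTime() + lockDurationMillis);
        return true;
    }

    public static void main(String[] args) {
        Task task = new Task("task-1");
        Date start = new Date(1_000L);

        System.out.println(tryLock(task, "worker-a", 500L, start));             // free, so the lock succeeds
        System.out.println(tryLock(task, "worker-b", 500L, new Date(1_200L)));  // still locked by worker-a
        System.out.println(tryLock(task, "worker-b", 500L, new Date(1_501L)));  // lock expired at 1500
    }
}
```

Note that a lock held by the same worker is also treated as "already locked" here, which matches the check in the command above.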
Our resolver uses this custom command service like this:
...
@Autowired
private CustomCommandService commandService;
...
public List<ServiceTask> lockAndFetchServiceTasks(List<String> ids, String workerId, Integer lockTime) {
    // fetch and lock tasks by id
    logger.debug("trying to fetch and lock service task with ids: [ {} ]", ids);
    List<LockedExternalTask> list = commandService.execute(
            new LockExternalTaskByIdCmd().setIds(ids).setLockDuration(lockTime).setWorkerId(workerId));

    // map tasks to service tasks
    logger.debug("returning [ {} ] locked tasks", list.size());
    return list.stream().map(this::mapServiceTask).collect(Collectors.toList());
}
Maybe this will become obsolete once Camunda provides its own implementation. For now we are going with this solution to lock and complete external tasks targeted by their respective IDs.
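For readers wondering why the command registers an optimistic locking listener: if two workers try to lock the same task concurrently, only one update can win, and the loser must not report the task as locked. The sketch below models that with a plain revision counter. It is an assumption-laden illustration, not the engine's implementation; `OptimisticLockSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of revision-based optimistic locking.
class OptimisticLockSketch {

    // In-memory "table": task id -> current revision.
    private final Map<String, Integer> revisions = new HashMap<>();

    void insert(String id) {
        revisions.put(id, 1);
    }

    int read(String id) {
        return revisions.get(id);
    }

    // The update succeeds only if the caller still holds the current revision.
    boolean update(String id, int expectedRevision) {
        Integer current = revisions.get(id);
        if (current == null || current != expectedRevision) {
            return false;
        }
        revisions.put(id, current + 1);
        return true;
    }

    // Tries to write every task that was read; tasks whose update fails are removed from the result.
    List<String> flushAndFilter(Map<String, Integer> readRevisions) {
        List<String> result = new ArrayList<>(readRevisions.keySet());
        Iterator<String> it = result.iterator();
        while (it.hasNext()) {
            String id = it.next();
            if (!update(id, readRevisions.get(id))) {
                it.remove();
            }
        }
        return result;
    }

    public static void main(String[] args) {
        OptimisticLockSketch store = new OptimisticLockSketch();
        store.insert("task-a");
        store.insert("task-b");

        // This worker reads both tasks at revision 1.
        Map<String, Integer> seen = new LinkedHashMap<>();
        seen.put("task-a", store.read("task-a"));
        seen.put("task-b", store.read("task-b"));

        // Another worker updates task-b first, bumping its revision.
        store.update("task-b", 1);

        // Only task-a survives the flush for this worker.
        System.out.println(store.flushAndFilter(seen));
    }
}
```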
This is a life saver! I was assuming that the externalTask was much more analogous to a userTask than it actually is. The Camunda implementation pushes you down the path of having generic workers processing topic queues. That doesn’t work for my use case!