Hello,
I have implemented an automatic migration of running process instances to the latest process definition version, based on the activity ids, and would like to share the implementation with you:
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.camunda.bpm.engine.ProcessEngine;
import org.camunda.bpm.engine.RepositoryService;
import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.migration.MigratingProcessInstanceValidationException;
import org.camunda.bpm.engine.migration.MigrationPlan;
import org.camunda.bpm.engine.migration.MigrationPlanBuilder;
import org.camunda.bpm.engine.migration.MigrationPlanExecutionBuilder;
import org.camunda.bpm.engine.repository.ProcessDefinition;
import org.camunda.bpm.engine.repository.ProcessDefinitionQuery;
import org.camunda.bpm.engine.runtime.Execution;
import org.camunda.bpm.engine.runtime.ProcessInstance;
import org.camunda.bpm.model.bpmn.BpmnModelInstance;
import org.camunda.bpm.model.bpmn.instance.Activity;
import org.camunda.bpm.model.xml.instance.ModelElementInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
 * Migrates active process instances from outdated versions of a process definition to the latest
 * version of that definition.
 *
 * <p>For every process definition key the latest version is looked up; each other, not yet suspended
 * version of the same key is treated as a migration source. The migration plan is generated with
 * {@code mapEqualActivities()}. Once a source definition has no active process instances left it is
 * suspended, so it is skipped on subsequent runs.
 */
@Service
public class ProcessMigratonService {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProcessMigratonService.class);

    /** When {@code true}, {@link #executeAutomaticMigrations()} triggers {@link #migrate()}. */
    @Value("${pubx.process.migration.afterStartup:false}")
    private boolean enableMigrationAfterStartup;

    /** Maximum number of process instance ids passed to a single migration execution. */
    @Value("${pubx.process.migration.batchSize:1000}")
    private int migrationBatchSize = 1000;

    private RuntimeService runtimeService;

    private RepositoryService repositoryService;

    /**
     * Injected reference to this service. {@link #migrate()} calls the {@code @Async} method through
     * this reference instead of {@code this} so that Spring's proxy-based {@code @Async} handling can
     * apply to the call.
     */
    @Autowired
    private ProcessMigratonService self;

    /**
     * Application-ready hook: starts the migration of all process definitions if the property
     * {@code pubx.process.migration.afterStartup} is enabled; does nothing otherwise.
     */
    @EventListener(ApplicationReadyEvent.class)
    public void executeAutomaticMigrations() {
        if (!this.enableMigrationAfterStartup) {
            return;
        }
        migrate();
    }

    /**
     * Triggers a migration for every process definition key, using the latest version of each key as
     * the migration target.
     */
    public void migrate() {
        for (final ProcessDefinition latest : getProcessDefinitions()) {
            this.self.migrate(latest);
        }
    }

    /**
     * Migrates the process instances of all outdated versions of the given definition to it.
     *
     * @param procDef the target process definition; every other non-suspended definition with the
     *                same key is used as a migration source
     */
    @Async
    public void migrate(final ProcessDefinition procDef) {
        final String targetProcessDefinitionId = procDef.getId();
        for (final ProcessDefinition outdated : getOutdatedProcessDefinitions(procDef)) {
            migrate(outdated.getId(), targetProcessDefinitionId);
        }
    }

    /**
     * Migrates all active process instances of the source definition to the target definition and
     * afterwards suspends the source definition if no active instance remains on it.
     *
     * @param sourceProcessDefinitionId id of the outdated process definition
     * @param targetProcessDefinitionId id of the process definition to migrate to
     */
    private void migrate(final String sourceProcessDefinitionId, final String targetProcessDefinitionId) {
        final MigrationPlanBuilder migrationPlanBuilder = this.runtimeService
                .createMigrationPlan(sourceProcessDefinitionId, targetProcessDefinitionId)
                .mapEqualActivities();
        final MigrationPlan migrationPlan = migrationPlanBuilder.build();

        // Instances are selected activity by activity.
        // NOTE(review): presumably this keeps instances that cannot be migrated from one activity
        // from blocking the instances waiting in other activities - confirm.
        for (final String activityId : getActivityIds(sourceProcessDefinitionId)) {
            final List<ProcessInstance> processInstances = this.runtimeService.createProcessInstanceQuery()
                    .processDefinitionId(sourceProcessDefinitionId)
                    .activityIdIn(activityId)
                    .active()
                    .list();
            if (processInstances.isEmpty()) {
                continue;
            }

            final List<String> processInstanceIds = getIds(processInstances);
            LOGGER.info("Migrating {} process instance(s) from {} to {}", processInstanceIds.size(),
                    sourceProcessDefinitionId, targetProcessDefinitionId);

            for (final List<String> processInstanceIdsBatch : toBatches(processInstanceIds, this.migrationBatchSize)) {
                migrate(sourceProcessDefinitionId, targetProcessDefinitionId, migrationPlan, processInstanceIdsBatch);
            }
        }

        suspendEmptyProcessDefinition(sourceProcessDefinitionId);
    }

    /**
     * Executes the migration plan for one batch of process instances. A
     * {@link MigratingProcessInstanceValidationException} is logged and swallowed so that the
     * remaining batches are still attempted; any other exception propagates to the caller.
     *
     * @param sourceProcessDefinitionId id of the source definition (used for logging only)
     * @param targetProcessDefinitionId id of the target definition (used for logging only)
     * @param migrationPlan             the plan to execute
     * @param processInstanceIdsBatch   ids of the process instances to migrate in this execution
     */
    private void migrate(final String sourceProcessDefinitionId, final String targetProcessDefinitionId,
            final MigrationPlan migrationPlan, final List<String> processInstanceIdsBatch) {
        try {
            final MigrationPlanExecutionBuilder migration = this.runtimeService.newMigration(migrationPlan)
                    .processInstanceIds(processInstanceIdsBatch);
            migration.execute();
        } catch (final MigratingProcessInstanceValidationException e) {
            LOGGER.error("Migrating batch of {} process instance(s) from {} to {} failed",
                    processInstanceIdsBatch.size(), sourceProcessDefinitionId, targetProcessDefinitionId, e);
        }
    }

    /**
     * Suspends the given process definition if it has no active process instances; otherwise logs
     * how many active instances are still left on it.
     *
     * @param processDefinitionId id of the process definition to check
     */
    private void suspendEmptyProcessDefinition(final String processDefinitionId) {
        final List<ProcessInstance> remaining = this.runtimeService.createProcessInstanceQuery()
                .processDefinitionId(processDefinitionId)
                .active()
                .list();
        if (remaining.isEmpty()) {
            this.repositoryService.suspendProcessDefinitionById(processDefinitionId);
            return;
        }
        // Leftover instances mean the migration was incomplete, hence a warning rather than info.
        LOGGER.warn("Was not able to migrate {} process instance(s) from {}", remaining.size(), processDefinitionId);
    }

    /**
     * @return the latest version of every deployed process definition, ordered by key ascending
     */
    private List<ProcessDefinition> getProcessDefinitions() {
        final ProcessDefinitionQuery query = this.repositoryService.createProcessDefinitionQuery();
        return query.orderByProcessDefinitionKey().asc().latestVersion().list();
    }

    /**
     * Finds the migration sources for the given definition.
     *
     * @param procDef the definition considered current
     * @return all definitions with the same key that are not suspended and whose version differs
     *         from the version of {@code procDef}
     */
    private List<ProcessDefinition> getOutdatedProcessDefinitions(final ProcessDefinition procDef) {
        final ProcessDefinitionQuery query = this.repositoryService.createProcessDefinitionQuery();
        final List<ProcessDefinition> sameKey = query.processDefinitionKey(procDef.getKey()).list();

        final List<ProcessDefinition> outdated = new ArrayList<>();
        for (final ProcessDefinition candidate : sameKey) {
            if (candidate.isSuspended() || candidate.getVersion() == procDef.getVersion()) {
                continue;
            }
            outdated.add(candidate);
        }
        return outdated;
    }

    /**
     * Collects the ids of all BPMN elements of type {@link Activity} in the given process definition.
     *
     * <p>NOTE(review): only {@code Activity} elements are collected, so process instances that are
     * matched solely by a non-activity element id would not be selected for migration - confirm
     * whether this is intended.
     *
     * @param processDefinitionId id of the process definition whose model is inspected
     * @return the distinct activity ids of the model
     */
    private Set<String> getActivityIds(final String processDefinitionId) {
        final BpmnModelInstance bpmnModel = this.repositoryService.getBpmnModelInstance(processDefinitionId);
        final Collection<ModelElementInstance> activities =
                bpmnModel.getModelElementsByType(bpmnModel.getModel().getType(Activity.class));

        final Set<String> activityIds = new HashSet<>();
        for (final ModelElementInstance activity : activities) {
            activityIds.add(((Activity) activity).getId());
        }
        return activityIds;
    }

    /**
     * @param list executions (or process instances) to read the ids from
     * @return the ids of the given executions, in the same order
     */
    private List<String> getIds(final List<? extends Execution> list) {
        final List<String> ids = new ArrayList<>(list.size());
        for (final Execution execution : list) {
            ids.add(execution.getId());
        }
        return ids;
    }

    /**
     * Splits a list into consecutive batches of at most {@code batchSize} elements, preserving the
     * element order. Every returned batch is an independent, mutable copy.
     *
     * @param list      the elements to split; must not be {@code null}
     * @param batchSize maximum number of elements per batch; a value of zero or less disables
     *                  batching, i.e. all elements end up in one single batch
     * @param <T>       element type
     * @return the batches in order; empty if {@code list} is empty
     */
    public static <T> List<List<T>> toBatches(final List<T> list, final int batchSize) {
        final List<List<T>> result = new ArrayList<>();
        final int total = list.size();
        if (total == 0) {
            return result;
        }
        if (batchSize <= 0) {
            final List<T> everything = new ArrayList<>(list);
            result.add(everything);
            return result;
        }

        int from = 0;
        while (from < total) {
            // Computed from the remaining count so that a huge batchSize cannot overflow the index.
            final int to = from + Math.min(batchSize, total - from);
            final List<T> batch = new ArrayList<>(list.subList(from, to));
            result.add(batch);
            from = to;
        }
        return result;
    }

    /**
     * Injection point for the process engine; the runtime and repository services used by this
     * class are taken from it.
     *
     * @param camunda the process engine to operate on
     */
    @SuppressWarnings("squid:S3242")
    @Autowired
    public void setCamunda(final ProcessEngine camunda) {
        this.runtimeService = camunda.getRuntimeService();
        this.repositoryService = camunda.getRepositoryService();
    }
}