Implemented automatic migration to latest process version

Hello,

I have implemented an automatic migration to the latest process version based on the activity IDs and would like to share the implementation with you:

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.camunda.bpm.engine.ProcessEngine;
import org.camunda.bpm.engine.RepositoryService;
import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.migration.MigratingProcessInstanceValidationException;
import org.camunda.bpm.engine.migration.MigrationPlan;
import org.camunda.bpm.engine.migration.MigrationPlanBuilder;
import org.camunda.bpm.engine.migration.MigrationPlanExecutionBuilder;
import org.camunda.bpm.engine.repository.ProcessDefinition;
import org.camunda.bpm.engine.repository.ProcessDefinitionQuery;
import org.camunda.bpm.engine.runtime.Execution;
import org.camunda.bpm.engine.runtime.ProcessInstance;
import org.camunda.bpm.model.bpmn.BpmnModelInstance;
import org.camunda.bpm.model.bpmn.instance.Activity;
import org.camunda.bpm.model.xml.instance.ModelElementInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class ProcessMigratonService {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProcessMigratonService.class);

    @Value("${pubx.process.migration.afterStartup:false}")
    private boolean enableMigrationAfterStartup;
    @Value("${pubx.process.migration.batchSize:1000}")
    private int migrationBatchSize = 1000;

    private RuntimeService runtimeService;
    private RepositoryService repositoryService;

    @Autowired
    private ProcessMigratonService self;


    @EventListener(ApplicationReadyEvent.class)
    public void executeAutomaticMigrations() {
        if (this.enableMigrationAfterStartup) {
            migrate();
        }
    }

    public void migrate() {
        for (final ProcessDefinition procDef : getProcessDefinitions()) {
            this.self.migrate(procDef);
        }
    }

    @Async
    public void migrate(final ProcessDefinition procDef) {
        for (final ProcessDefinition procDefOutdated : getOutdatedProcessDefinitions(procDef)) {
            final String sourceProcessDefinitionId = procDefOutdated.getId();
            final String targetProcessDefinitionId = procDef.getId();
            migrate(sourceProcessDefinitionId, targetProcessDefinitionId);
        }
    }

    private void migrate(final String sourceProcessDefinitionId, final String targetProcessDefinitionId) {
        final MigrationPlanBuilder migrationPlanBuilder = this.runtimeService
                .createMigrationPlan(sourceProcessDefinitionId, targetProcessDefinitionId).mapEqualActivities();
        final MigrationPlan migrationPlan = migrationPlanBuilder.build();

        final Set<String> activityIds = getActivityIds(sourceProcessDefinitionId);
        for (final String activityId : activityIds) {
            final List<ProcessInstance> processInstancesBefore = this.runtimeService.createProcessInstanceQuery()
                    .processDefinitionId(sourceProcessDefinitionId).activityIdIn(activityId).active().list();
            if (!processInstancesBefore.isEmpty()) {
                final List<String> processInstanceIds = getIds(processInstancesBefore);
                ProcessMigratonService.LOGGER.info("Migrating {} process instance(s) from {} to {}", Integer.valueOf(processInstanceIds.size()),
                        sourceProcessDefinitionId, targetProcessDefinitionId);
                final List<List<String>> processInstanceIdsBatchs = toBatches(processInstanceIds, this.migrationBatchSize);
                for (final List<String> processInstanceIdsBatch : processInstanceIdsBatchs) {
                    migrate(sourceProcessDefinitionId, targetProcessDefinitionId, migrationPlan, processInstanceIdsBatch);
                }
            }
        }
        suspendEmptyProcessDefinition(sourceProcessDefinitionId);
    }

    private void migrate(final String sourceProcessDefinitionId, final String targetProcessDefinitionId, final MigrationPlan migrationPlan,
            final List<String> processInstanceIdsBatch) {
        try {
            final MigrationPlanExecutionBuilder migration = this.runtimeService.newMigration(migrationPlan)
                    .processInstanceIds(processInstanceIdsBatch);
            migration.execute();
        } catch (final MigratingProcessInstanceValidationException e) {
            ProcessMigratonService.LOGGER.error("Migrating batch of {} process instance(s) from {} to {} failed",
                    Integer.valueOf(processInstanceIdsBatch.size()), sourceProcessDefinitionId, targetProcessDefinitionId, e);
        }
    }

    private void suspendEmptyProcessDefinition(final String processDefinitionId) {
        final List<ProcessInstance> processInstancesAfter = this.runtimeService.createProcessInstanceQuery()
                .processDefinitionId(processDefinitionId).active().list();
        if (processInstancesAfter.isEmpty()) {
            this.repositoryService.suspendProcessDefinitionById(processDefinitionId);
        } else {
            ProcessMigratonService.LOGGER.info("Was not able to migrate {} process instance(s) from {}",
                    Integer.valueOf(processInstancesAfter.size()), processDefinitionId);
        }
    }

    private List<ProcessDefinition> getProcessDefinitions() {
        final ProcessDefinitionQuery processDefinitionQuery = this.repositoryService.createProcessDefinitionQuery();
        final List<ProcessDefinition> result = processDefinitionQuery.orderByProcessDefinitionKey().asc().latestVersion().list();
        return result;
    }

    private List<ProcessDefinition> getOutdatedProcessDefinitions(final ProcessDefinition procDef) {
        final ProcessDefinitionQuery processDefinitionQuery = this.repositoryService.createProcessDefinitionQuery();
        final List<ProcessDefinition> list = processDefinitionQuery.processDefinitionKey(procDef.getKey()).list();
        final List<ProcessDefinition> result = new ArrayList<>();
        for (final ProcessDefinition entry : list) {
            if (!entry.isSuspended() && (entry.getVersion() != procDef.getVersion())) {
                result.add(entry);
            }
        }
        return result;
    }

    private Set<String> getActivityIds(final String processDefinitionId) {
        final BpmnModelInstance bpmModel = this.repositoryService.getBpmnModelInstance(processDefinitionId);
        final Set<String> activityIds = new HashSet<>();
        final Collection<ModelElementInstance> activties = bpmModel.getModelElementsByType(bpmModel.getModel().getType(Activity.class));
        for (final ModelElementInstance activty : activties) {
            activityIds.add(((Activity) activty).getId());
        }
        return activityIds;
    }

    private List<String> getIds(final List<? extends Execution> list) {
        final List<String> result = new ArrayList<>();
        for (final Execution entry : list) {
            result.add(entry.getId());
        }
        return result;
    }
    
    public static <TObject> List<List<TObject>> toBatches(final List<TObject> list, final int batchSize) {
        final List<List<TObject>> result = new ArrayList<>();
        List<TObject> batch = null;
        int batchCount = 0;
        for (final TObject entry : list) {
            if (batch == null) {
                batch = new ArrayList<>();
                result.add(batch);
            }
            batch.add(entry);
            if ((batchCount++ > batchSize) && (batchSize >= 0)) {
                batch = null;
                batchCount = 0;
            }
        }
        return result;
    }


    @SuppressWarnings("squid:S3242")
    @Autowired
    public void setCamunda(final ProcessEngine camunda) {
        this.runtimeService = camunda.getRuntimeService();
        this.repositoryService = camunda.getRepositoryService();
    }

}
3 Likes

That's great! Thanks.
Do you have a git repo for this so that other people could find it more easily or even contribute to it?

No, not a public one for this kind of thing. But you are welcome to add it to your Spring Boot example app.

Could you submit this as a PR to the Spring Boot example in the camunda-bpm-migration extension?

The extension aims to provide a fluent DSL for migration, and I am currently working on an updated version. Thanks a lot!

1 Like