Flowable工作流引擎源码深度解析

jonathan
2021-10-22 / 0 评论

Flowable工作流引擎源码深度解析

前言

Flowable是一个轻量级的业务流程引擎,基于BPMN 2.0规范实现,是Activiti项目的一个分支。作为Java生态中最流行的工作流引擎之一,了解其内部实现对于定制化开发和性能优化至关重要。本文将深入分析Flowable的核心源码结构和执行逻辑,帮助开发者更好地理解和使用这一强大工具。

核心架构概览

Flowable的源码主要分为以下几个核心模块:

  1. flowable-engine:核心引擎实现
  2. flowable-bpmn-converter:BPMN模型转换器
  3. flowable-process-validation:流程验证模块
  4. flowable-image-generator:流程图生成模块
  5. flowable-rest:REST API实现

其中,flowable-engine是最核心的部分,我们的分析也将主要集中在这个模块上。

ProcessEngine初始化流程

Flowable的入口是ProcessEngine接口,通常通过ProcessEngineConfiguration来创建。让我们看看其初始化过程:

public class ProcessEngineConfigurationImpl extends ProcessEngineConfiguration {
    
    public ProcessEngine buildProcessEngine() {
        init();
        ProcessEngineImpl processEngine = new ProcessEngineImpl(this);
        postProcessEngineInitialisation();
        return processEngine;
    }
    
    public void init() {
        initCommandContextFactory();
        initTransactionContextFactory();
        initCommandExecutors();
        initServices();
        initDataSource();
        initDbSchema();
        initBeans();
        initTransactionFactory();
        // 其他初始化方法...
    }
}

初始化过程主要包括:

  1. 初始化命令上下文工厂
  2. 初始化事务上下文工厂
  3. 初始化命令执行器
  4. 初始化各种服务
  5. 初始化数据源和数据库架构
  6. 初始化事务工厂等

这种设计遵循了良好的工厂模式和构建器模式。

命令模式的应用

Flowable大量使用了命令模式,所有对流程的操作都被封装为Command对象:

public interface Command<T> {
    T execute(CommandContext commandContext);
}

执行命令的是CommandExecutor,它主要有两个实现:

  1. CommandExecutorImpl:普通实现
  2. TransactionCommandExecutor:带事务的实现

让我们看看CommandExecutorImpl的实现:

public class CommandExecutorImpl implements CommandExecutor {
    
    protected CommandContextFactory commandContextFactory;
    protected TransactionContextFactory transactionContextFactory;
    
    public <T> T execute(Command<T> command) {
        CommandContext commandContext = commandContextFactory.createCommandContext(command);
        
        try {
            T result = command.execute(commandContext);
            commandContext.close();
            return result;
        } catch (Exception e) {
            commandContext.exception(e);
        } finally {
            try {
                commandContext.close();
            } catch (Exception e) {
                // 日志记录
            }
        }
        
        return null;
    }
}

这种设计确保了所有流程操作都在一个一致的上下文中执行,并且可以正确处理事务和异常。

流程定义加载

当我们部署一个BPMN文件时,Flowable会解析它并转换为内部模型。核心类是BpmnParser

public class BpmnParser {
    
    public BpmnParse createParse() {
        return new BpmnParse(this);
    }
    
    public BpmnParse parse(InputStream inputStream) {
        BpmnParse bpmnParse = createParse();
        bpmnParse.sourceInputStream = inputStream;
        bpmnParse.execute();
        return bpmnParse;
    }
}

BpmnParse类负责具体的解析逻辑:

public class BpmnParse extends BpmnParseHandler {
    
    public BpmnParse execute() {
        try {
            // 解析XML文档
            DocumentBuilderFactory dbf = XmlUtil.createSafeDocumentBuilderFactory();
            Document document = dbf.newDocumentBuilder().parse(sourceInputStream);
            
            // 解析BPMN元素
            parseRootElement(document.getDocumentElement());
            
            // 处理流程定义
            processDI();
            
            // 完成解析
            executeParse();
            
        } catch (Exception e) {
            // 异常处理
        }
        
        return this;
    }
    
    protected void parseRootElement(Element rootElement) {
        // 解析流程、任务、网关等元素
    }
}

这个过程将BPMN文件转换为ProcessDefinitionEntity对象,其中包含了流程的所有信息。

流程执行引擎

流程实例的执行由ExecutionEntity类负责:

public class ExecutionEntity implements Execution, ExecutionListenerContainer {
    
    protected String id;
    protected ProcessDefinitionEntity processDefinition;
    protected String businessKey;
    protected String activityId;
    protected ExecutionEntity parent;
    protected List<ExecutionEntity> executions = new ArrayList<>();
    // 其他属性...
    
    public void start() {
        CommandContext commandContext = Context.getCommandContext();
        // 执行流程实例启动逻辑
        // 触发事件监听器
        // 执行第一个活动节点
    }
    
    public void continueExecution() {
        ExecutionEntity execution = this;
        while (execution != null && execution.isActive()) {
            ActivityImpl activity = execution.getActivity();
            if (activity != null) {
                // 执行当前活动节点
                execution = activity.execute(execution);
            } else {
                // 已完成
                execution = null;
            }
        }
    }
}

每个流程实例都对应一个ExecutionEntity,每个并行流程也对应一个子ExecutionEntity

任务管理

任务由TaskEntity类表示:

public class TaskEntity implements Task {
    
    protected String id;
    protected String name;
    protected String description;
    protected String assignee;
    protected Date createTime;
    protected String executionId;
    protected String processInstanceId;
    // 其他属性...
    
    public void complete() {
        // 验证任务状态
        // 执行任务完成逻辑
        // 触发事件监听器
        // 推进流程执行
    }
}

任务的创建、分配和完成都是通过TaskService接口实现的:

public interface TaskService {
    Task newTask();
    void saveTask(Task task);
    void deleteTask(String taskId);
    void claim(String taskId, String userId);
    void complete(String taskId);
    // 其他方法...
}

事件监听机制

Flowable提供了丰富的事件监听机制,核心接口是FlowableEventListener

public interface FlowableEventListener {
    void onEvent(FlowableEvent event);
    boolean isFailOnException();
}

事件类型由FlowableEventType枚举定义,包括流程启动、任务创建、流程完成等多种类型。

事件的分发由FlowableEventDispatcher接口负责:

public interface FlowableEventDispatcher {
    void addEventListener(FlowableEventListener listener);
    void addEventListener(FlowableEventListener listener, FlowableEventType... types);
    void removeEventListener(FlowableEventListener listener);
    void dispatchEvent(FlowableEvent event);
}

这种设计允许我们在流程的各个阶段插入自定义逻辑。

数据持久化

Flowable使用MyBatis作为ORM框架进行数据持久化。核心接口是DbSqlSession

public class DbSqlSession implements Session {
    
    protected SqlSession sqlSession;
    protected DbSqlSessionFactory dbSqlSessionFactory;
    protected List<PersistentObject> insertedObjects = new ArrayList<>();
    protected List<PersistentObject> updatedObjects = new ArrayList<>();
    protected List<PersistentObject> deletedObjects = new ArrayList<>();
    
    public void flush() {
        // 处理插入对象
        for (PersistentObject insertedObject : insertedObjects) {
            String insertStatement = dbSqlSessionFactory.getInsertStatement(insertedObject);
            sqlSession.insert(insertStatement, insertedObject);
        }
        
        // 处理更新对象
        for (PersistentObject updatedObject : updatedObjects) {
            String updateStatement = dbSqlSessionFactory.getUpdateStatement(updatedObject);
            sqlSession.update(updateStatement, updatedObject);
        }
        
        // 处理删除对象
        for (PersistentObject deletedObject : deletedObjects) {
            String deleteStatement = dbSqlSessionFactory.getDeleteStatement(deletedObject);
            sqlSession.delete(deleteStatement, deletedObject);
        }
        
        // 清空缓存
        insertedObjects.clear();
        updatedObjects.clear();
        deletedObjects.clear();
    }
}

所有实体对象的变更都会被记录在这些列表中,然后在事务提交时一次性写入数据库。

性能优化

Flowable做了许多性能优化,其中最重要的是缓存机制:

public class DeploymentCache<T> {
    
    protected Map<String, T> cache = new HashMap<>();
    protected int limit;
    protected LinkedList<String> keyList = new LinkedList<>();
    
    public void add(String id, T obj) {
        if (limit > 0 && keyList.size() >= limit) {
            String oldestKey = keyList.removeFirst();
            cache.remove(oldestKey);
        }
        
        cache.put(id, obj);
        keyList.addLast(id);
    }
    
    public T get(String id) {
        return cache.get(id);
    }
    
    public void remove(String id) {
        cache.remove(id);
        keyList.remove(id);
    }
}

这种LRU缓存确保了频繁使用的流程定义可以快速获取,而不需要重复从数据库加载。

扩展点分析

Flowable提供了丰富的扩展点,允许开发者定制化流程行为:

  1. TaskListener:任务生命周期监听器
  2. ExecutionListener:执行流程监听器
  3. ActivityBehavior:自定义活动行为
  4. ExpressionManager:表达式管理器
  5. VariableType:自定义变量类型

TaskListener为例:

public interface TaskListener {
    
    String EVENTNAME_CREATE = "create";
    String EVENTNAME_ASSIGNMENT = "assignment";
    String EVENTNAME_COMPLETE = "complete";
    String EVENTNAME_DELETE = "delete";
    
    void notify(DelegateTask delegateTask);
}

通过实现这个接口,我们可以在任务的各个生命周期阶段插入自定义逻辑。

异步执行器

Flowable 6引入了新的异步执行器,替代了旧的JobExecutor:

public class DefaultAsyncExecutor implements AsyncExecutor {
    
    protected ThreadPoolExecutor threadPoolExecutor;
    protected RejectedExecutionHandler rejectedExecutionHandler;
    protected ThreadFactory threadFactory;
    
    public void start() {
        if (threadPoolExecutor == null) {
            threadPoolExecutor = new ThreadPoolExecutor(
                corePoolSize, 
                maxPoolSize, 
                keepAliveTime, 
                TimeUnit.MILLISECONDS, 
                new ArrayBlockingQueue<>(queueSize), 
                threadFactory, 
                rejectedExecutionHandler
            );
        }
        
        // 启动作业获取线程
        startJobAcquisitionThread();
    }
    
    protected void executeAsyncJob(Job job) {
        Runnable runnable = new ExecuteAsyncRunnable(job, this);
        threadPoolExecutor.execute(runnable);
    }
}

这种设计使得Flowable可以高效地处理大量的异步任务。

我很乐意解释Flowable 6.x中如何实现动态加节点,并提供一个实用的demo。

在Flowable 6.x中,动态修改流程实例主要通过RuntimeService的API来实现,特别是通过createProcessInstanceModification方法。下面我将详细介绍实现方法并提供一个完整的示例。

package com.example.flowable;

import org.flowable.engine.*;
import org.flowable.engine.impl.cfg.StandaloneProcessEngineConfiguration;
import org.flowable.engine.repository.Deployment;
import org.flowable.engine.repository.ProcessDefinition;
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.task.api.Task;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Flowable 6.x 动态添加节点示例
 */
public class FlowableDynamicTaskDemo {

    public static void main(String[] args) {
        // 初始化流程引擎
        ProcessEngine processEngine = initProcessEngine();
        
        // 获取各种服务
        RepositoryService repositoryService = processEngine.getRepositoryService();
        RuntimeService runtimeService = processEngine.getRuntimeService();
        TaskService taskService = processEngine.getTaskService();
        
        // 部署流程定义
        Deployment deployment = repositoryService.createDeployment()
                .addClasspathResource("dynamic-process.bpmn20.xml")
                .deploy();
        
        ProcessDefinition processDefinition = repositoryService.createProcessDefinitionQuery()
                .deploymentId(deployment.getId())
                .singleResult();
        
        System.out.println("流程定义部署完成: " + processDefinition.getName());
        
        // 启动流程实例
        Map<String, Object> variables = new HashMap<>();
        variables.put("applicant", "张三");
        variables.put("amount", 5000);
        
        ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("dynamicProcess", variables);
        System.out.println("流程实例启动成功,ID: " + processInstance.getId());
        
        // 查询当前任务
        Task currentTask = taskService.createTaskQuery()
                .processInstanceId(processInstance.getId())
                .singleResult();
        System.out.println("当前任务: " + currentTask.getName());
        
        // 动态添加一个审核任务节点
        System.out.println("开始动态添加节点...");
        dynamicallyAddTask(runtimeService, processInstance.getId(), currentTask.getId());
        
        // 完成当前任务
        taskService.complete(currentTask.getId());
        System.out.println("完成任务: " + currentTask.getName());
        
        // 查看动态添加的任务
        List<Task> tasks = taskService.createTaskQuery()
                .processInstanceId(processInstance.getId())
                .list();
        
        for (Task task : tasks) {
            System.out.println("当前活动任务: " + task.getName() + ", ID: " + task.getId());
            
            // 完成动态添加的任务
            taskService.complete(task.getId());
            System.out.println("完成任务: " + task.getName());
        }
        
        // 检查流程是否结束
        ProcessInstance runningInstance = runtimeService.createProcessInstanceQuery()
                .processInstanceId(processInstance.getId())
                .singleResult();
        
        if (runningInstance == null) {
            System.out.println("流程实例已完成");
        } else {
            System.out.println("流程实例仍在运行");
        }
    }
    
    /**
     * 动态添加任务节点
     */
    private static void dynamicallyAddTask(RuntimeService runtimeService, String processInstanceId, String activityId) {
        runtimeService.createProcessInstanceModification(processInstanceId)
                .startBeforeActivity(activityId)  // 在当前活动前启动
                .addExecution()  // 添加一个执行分支
                .callActivity("dynamicTask")  // 调用一个动态任务活动
                .setVariable("reviewer", "李四")  // 设置变量
                .setVariable("dynamicTaskName", "财务经理审核")  // 动态任务名称
                .execute();
        
        System.out.println("动态任务添加成功");
    }
    
    /**
     * 初始化流程引擎
     */
    private static ProcessEngine initProcessEngine() {
        ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
                .setJdbcUrl("jdbc:h2:mem:flowable;DB_CLOSE_DELAY=-1")
                .setJdbcUsername("sa")
                .setJdbcPassword("")
                .setJdbcDriver("org.h2.Driver")
                .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);
        
        return cfg.buildProcessEngine();
    }
}

<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:flowable="http://flowable.org/bpmn"
             targetNamespace="http://www.flowable.org/processdef">

  <process id="dynamicProcess" name="动态节点示例流程" isExecutable="true">
    <startEvent id="startEvent" name="开始"/>
    
    <sequenceFlow id="flow1" sourceRef="startEvent" targetRef="firstTask"/>
    
    <userTask id="firstTask" name="部门经理审批" flowable:assignee="${applicant}">
      <documentation>申请金额: ${amount}</documentation>
    </userTask>
    
    <sequenceFlow id="flow2" sourceRef="firstTask" targetRef="dynamicTask"/>
    
    <userTask id="dynamicTask" name="动态任务" flowable:assignee="${reviewer}">
      <documentation>这是一个可以被动态添加的任务节点</documentation>
    </userTask>
    
    <sequenceFlow id="flow3" sourceRef="dynamicTask" targetRef="endEvent"/>
    
    <endEvent id="endEvent" name="结束"/>
  </process>

</definitions>

在Flowable 6.x中动态添加节点主要有以下几种方法:

主要方法

  1. 流程实例修改 - 通过RuntimeService提供的API:

    runtimeService.createProcessInstanceModification(processInstanceId)
        .startBeforeActivity("activityId")  // 在指定活动前启动
        .execute();
    
  2. 动态子流程注入 - 在运行时动态添加子流程:

    runtimeService.createProcessInstanceModification(processInstanceId)
        .addExecution()
        .callActivity("subProcessId")
        .execute();
    
  3. 动态创建活动 - 直接创建和执行新活动:

    runtimeService.createProcessInstanceModification(processInstanceId)
        .startBeforeActivity("userTaskId")
        .setVariable("assignee", "动态分配的用户")
        .execute();
    

使用场景

动态添加节点在以下场景特别有用:

  1. 基于业务规则动态调整审批流程
  2. 根据申请金额/内容添加额外的审核步骤
  3. 特殊情况下插入临时审核/处理节点
  4. 灵活处理流程异常情况

实现要点

  1. 确保节点ID唯一性:动态添加的节点必须有唯一的ID
  2. 维护执行链的完整性:确保流程能正确流转
  3. 变量传递:为动态节点设置必要的流程变量
  4. 权限管理:动态添加的任务需要正确分配处理人

注意事项

  1. 过度使用动态节点会使流程追踪和维护变得困难
  2. 动态节点可能不会显示在流程图上,需要额外的审计记录
  3. 升级到Flowable 7时,部分API可能会有变化
  4. 确保动态添加的节点在流程异常时能够被正确处理

这个示例展示了如何在Flowable 6.x版本中动态添加一个审核任务节点。你可以根据实际需求调整代码,例如添加多个节点、条件节点或并行节点。

结论

通过深入分析Flowable的源码,我们可以看到它采用了许多优秀的设计模式:

  1. 命令模式:封装所有流程操作
  2. 工厂模式:创建各种对象
  3. 构建器模式:配置引擎
  4. 策略模式:不同的活动行为
  5. 观察者模式:事件监听机制

这些设计使得Flowable既灵活又强大,能够适应各种复杂的业务流程需求。同时,它的性能优化策略也确保了在高负载环境下的稳定运行。

对于想要深入了解工作流引擎实现的开发者,Flowable的源码提供了一个很好的学习案例。通过理解其内部机制,我们可以更好地使用和扩展这个强大的引擎。

参考资源

  1. Flowable GitHub仓库:https://github.com/flowable/flowable-engine
  2. Flowable官方文档:https://www.flowable.org/docs/userguide/index.html
  3. BPMN 2.0规范:https://www.omg.org/spec/BPMN/2.0/

评论

博主关闭了当前页面的评论