Flowable工作流引擎源码深度解析
前言
Flowable是一个轻量级的业务流程引擎,基于BPMN 2.0规范实现,是Activiti项目的一个分支。作为Java生态中最流行的工作流引擎之一,了解其内部实现对于定制化开发和性能优化至关重要。本文将深入分析Flowable的核心源码结构和执行逻辑,帮助开发者更好地理解和使用这一强大工具。
核心架构概览
Flowable的源码主要分为以下几个核心模块:
- flowable-engine:核心引擎实现
- flowable-bpmn-converter:BPMN模型转换器
- flowable-process-validation:流程验证模块
- flowable-image-generator:流程图生成模块
- flowable-rest:REST API实现
其中,flowable-engine
是最核心的部分,我们的分析也将主要集中在这个模块上。
ProcessEngine初始化流程
Flowable的入口是ProcessEngine
接口,通常通过ProcessEngineConfiguration
来创建。让我们看看其初始化过程:
public class ProcessEngineConfigurationImpl extends ProcessEngineConfiguration {
public ProcessEngine buildProcessEngine() {
init();
ProcessEngineImpl processEngine = new ProcessEngineImpl(this);
postProcessEngineInitialisation();
return processEngine;
}
public void init() {
initCommandContextFactory();
initTransactionContextFactory();
initCommandExecutors();
initServices();
initDataSource();
initDbSchema();
initBeans();
initTransactionFactory();
// 其他初始化方法...
}
}
初始化过程主要包括:
- 初始化命令上下文工厂
- 初始化事务上下文工厂
- 初始化命令执行器
- 初始化各种服务
- 初始化数据源和数据库架构
- 初始化事务工厂等
这种设计遵循了良好的工厂模式和构建器模式。
命令模式的应用
Flowable大量使用了命令模式,所有对流程的操作都被封装为Command对象:
public interface Command<T> {
T execute(CommandContext commandContext);
}
执行命令的是CommandExecutor
,它主要有两个实现:
CommandExecutorImpl
:普通实现TransactionCommandExecutor
:带事务的实现
让我们看看CommandExecutorImpl
的实现:
public class CommandExecutorImpl implements CommandExecutor {
protected CommandContextFactory commandContextFactory;
protected TransactionContextFactory transactionContextFactory;
public <T> T execute(Command<T> command) {
CommandContext commandContext = commandContextFactory.createCommandContext(command);
try {
T result = command.execute(commandContext);
commandContext.close();
return result;
} catch (Exception e) {
commandContext.exception(e);
} finally {
try {
commandContext.close();
} catch (Exception e) {
// 日志记录
}
}
return null;
}
}
这种设计确保了所有流程操作都在一个一致的上下文中执行,并且可以正确处理事务和异常。
流程定义加载
当我们部署一个BPMN文件时,Flowable会解析它并转换为内部模型。核心类是BpmnParser
:
public class BpmnParser {
public BpmnParse createParse() {
return new BpmnParse(this);
}
public BpmnParse parse(InputStream inputStream) {
BpmnParse bpmnParse = createParse();
bpmnParse.sourceInputStream = inputStream;
bpmnParse.execute();
return bpmnParse;
}
}
BpmnParse
类负责具体的解析逻辑:
public class BpmnParse extends BpmnParseHandler {
public BpmnParse execute() {
try {
// 解析XML文档
DocumentBuilderFactory dbf = XmlUtil.createSafeDocumentBuilderFactory();
Document document = dbf.newDocumentBuilder().parse(sourceInputStream);
// 解析BPMN元素
parseRootElement(document.getDocumentElement());
// 处理流程定义
processDI();
// 完成解析
executeParse();
} catch (Exception e) {
// 异常处理
}
return this;
}
protected void parseRootElement(Element rootElement) {
// 解析流程、任务、网关等元素
}
}
这个过程将BPMN文件转换为ProcessDefinitionEntity
对象,其中包含了流程的所有信息。
流程执行引擎
流程实例的执行由ExecutionEntity
类负责:
public class ExecutionEntity implements Execution, ExecutionListenerContainer {
protected String id;
protected ProcessDefinitionEntity processDefinition;
protected String businessKey;
protected String activityId;
protected ExecutionEntity parent;
protected List<ExecutionEntity> executions = new ArrayList<>();
// 其他属性...
public void start() {
CommandContext commandContext = Context.getCommandContext();
// 执行流程实例启动逻辑
// 触发事件监听器
// 执行第一个活动节点
}
public void continueExecution() {
ExecutionEntity execution = this;
while (execution != null && execution.isActive()) {
ActivityImpl activity = execution.getActivity();
if (activity != null) {
// 执行当前活动节点
execution = activity.execute(execution);
} else {
// 已完成
execution = null;
}
}
}
}
每个流程实例都对应一个ExecutionEntity
,每个并行流程也对应一个子ExecutionEntity
。
任务管理
任务由TaskEntity
类表示:
public class TaskEntity implements Task {
protected String id;
protected String name;
protected String description;
protected String assignee;
protected Date createTime;
protected String executionId;
protected String processInstanceId;
// 其他属性...
public void complete() {
// 验证任务状态
// 执行任务完成逻辑
// 触发事件监听器
// 推进流程执行
}
}
任务的创建、分配和完成都是通过TaskService
接口实现的:
public interface TaskService {
Task newTask();
void saveTask(Task task);
void deleteTask(String taskId);
void claim(String taskId, String userId);
void complete(String taskId);
// 其他方法...
}
事件监听机制
Flowable提供了丰富的事件监听机制,核心接口是FlowableEventListener
:
public interface FlowableEventListener {
void onEvent(FlowableEvent event);
boolean isFailOnException();
}
事件类型由FlowableEventType
枚举定义,包括流程启动、任务创建、流程完成等多种类型。
事件的分发由FlowableEventDispatcher
接口负责:
public interface FlowableEventDispatcher {
void addEventListener(FlowableEventListener listener);
void addEventListener(FlowableEventListener listener, FlowableEventType... types);
void removeEventListener(FlowableEventListener listener);
void dispatchEvent(FlowableEvent event);
}
这种设计允许我们在流程的各个阶段插入自定义逻辑。
数据持久化
Flowable使用MyBatis作为ORM框架进行数据持久化。核心接口是DbSqlSession
:
public class DbSqlSession implements Session {
protected SqlSession sqlSession;
protected DbSqlSessionFactory dbSqlSessionFactory;
protected List<PersistentObject> insertedObjects = new ArrayList<>();
protected List<PersistentObject> updatedObjects = new ArrayList<>();
protected List<PersistentObject> deletedObjects = new ArrayList<>();
public void flush() {
// 处理插入对象
for (PersistentObject insertedObject : insertedObjects) {
String insertStatement = dbSqlSessionFactory.getInsertStatement(insertedObject);
sqlSession.insert(insertStatement, insertedObject);
}
// 处理更新对象
for (PersistentObject updatedObject : updatedObjects) {
String updateStatement = dbSqlSessionFactory.getUpdateStatement(updatedObject);
sqlSession.update(updateStatement, updatedObject);
}
// 处理删除对象
for (PersistentObject deletedObject : deletedObjects) {
String deleteStatement = dbSqlSessionFactory.getDeleteStatement(deletedObject);
sqlSession.delete(deleteStatement, deletedObject);
}
// 清空缓存
insertedObjects.clear();
updatedObjects.clear();
deletedObjects.clear();
}
}
所有实体对象的变更都会被记录在这些列表中,然后在事务提交时一次性写入数据库。
性能优化
Flowable做了许多性能优化,其中最重要的是缓存机制:
public class DeploymentCache<T> {
protected Map<String, T> cache = new HashMap<>();
protected int limit;
protected LinkedList<String> keyList = new LinkedList<>();
public void add(String id, T obj) {
if (limit > 0 && keyList.size() >= limit) {
String oldestKey = keyList.removeFirst();
cache.remove(oldestKey);
}
cache.put(id, obj);
keyList.addLast(id);
}
public T get(String id) {
return cache.get(id);
}
public void remove(String id) {
cache.remove(id);
keyList.remove(id);
}
}
这种LRU缓存确保了频繁使用的流程定义可以快速获取,而不需要重复从数据库加载。
扩展点分析
Flowable提供了丰富的扩展点,允许开发者定制化流程行为:
- TaskListener:任务生命周期监听器
- ExecutionListener:执行流程监听器
- ActivityBehavior:自定义活动行为
- ExpressionManager:表达式管理器
- VariableType:自定义变量类型
以TaskListener
为例:
public interface TaskListener {
String EVENTNAME_CREATE = "create";
String EVENTNAME_ASSIGNMENT = "assignment";
String EVENTNAME_COMPLETE = "complete";
String EVENTNAME_DELETE = "delete";
void notify(DelegateTask delegateTask);
}
通过实现这个接口,我们可以在任务的各个生命周期阶段插入自定义逻辑。
异步执行器
Flowable 6引入了新的异步执行器,替代了旧的JobExecutor:
public class DefaultAsyncExecutor implements AsyncExecutor {
protected ThreadPoolExecutor threadPoolExecutor;
protected RejectedExecutionHandler rejectedExecutionHandler;
protected ThreadFactory threadFactory;
public void start() {
if (threadPoolExecutor == null) {
threadPoolExecutor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(queueSize),
threadFactory,
rejectedExecutionHandler
);
}
// 启动作业获取线程
startJobAcquisitionThread();
}
protected void executeAsyncJob(Job job) {
Runnable runnable = new ExecuteAsyncRunnable(job, this);
threadPoolExecutor.execute(runnable);
}
}
这种设计使得Flowable可以高效地处理大量的异步任务。
我很乐意解释Flowable 6.x中如何实现动态加节点,并提供一个实用的demo。
在Flowable 6.x中,动态修改流程实例主要通过RuntimeService
的API来实现,特别是通过createProcessInstanceModification
方法。下面我将详细介绍实现方法并提供一个完整的示例。
package com.example.flowable;
import org.flowable.engine.*;
import org.flowable.engine.impl.cfg.StandaloneProcessEngineConfiguration;
import org.flowable.engine.repository.Deployment;
import org.flowable.engine.repository.ProcessDefinition;
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.task.api.Task;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Flowable 6.x 动态添加节点示例
*/
public class FlowableDynamicTaskDemo {
public static void main(String[] args) {
// 初始化流程引擎
ProcessEngine processEngine = initProcessEngine();
// 获取各种服务
RepositoryService repositoryService = processEngine.getRepositoryService();
RuntimeService runtimeService = processEngine.getRuntimeService();
TaskService taskService = processEngine.getTaskService();
// 部署流程定义
Deployment deployment = repositoryService.createDeployment()
.addClasspathResource("dynamic-process.bpmn20.xml")
.deploy();
ProcessDefinition processDefinition = repositoryService.createProcessDefinitionQuery()
.deploymentId(deployment.getId())
.singleResult();
System.out.println("流程定义部署完成: " + processDefinition.getName());
// 启动流程实例
Map<String, Object> variables = new HashMap<>();
variables.put("applicant", "张三");
variables.put("amount", 5000);
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("dynamicProcess", variables);
System.out.println("流程实例启动成功,ID: " + processInstance.getId());
// 查询当前任务
Task currentTask = taskService.createTaskQuery()
.processInstanceId(processInstance.getId())
.singleResult();
System.out.println("当前任务: " + currentTask.getName());
// 动态添加一个审核任务节点
System.out.println("开始动态添加节点...");
dynamicallyAddTask(runtimeService, processInstance.getId(), currentTask.getId());
// 完成当前任务
taskService.complete(currentTask.getId());
System.out.println("完成任务: " + currentTask.getName());
// 查看动态添加的任务
List<Task> tasks = taskService.createTaskQuery()
.processInstanceId(processInstance.getId())
.list();
for (Task task : tasks) {
System.out.println("当前活动任务: " + task.getName() + ", ID: " + task.getId());
// 完成动态添加的任务
taskService.complete(task.getId());
System.out.println("完成任务: " + task.getName());
}
// 检查流程是否结束
ProcessInstance runningInstance = runtimeService.createProcessInstanceQuery()
.processInstanceId(processInstance.getId())
.singleResult();
if (runningInstance == null) {
System.out.println("流程实例已完成");
} else {
System.out.println("流程实例仍在运行");
}
}
/**
* 动态添加任务节点
*/
private static void dynamicallyAddTask(RuntimeService runtimeService, String processInstanceId, String activityId) {
runtimeService.createProcessInstanceModification(processInstanceId)
.startBeforeActivity(activityId) // 在当前活动前启动
.addExecution() // 添加一个执行分支
.callActivity("dynamicTask") // 调用一个动态任务活动
.setVariable("reviewer", "李四") // 设置变量
.setVariable("dynamicTaskName", "财务经理审核") // 动态任务名称
.execute();
System.out.println("动态任务添加成功");
}
/**
* 初始化流程引擎
*/
private static ProcessEngine initProcessEngine() {
ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
.setJdbcUrl("jdbc:h2:mem:flowable;DB_CLOSE_DELAY=-1")
.setJdbcUsername("sa")
.setJdbcPassword("")
.setJdbcDriver("org.h2.Driver")
.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);
return cfg.buildProcessEngine();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:flowable="http://flowable.org/bpmn"
targetNamespace="http://www.flowable.org/processdef">
<process id="dynamicProcess" name="动态节点示例流程" isExecutable="true">
<startEvent id="startEvent" name="开始"/>
<sequenceFlow id="flow1" sourceRef="startEvent" targetRef="firstTask"/>
<userTask id="firstTask" name="部门经理审批" flowable:assignee="${applicant}">
<documentation>申请金额: ${amount}</documentation>
</userTask>
<sequenceFlow id="flow2" sourceRef="firstTask" targetRef="dynamicTask"/>
<userTask id="dynamicTask" name="动态任务" flowable:assignee="${reviewer}">
<documentation>这是一个可以被动态添加的任务节点</documentation>
</userTask>
<sequenceFlow id="flow3" sourceRef="dynamicTask" targetRef="endEvent"/>
<endEvent id="endEvent" name="结束"/>
</process>
</definitions>
在Flowable 6.x中动态添加节点主要有以下几种方法:
主要方法
-
流程实例修改 - 通过RuntimeService提供的API:
runtimeService.createProcessInstanceModification(processInstanceId) .startBeforeActivity("activityId") // 在指定活动前启动 .execute();
-
动态子流程注入 - 在运行时动态添加子流程:
runtimeService.createProcessInstanceModification(processInstanceId) .addExecution() .callActivity("subProcessId") .execute();
-
动态创建活动 - 直接创建和执行新活动:
runtimeService.createProcessInstanceModification(processInstanceId) .startBeforeActivity("userTaskId") .setVariable("assignee", "动态分配的用户") .execute();
使用场景
动态添加节点在以下场景特别有用:
- 基于业务规则动态调整审批流程
- 根据申请金额/内容添加额外的审核步骤
- 特殊情况下插入临时审核/处理节点
- 灵活处理流程异常情况
实现要点
- 确保节点ID唯一性:动态添加的节点必须有唯一的ID
- 维护执行链的完整性:确保流程能正确流转
- 变量传递:为动态节点设置必要的流程变量
- 权限管理:动态添加的任务需要正确分配处理人
注意事项
- 过度使用动态节点会使流程追踪和维护变得困难
- 动态节点可能不会显示在流程图上,需要额外的审计记录
- 升级到Flowable 7时,部分API可能会有变化
- 确保动态添加的节点在流程异常时能够被正确处理
这个示例展示了如何在Flowable 6.x版本中动态添加一个审核任务节点。你可以根据实际需求调整代码,例如添加多个节点、条件节点或并行节点。
结论
通过深入分析Flowable的源码,我们可以看到它采用了许多优秀的设计模式:
- 命令模式:封装所有流程操作
- 工厂模式:创建各种对象
- 构建器模式:配置引擎
- 策略模式:不同的活动行为
- 观察者模式:事件监听机制
这些设计使得Flowable既灵活又强大,能够适应各种复杂的业务流程需求。同时,它的性能优化策略也确保了在高负载环境下的稳定运行。
对于想要深入了解工作流引擎实现的开发者,Flowable的源码提供了一个很好的学习案例。通过理解其内部机制,我们可以更好地使用和扩展这个强大的引擎。
参考资源
- Flowable GitHub仓库:https://github.com/flowable/flowable-engine
- Flowable官方文档:https://www.flowable.org/docs/userguide/index.html
- BPMN 2.0规范:https://www.omg.org/spec/BPMN/2.0/
评论