首页
在线工具
搜索
1
Kuboard与KubeSphere的区别:Kubernetes管理平台对比
2
ShardingSphere使用中的重点问题剖析
3
Flowable工作流引擎源码深度解析
4
用AI生成的原型设计稿效果还可以
5
如何将Virtualbox和VMware虚拟机相互转换
杂谈与随笔
工具与效率
源码阅读
技术管理
运维
数据库
前端开发
后端开发
Search
标签搜索
Angular
Docker
Phabricator
SpringBoot
Java
Chrome
SpringSecurity
SpringCloud
DDD
Git
Mac
K8S
Kubernetes
ESLint
SSH
高并发
Eclipse
Javascript
Vim
Centos
Jonathan
累计撰写
86
篇文章
累计收到
0
条评论
首页
栏目
杂谈与随笔
工具与效率
源码阅读
技术管理
运维
数据库
前端开发
后端开发
页面
搜索到
1
篇与
的结果
2021-10-22
Flowable工作流引擎源码深度解析
Flowable工作流引擎源码深度解析 前言 Flowable是一个轻量级的业务流程引擎,基于BPMN 2.0规范实现,是Activiti项目的一个分支。作为Java生态中最流行的工作流引擎之一,了解其内部实现对于定制化开发和性能优化至关重要。本文将深入分析Flowable的核心源码结构和执行逻辑,帮助开发者更好地理解和使用这一强大工具。 核心架构概览 Flowable的源码主要分为以下几个核心模块: flowable-engine:核心引擎实现 flowable-bpmn-converter:BPMN模型转换器 flowable-process-validation:流程验证模块 flowable-image-generator:流程图生成模块 flowable-rest:REST API实现 其中,flowable-engine是最核心的部分,我们的分析也将主要集中在这个模块上。 ProcessEngine初始化流程 Flowable的入口是ProcessEngine接口,通常通过ProcessEngineConfiguration来创建。让我们看看其初始化过程: public class ProcessEngineConfigurationImpl extends ProcessEngineConfiguration { public ProcessEngine buildProcessEngine() { init(); ProcessEngineImpl processEngine = new ProcessEngineImpl(this); postProcessEngineInitialisation(); return processEngine; } public void init() { initCommandContextFactory(); initTransactionContextFactory(); initCommandExecutors(); initServices(); initDataSource(); initDbSchema(); initBeans(); initTransactionFactory(); // 其他初始化方法... } } 初始化过程主要包括: 初始化命令上下文工厂 初始化事务上下文工厂 初始化命令执行器 初始化各种服务 初始化数据源和数据库架构 初始化事务工厂等 这种设计遵循了良好的工厂模式和构建器模式。 命令模式的应用 Flowable大量使用了命令模式,所有对流程的操作都被封装为Command对象: public interface Command<T> { T execute(CommandContext commandContext); } 执行命令的是CommandExecutor,它主要有两个实现: CommandExecutorImpl:普通实现 TransactionCommandExecutor:带事务的实现 让我们看看CommandExecutorImpl的实现: public class CommandExecutorImpl implements CommandExecutor { protected CommandContextFactory commandContextFactory; protected TransactionContextFactory transactionContextFactory; public <T> T execute(Command<T> command) { CommandContext commandContext = commandContextFactory.createCommandContext(command); try { T result = command.execute(commandContext); commandContext.close(); return result; } catch (Exception e) { commandContext.exception(e); } finally { try { commandContext.close(); } catch (Exception e) { // 日志记录 } } return null; } } 这种设计确保了所有流程操作都在一个一致的上下文中执行,并且可以正确处理事务和异常。 流程定义加载 当我们部署一个BPMN文件时,Flowable会解析它并转换为内部模型。核心类是BpmnParser: public class BpmnParser { public BpmnParse createParse() { return new BpmnParse(this); } public BpmnParse parse(InputStream inputStream) { BpmnParse bpmnParse = createParse(); bpmnParse.sourceInputStream = inputStream; bpmnParse.execute(); return bpmnParse; } } BpmnParse类负责具体的解析逻辑: public class BpmnParse extends BpmnParseHandler { public BpmnParse execute() { try { // 解析XML文档 DocumentBuilderFactory dbf = XmlUtil.createSafeDocumentBuilderFactory(); Document document = dbf.newDocumentBuilder().parse(sourceInputStream); // 解析BPMN元素 parseRootElement(document.getDocumentElement()); // 处理流程定义 processDI(); // 完成解析 executeParse(); } catch (Exception e) { // 异常处理 } return this; } protected void parseRootElement(Element rootElement) { // 解析流程、任务、网关等元素 } } 这个过程将BPMN文件转换为ProcessDefinitionEntity对象,其中包含了流程的所有信息。 流程执行引擎 流程实例的执行由ExecutionEntity类负责: public class ExecutionEntity implements Execution, ExecutionListenerContainer { protected String id; protected ProcessDefinitionEntity processDefinition; protected String businessKey; protected String activityId; protected ExecutionEntity parent; protected List<ExecutionEntity> executions = new ArrayList<>(); // 其他属性... public void start() { CommandContext commandContext = Context.getCommandContext(); // 执行流程实例启动逻辑 // 触发事件监听器 // 执行第一个活动节点 } public void continueExecution() { ExecutionEntity execution = this; while (execution != null && execution.isActive()) { ActivityImpl activity = execution.getActivity(); if (activity != null) { // 执行当前活动节点 execution = activity.execute(execution); } else { // 已完成 execution = null; } } } } 每个流程实例都对应一个ExecutionEntity,每个并行流程也对应一个子ExecutionEntity。 任务管理 任务由TaskEntity类表示: public class TaskEntity implements Task { protected String id; protected String name; protected String description; protected String assignee; protected Date createTime; protected String executionId; protected String processInstanceId; // 其他属性... public void complete() { // 验证任务状态 // 执行任务完成逻辑 // 触发事件监听器 // 推进流程执行 } } 任务的创建、分配和完成都是通过TaskService接口实现的: public interface TaskService { Task newTask(); void saveTask(Task task); void deleteTask(String taskId); void claim(String taskId, String userId); void complete(String taskId); // 其他方法... } 事件监听机制 Flowable提供了丰富的事件监听机制,核心接口是FlowableEventListener: public interface FlowableEventListener { void onEvent(FlowableEvent event); boolean isFailOnException(); } 事件类型由FlowableEventType枚举定义,包括流程启动、任务创建、流程完成等多种类型。 事件的分发由FlowableEventDispatcher接口负责: public interface FlowableEventDispatcher { void addEventListener(FlowableEventListener listener); void addEventListener(FlowableEventListener listener, FlowableEventType... types); void removeEventListener(FlowableEventListener listener); void dispatchEvent(FlowableEvent event); } 这种设计允许我们在流程的各个阶段插入自定义逻辑。 数据持久化 Flowable使用MyBatis作为ORM框架进行数据持久化。核心接口是DbSqlSession: public class DbSqlSession implements Session { protected SqlSession sqlSession; protected DbSqlSessionFactory dbSqlSessionFactory; protected List<PersistentObject> insertedObjects = new ArrayList<>(); protected List<PersistentObject> updatedObjects = new ArrayList<>(); protected List<PersistentObject> deletedObjects = new ArrayList<>(); public void flush() { // 处理插入对象 for (PersistentObject insertedObject : insertedObjects) { String insertStatement = dbSqlSessionFactory.getInsertStatement(insertedObject); sqlSession.insert(insertStatement, insertedObject); } // 处理更新对象 for (PersistentObject updatedObject : updatedObjects) { String updateStatement = dbSqlSessionFactory.getUpdateStatement(updatedObject); sqlSession.update(updateStatement, updatedObject); } // 处理删除对象 for (PersistentObject deletedObject : deletedObjects) { String deleteStatement = dbSqlSessionFactory.getDeleteStatement(deletedObject); sqlSession.delete(deleteStatement, deletedObject); } // 清空缓存 insertedObjects.clear(); updatedObjects.clear(); deletedObjects.clear(); } } 所有实体对象的变更都会被记录在这些列表中,然后在事务提交时一次性写入数据库。 性能优化 Flowable做了许多性能优化,其中最重要的是缓存机制: public class DeploymentCache<T> { protected Map<String, T> cache = new HashMap<>(); protected int limit; protected LinkedList<String> keyList = new LinkedList<>(); public void add(String id, T obj) { if (limit > 0 && keyList.size() >= limit) { String oldestKey = keyList.removeFirst(); cache.remove(oldestKey); } cache.put(id, obj); keyList.addLast(id); } public T get(String id) { return cache.get(id); } public void remove(String id) { cache.remove(id); keyList.remove(id); } } 这种LRU缓存确保了频繁使用的流程定义可以快速获取,而不需要重复从数据库加载。 扩展点分析 Flowable提供了丰富的扩展点,允许开发者定制化流程行为: TaskListener:任务生命周期监听器 ExecutionListener:执行流程监听器 ActivityBehavior:自定义活动行为 ExpressionManager:表达式管理器 VariableType:自定义变量类型 以TaskListener为例: public interface TaskListener { String EVENTNAME_CREATE = "create"; String EVENTNAME_ASSIGNMENT = "assignment"; String EVENTNAME_COMPLETE = "complete"; String EVENTNAME_DELETE = "delete"; void notify(DelegateTask delegateTask); } 通过实现这个接口,我们可以在任务的各个生命周期阶段插入自定义逻辑。 异步执行器 Flowable 6引入了新的异步执行器,替代了旧的JobExecutor: public class DefaultAsyncExecutor implements AsyncExecutor { protected ThreadPoolExecutor threadPoolExecutor; protected RejectedExecutionHandler rejectedExecutionHandler; protected ThreadFactory threadFactory; public void start() { if (threadPoolExecutor == null) { threadPoolExecutor = new ThreadPoolExecutor( corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize), threadFactory, rejectedExecutionHandler ); } // 启动作业获取线程 startJobAcquisitionThread(); } protected void executeAsyncJob(Job job) { Runnable runnable = new ExecuteAsyncRunnable(job, this); threadPoolExecutor.execute(runnable); } } 这种设计使得Flowable可以高效地处理大量的异步任务。 我很乐意解释Flowable 6.x中如何实现动态加节点,并提供一个实用的demo。 在Flowable 6.x中,动态修改流程实例主要通过RuntimeService的API来实现,特别是通过createProcessInstanceModification方法。下面我将详细介绍实现方法并提供一个完整的示例。 package com.example.flowable; import org.flowable.engine.*; import org.flowable.engine.impl.cfg.StandaloneProcessEngineConfiguration; import org.flowable.engine.repository.Deployment; import org.flowable.engine.repository.ProcessDefinition; import org.flowable.engine.runtime.ProcessInstance; import org.flowable.task.api.Task; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Flowable 6.x 动态添加节点示例 */ public class FlowableDynamicTaskDemo { public static void main(String[] args) { // 初始化流程引擎 ProcessEngine processEngine = initProcessEngine(); // 获取各种服务 RepositoryService repositoryService = processEngine.getRepositoryService(); RuntimeService runtimeService = processEngine.getRuntimeService(); TaskService taskService = processEngine.getTaskService(); // 部署流程定义 Deployment deployment = repositoryService.createDeployment() .addClasspathResource("dynamic-process.bpmn20.xml") .deploy(); ProcessDefinition processDefinition = repositoryService.createProcessDefinitionQuery() .deploymentId(deployment.getId()) .singleResult(); System.out.println("流程定义部署完成: " + processDefinition.getName()); // 启动流程实例 Map<String, Object> variables = new HashMap<>(); variables.put("applicant", "张三"); variables.put("amount", 5000); ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("dynamicProcess", variables); System.out.println("流程实例启动成功,ID: " + processInstance.getId()); // 查询当前任务 Task currentTask = taskService.createTaskQuery() .processInstanceId(processInstance.getId()) .singleResult(); System.out.println("当前任务: " + currentTask.getName()); // 动态添加一个审核任务节点 System.out.println("开始动态添加节点..."); dynamicallyAddTask(runtimeService, processInstance.getId(), currentTask.getId()); // 完成当前任务 taskService.complete(currentTask.getId()); System.out.println("完成任务: " + currentTask.getName()); // 查看动态添加的任务 List<Task> tasks = taskService.createTaskQuery() .processInstanceId(processInstance.getId()) .list(); for (Task task : tasks) { System.out.println("当前活动任务: " + task.getName() + ", ID: " + task.getId()); // 完成动态添加的任务 taskService.complete(task.getId()); System.out.println("完成任务: " + task.getName()); } // 检查流程是否结束 ProcessInstance runningInstance = runtimeService.createProcessInstanceQuery() .processInstanceId(processInstance.getId()) .singleResult(); if (runningInstance == null) { System.out.println("流程实例已完成"); } else { System.out.println("流程实例仍在运行"); } } /** * 动态添加任务节点 */ private static void dynamicallyAddTask(RuntimeService runtimeService, String processInstanceId, String activityId) { runtimeService.createProcessInstanceModification(processInstanceId) .startBeforeActivity(activityId) // 在当前活动前启动 .addExecution() // 添加一个执行分支 .callActivity("dynamicTask") // 调用一个动态任务活动 .setVariable("reviewer", "李四") // 设置变量 .setVariable("dynamicTaskName", "财务经理审核") // 动态任务名称 .execute(); System.out.println("动态任务添加成功"); } /** * 初始化流程引擎 */ private static ProcessEngine initProcessEngine() { ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration() .setJdbcUrl("jdbc:h2:mem:flowable;DB_CLOSE_DELAY=-1") .setJdbcUsername("sa") .setJdbcPassword("") .setJdbcDriver("org.h2.Driver") .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE); return cfg.buildProcessEngine(); } } <?xml version="1.0" encoding="UTF-8"?> <definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:flowable="http://flowable.org/bpmn" targetNamespace="http://www.flowable.org/processdef"> <process id="dynamicProcess" name="动态节点示例流程" isExecutable="true"> <startEvent id="startEvent" name="开始"/> <sequenceFlow id="flow1" sourceRef="startEvent" targetRef="firstTask"/> <userTask id="firstTask" name="部门经理审批" flowable:assignee="${applicant}"> <documentation>申请金额: ${amount}</documentation> </userTask> <sequenceFlow id="flow2" sourceRef="firstTask" targetRef="dynamicTask"/> <userTask id="dynamicTask" name="动态任务" flowable:assignee="${reviewer}"> <documentation>这是一个可以被动态添加的任务节点</documentation> </userTask> <sequenceFlow id="flow3" sourceRef="dynamicTask" targetRef="endEvent"/> <endEvent id="endEvent" name="结束"/> </process> </definitions> 在Flowable 6.x中动态添加节点主要有以下几种方法: 主要方法 流程实例修改 - 通过RuntimeService提供的API: runtimeService.createProcessInstanceModification(processInstanceId) .startBeforeActivity("activityId") // 在指定活动前启动 .execute(); 动态子流程注入 - 在运行时动态添加子流程: runtimeService.createProcessInstanceModification(processInstanceId) .addExecution() .callActivity("subProcessId") .execute(); 动态创建活动 - 直接创建和执行新活动: runtimeService.createProcessInstanceModification(processInstanceId) .startBeforeActivity("userTaskId") .setVariable("assignee", "动态分配的用户") .execute(); 使用场景 动态添加节点在以下场景特别有用: 基于业务规则动态调整审批流程 根据申请金额/内容添加额外的审核步骤 特殊情况下插入临时审核/处理节点 灵活处理流程异常情况 实现要点 确保节点ID唯一性:动态添加的节点必须有唯一的ID 维护执行链的完整性:确保流程能正确流转 变量传递:为动态节点设置必要的流程变量 权限管理:动态添加的任务需要正确分配处理人 注意事项 过度使用动态节点会使流程追踪和维护变得困难 动态节点可能不会显示在流程图上,需要额外的审计记录 升级到Flowable 7时,部分API可能会有变化 确保动态添加的节点在流程异常时能够被正确处理 这个示例展示了如何在Flowable 6.x版本中动态添加一个审核任务节点。你可以根据实际需求调整代码,例如添加多个节点、条件节点或并行节点。 结论 通过深入分析Flowable的源码,我们可以看到它采用了许多优秀的设计模式: 命令模式:封装所有流程操作 工厂模式:创建各种对象 构建器模式:配置引擎 策略模式:不同的活动行为 观察者模式:事件监听机制 这些设计使得Flowable既灵活又强大,能够适应各种复杂的业务流程需求。同时,它的性能优化策略也确保了在高负载环境下的稳定运行。 对于想要深入了解工作流引擎实现的开发者,Flowable的源码提供了一个很好的学习案例。通过理解其内部机制,我们可以更好地使用和扩展这个强大的引擎。 参考资源 Flowable GitHub仓库:https://github.com/flowable/flowable-engine Flowable官方文档:https://www.flowable.org/docs/userguide/index.html BPMN 2.0规范:https://www.omg.org/spec/BPMN/2.0/
2021年10月22日