搭建flowable并没有想象中难,我自身纯手动搭建也就花了1个小时整合到spring boot框架中,简单地实现了一些接口,至于要整合到实际项目中要看实际项目的需求,建议是往通用的方向实现,这样可以做到一劳永逸,不至于每个新的流程都要另起一套接口。 点击获取项目demo
整个流程的ID与名称配置 节点配置ID和名称 节点指定用户或用户组可以在文件生成之后在通过文本编辑器上修改,因为这个插件支持activiti,并不完全支持flowable,所以展示不了flowable的标签
流程指向箭头配置流程条件和名称
approved && !((total_amount - advance_amount) >= 300000000 || adjust_amount >= 20000000 || (adjust_amount * 50) >= total_amount) 这种就是流程条件,里面的每个变量都是流程变量,后续会通过代码层面将流程变量添加进来。流程条件是支持计算表达式的基本的流程画图完成之后,我们需要为每个节点指定一个节点权限,因为是flowable的标签在流程设计器上看不到,所以需要打开文本编辑器,这里使用sublime打开。
上图中有两个节点对应两种场景,一种是为节点指定用户,一种是为节点指定用户组。这里为节点指定用户,这个用户是可变的,因为使用的是流程变量,而不是写死的值,当然也可以是写死的值,但是不推荐,这样做的目的是为以后人员变动预留空间。为节点指定用户组,按照上图这种写法即可,在查询待办的时候需要根据这个值进行匹配则可以获取当前的待办信息。
编写流程接口,运行项目
1.配置application.yml文件,接通mysql数据源。 application.yml文件
spring: profiles: active: dev aop: # spring aop 开启cglig proxy-target-class: true jackson: property-naming-strategy: CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES jpa: database: mysql show-sql: true hibernate: ddl-auto: update properties: hibernate: dialect: org.hibernate.dialect.MySQL5Dialect format_sql: false flowable: #关闭定时任务JOB async-executor-activate: false history-level: full # FinancialAuditProcess 对账单流程启动key #mybatis扫描包 mybatis: mapper-locations: classpath:mapper/*Mapper.xml type-handlers-package: guoyu.com.service.workflowsrv.common server: port: 9090application-dev.yml
spring: datasource: driver-class-name: com.mysql.jdbc.Driver url: 'jdbc:mysql://localhost/workflow?useUnicode=true&characterEncoding=utf-8&useSSL=true&nullCatalogMeansCurrent=true' username: root password: 123456789 #日志打印sql logging: level: com: example: mapper : INFO # swagger扫描包 base: package: guoyu.com.controller # httpclient配置 http: maxTotal: 100 #最大连接数 defaultMaxPerRoute: 20 #并发数 connectTimeout: 1000 #创建连接的最长时间 connectionRequestTimeout: 500 #从连接池中获取到连接的最长时间 socketTimeout: 10000 #数据传输的最长时间 staleConnectionCheckEnabled: true #提交请求前测试连接是否可用FlowableApplication.java springboot启动入口
package guoyu.com; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import springfox.documentation.swagger2.annotations.EnableSwagger2; @MapperScan("guoyu.com.service.workflowsrv.mapper") @SpringBootApplication @EnableSwagger2 public class FlowableApplication { public static void main(String[] args) { SpringApplication.run(FlowableApplication.class, args); } }entry目录里的BaseResponse.java,只是用来做一个返回响应的结构体
package guoyu.com.entry; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import liquibase.pro.packaged.E; import java.io.Serializable; /** * 响应基类 * @author jack guo * * @param <T> */ @ApiModel(value="BaseResponse", description = "响应信息") public class BaseResponse<T> implements Serializable{ @ApiModelProperty(value = "是否响应成功") private boolean success = false; @ApiModelProperty(value = "响应结果") private T result; @ApiModelProperty(value = "错误处理") private Error error = null; public boolean isSuccess() { return success; } public void setSuccess(boolean success) { this.success = success; } public T getResult() { return result; } public void setResult(T result) { this.result = result; } public Error getError() { return error; } public void setError(Error error) { this.error = error; } @ApiModel(value="Error", description = "错误处理") public static class Error { @ApiModelProperty(value = "错误码") private String code = ""; @ApiModelProperty(value = "错误信息") private String message = ""; public Error(String code, String msg){ this.code = code; this.message = msg; } public String getCode() { return code; } public void setCode(String code) { this.code = code; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } @Override public String toString() { return "Error [code=" + code + ", message=" + message + "]"; } } public BaseResponse(boolean isSuccess, T result, Error error){ this.success = isSuccess; this.result = result; this.error = error; } public static BaseResponse success(Object result){ return new BaseResponse(true, result, null); } public static BaseResponse fail(String code, String msg){ return new BaseResponse(false, null, new Error(code, msg)); } @Override public String toString() { return "BaseResponse [success=" + success + ", result=" + result + ", error=" + error + "]"; } }其实我搭建的demo里service都可以不写,直接调用flowable提供的service.所以最主要的还是看controller目录里的核心代码。 WorkflowController.java,代码块如下:
package guoyu.com.controller; import com.mysql.cj.xdevapi.JsonArray; import guoyu.com.entry.BaseResponse; import io.swagger.annotations.ApiOperation; import liquibase.pro.packaged.A; import org.flowable.bpmn.model.BpmnModel; import org.flowable.engine.*; import org.flowable.engine.repository.Deployment; import org.flowable.engine.runtime.Execution; import org.flowable.engine.runtime.ProcessInstance; import org.flowable.image.ProcessDiagramGenerator; import org.flowable.task.api.Task; import org.flowable.task.api.TaskQuery; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import javax.servlet.http.HttpServletResponse; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Created by guoyu on 2020/7/1 */ @RestController @RequestMapping(value = "/workflow") public class WorkflowController { private final Logger logger = LoggerFactory.getLogger(WorkflowController.class); @Autowired private RepositoryService repositoryService; @Autowired private RuntimeService runtimeService; @Autowired private TaskService taskService; @Autowired private ProcessEngine processEngine; /** * 部署流程 * @param processName 流程定义名 * @param resourcePath 如flowable/process.bpmn * @return */ @RequestMapping(value = "/create/process", method = RequestMethod.GET) public BaseResponse<String> createProcess(String processName, String resourcePath){ Deployment deployment = repositoryService.createDeployment().name(processName).addClasspathResource(resourcePath).deploy(); return BaseResponse.success("部署流程成功"); } /** * 发起流程 * @return */ @RequestMapping(value = "/apply", method = RequestMethod.GET) public BaseResponse<String> applyProcess(){ String processName = "FinancialAuditProcess"; Map<String,Object> map = new HashMap<>(); //指定发起人 map.put("user_account", "guoyu"); ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(processName, map); logger.info("流程实例ID:"+processInstance.getProcessInstanceId()); return BaseResponse.success("发起流程成功"); } /** * 查询发起人待办列表(针对用户) * @return */ @RequestMapping(value = "/user/list", method = RequestMethod.GET) public BaseResponse<List<Task>> todoList(){ String user = "guoyu"; List<Task> tasks = taskService.createTaskQuery().taskCandidateOrAssigned(user).orderByTaskCreateTime().desc().list(); List<Object> list = new ArrayList<>(); for(Task task : tasks){ list.add(task.toString()); } return BaseResponse.success(list); } /** * 查询发起人待办列表(针对用户组) * @return */ @RequestMapping(value = "/role/list", method = RequestMethod.GET) public BaseResponse<List<Task>> todoRoleList(){ List<String> roleCodes = new ArrayList<>(); roleCodes.add("FINANCE-BOOKING-REPORT-AUDIT-1"); roleCodes.add("FINANCE-BOOKING-REPORT-AUDIT-2"); List<Task> tasks = taskService.createTaskQuery().taskCandidateGroupIn(roleCodes).orderByTaskCreateTime().desc().list(); List<Object> list = new ArrayList<>(); for(Task task : tasks){ list.add(task.toString()); } return BaseResponse.success(list); } /** * 跳转下一步节点 */ @RequestMapping(value = "/approve", method = RequestMethod.GET) public BaseResponse<String> approveProcess( @RequestParam( value = "task_id") String taskId,@RequestParam( value = "approved") Boolean approved){ String user = "guoyu"; Map<String,Object> map = new HashMap<>(); //先申领任务,相当于用户将这个流程任务占用,其他在这个用户组里的用户不能看到该流程任务 taskService.claim(taskId,user); //设置流程变量 map.put("approved", approved); map.put("total_amount", 1000000000); map.put("advance_amount", 50000); map.put("adjust_amount", 45000); //再流转下一个节点 taskService.complete(taskId, map); return BaseResponse.success("审核完成"); } /** * 将节点移动到任意节点上 * @param taskId * @return */ @RequestMapping(value = "/withdraw", method = RequestMethod.GET) public BaseResponse<String> withdrawProcess( @RequestParam( value = "task_id") String taskId){ //获取当前任务,让其移动到审核节点位置 Task task = taskService.createTaskQuery().taskId(taskId).singleResult(); if(task != null ) { //将节点移动到审核节点 runtimeService.createChangeActivityStateBuilder().processInstanceId(task.getProcessInstanceId()).moveActivityIdTo(task.getTaskDefinitionKey(), "apply").changeState(); }else{ return BaseResponse.fail("500","撤回失败"); } return BaseResponse.success("撤回成功"); } /** * 获取流程变量 * @param taskId * @return */ @RequestMapping(value = "/variable", method = RequestMethod.GET) public BaseResponse<String> getVariables(@RequestParam( value = "task_id") String taskId){ Task task = taskService.createTaskQuery().taskId(taskId).singleResult(); Map<String, Object> variables = null; if(task != null){ variables = runtimeService.getVariables(task.getExecutionId()); } return BaseResponse.success(variables); } /** * 生成流程图 * * @param processId 任务ID */ @ApiOperation(value="查看对账单流程图状态", notes="查看对账单流程图状态") @RequestMapping(value = "processDiagram", method = RequestMethod.GET) public void genProcessDiagram(HttpServletResponse httpServletResponse, @RequestParam( value = "process_id") String processId) throws Exception { ProcessInstance pi = runtimeService.createProcessInstanceQuery().processInstanceId(processId).singleResult(); //流程走完的不显示图 if (pi == null) { return; } Task task = taskService.createTaskQuery().processInstanceId(pi.getId()).singleResult(); //使用流程实例ID,查询正在执行的执行对象表,返回流程实例对象 String InstanceId = task.getProcessInstanceId(); List<Execution> executions = runtimeService .createExecutionQuery() .processInstanceId(InstanceId) .list(); //得到正在执行的Activity的Id List<String> activityIds = new ArrayList<>(); List<String> flows = new ArrayList<>(); for (Execution exe : executions) { List<String> ids = runtimeService.getActiveActivityIds(exe.getId()); activityIds.addAll(ids); } //获取流程图 BpmnModel bpmnModel = repositoryService.getBpmnModel(pi.getProcessDefinitionId()); ProcessEngineConfiguration engconf = processEngine.getProcessEngineConfiguration(); ProcessDiagramGenerator diagramGenerator = engconf.getProcessDiagramGenerator(); InputStream in = diagramGenerator.generateDiagram(bpmnModel,"bmp", activityIds,flows,"宋体","宋体","宋体",null,1.0); OutputStream out = null; byte[] buf = new byte[1024]; int legth = 0; try { out = httpServletResponse.getOutputStream(); while ((legth = in.read(buf)) != -1) { out.write(buf, 0, legth); } } finally { if (in != null) { in.close(); } if (out != null) { out.close(); } } } } /** * 删除流程实例 * @param processId 流程实例ID * @return */ @RequestMapping(value = "/delete/process", method = RequestMethod.GET) public BaseResponse<String> deleteProcess(@RequestParam( value = "process_id") String processId){ runtimeService.deleteProcessInstance(processId, ""); return BaseResponse.success("删除流程实例成功"); } /** * 申领任务 * 其实申领的意思就是当在一个用户组中所有有这个权限的用户都可以同时看到这个待办信息, * 这个待办信息可以理解为公布出来的任务,需要有人去领取这个任务,那么一旦领取这个任务,其他有这个节点操作权限的用户就不会看到这个待办信息, * 因为已经被这个用户领取了 * @param taskId * @param user * @return */ @RequestMapping(value = "/claim", method = RequestMethod.GET) public BaseResponse<String> claim(@RequestParam( value = "task_id") String taskId,@RequestParam( value = "user") String user){ taskService.claim(taskId,user); return BaseResponse.success("流程申领"); } /** * 取消申领任务 * 一旦取消申领,那么有这个节点操作权限的用户在待办上又可以看到, * 申领和取消申领是一种锁定机制,使得多个用户在待办操作上不会出现执行同一个当前节点的任务 * @param taskId * @return */ @RequestMapping(value = "/unclaim", method = RequestMethod.GET) public BaseResponse<String> claim(@RequestParam( value = "task_id") String taskId){ taskService.unclaim(taskId); return BaseResponse.success("流程申领"); } /** * 获取流程的历史节点列表 * 获取的是这个流程实例走过的节点,当然也可以获取到开始节点、网关、线等信息,下面是只过滤了用户任务节点"userTask"的信息 * @param processId 流程ID * @return */ @RequestMapping(value = "/history/list", method = RequestMethod.GET) public BaseResponse<List<HistoricActivityInstance>> historyList(@RequestParam( value = "process_id") String processId){ List<HistoricActivityInstance> activities = historyService.createHistoricActivityInstanceQuery() .processInstanceId(processId).activityType("userTask").finished() .orderByHistoricActivityInstanceEndTime().desc().list(); return BaseResponse.success(activities); }controller里的接口如下:
流程部署流程发起流程待办流程审核(执行下一步节点)流程撤回(节点任意跳转)获取流程变量流程图展示删除流程实例流程申领取消申领任务查询流程活动历史这些接口支持普通的业务场景,有一些比较复杂的应用场景,光靠flowable的接口无法实现,那就需要去探索flowable特有的60张表,只有摸清楚框架里的表是干嘛的,你才能对这个框架了解透彻,demo我会挂在博客上。可以下载下来跑一下项目。
待办列表api的复杂使用
List<Task> tasks = taskService.createTaskQuery().taskCandidateGroupIn(roleCodes).orderByTaskCreateTime().desc().list();以上这段代码只是查询过滤出多个用户组的待办信息,并根据执行任务创建时间倒序排序。 在实际项目中,一个用户查询到的待办信息很多,假设有1000条待办信息,那么我们在界面上不能全部展示出来,而是需要分页,而且还需要加上一些过滤条件方便在数据量多的情况下过滤出想要及时处理的数据。可以参照下面的写法:
//创建一个查询任务 TaskQuery taskQuery = taskService.createTaskQuery(); //后面可以接上各种查询条件,因为是链式调用 List<String> roleCodes = new ArrayList<>(); roleCodes.add("角色code1"); roleCodes.add("角色code2"); //查询角色code集合 taskQuery = taskQuery.taskCandidateGroupIn(roleCodes); String startTime = "2020-06-10 11:48:10"; String endTime = "2020-06-20 23:00:00"; //根据任务的起止时间查询 taskQuery = taskQuery.processVariableValueLessThan("create_time", endTime).processVariableValueGreaterThan("create_time", startTime); //根据业务类型查询 String businessType = "finance"; //财务审批类型 taskQuery = taskQuery.processVariableValueEquals("business_type", businessType); //根据任务时间排序并分页, 分页10条 int pageIndex = 0; int pageSize = 10; List<Task> tasks = taskQuery.orderByTaskCreateTime().desc().listPage(pageIndex, pageSize); //根据以上条件查询出来的结果计算总条数 long total = taskQuery.count();flowable数据表详解 我们在接入flowable框架的时候,运行项目如果数据库没有flowable自带的表会自动创建出来,总共60张表,每张表都有其作用,下面会具体介绍表的用途。
数据表的命名规则 ACT_RE_* ’RE’表示repository(存储)。RepositoryService接口操作的表。带此前缀的表包含的是静态信息,如,流程定义,流程的资源(图片,规则等)。 ACT_RU_* ’RU’表示runtime。这是运行时的表存储着流程变量,用户任务,变量,职责(job)等运行时的数据。flowable只存储实例执行期间的运行时数据,当流程实例结束时,将删除这些记录。这就保证了这些运行时的表小且快。 ACT_ID_* ’ID’表示identity(组织机构)。这些表包含标识的信息,如用户,用户组,等等。 ACT_HI_* ’HI’表示history。就是这些表包含着历史的相关数据,如结束的流程实例,变量,任务,等等。 ACT_GE_* 普通数据,各种情况都使用的数据。数据表的介绍 表分类表名描述运行实例表(10)ACT_RU_DEADLETTER_JOB正在运行的任务表ACT_RU_EVENT_SUBSCR运行时事件ACT_RU_EXECUTION *运行时流程执行实例ACT_RU_HISTORY_JOB历史作业表ACT_RU_IDENTITYLINK运行时用户关系信息ACT_RU_JOB运行时作业表ACT_RU_SUSPENDED_JOB暂停作业表ACT_RU_TASK *运行时任务表ACT_RU_TIMER_JOB定时作业表ACT_RU_VARIABLE *运行时变量表流程历史记录(8)ACT_HI_ACTINST *历史的流程实例,记录流转的流程组件(节点、网关)ACT_HI_ATTACHMENT历史的流程附件ACT_HI_TASKINST *历史的任务实例ACT_HI_VARINST *历史的流程运行中的变量信息ACT_HI_COMMENT 历史的流程操作信息记录ACT_HI_DETAIL *历史的流程变量记录表ACT_HI_IDENTITYLINK历史的流程身份认证记录ACT_HI_PROCINST 历史的流程定义实例记录资源表(2)ACT_GE_BYTEARRAY *存储部署的流程文件ACT_GE_PROPERTY *记录系统相关属性身份认证表(9)ACT_ID_BYTEARRAY 二进制数据表ACT_ID_GROUP 用户组信息表ACT_ID_INFO 用户信息详情表ACT_ID_MEMBERSHIP 人与组关系表ACT_ID_PRIV 权限表ACT_ID_PROPERTY 属性表ACT_ID_TOKEN 系统登录日志表ACT_ID_USER 用户表ACT_ID_PRIV_MAPPING 私有人与组关系表 流程定义表(3)ACT_RE_DEPLOYMENT *部署单元信息 (记录部署时间)ACT_RE_MODEL模型信息ACT_RE_PROCDEF *已部署的流程定义(记录部署的资源路径)CMMN流程引擎数据表(11)ACT_CMMN_CASEDEFACT_CMMN_DATABASECHANGELOGLOCKACT_CMMN_DATABASECHANGELOG ACT_CMMN_DEPLOYMENTACT_CMMN_DEPLOYMENT_RESOURCEACT_CMMN_HI_CASE_INST ACT_CMMN_HI_MIL_INSTACT_CMMN_RU_CASE_INSTACT_CMMN_RU_MIL_INST ACT_CMMN_RU_PLAN_ITEM_INSTACT_CMMN_RU_SENTRY_PART_INSTDMN流程引擎数据表(6)ACT_DMN_DATABASECHANGELOGLOCKACT_DMN_DATABASECHANGELOGACT_DMN_DECISION_TABLEACT_DMN_DEPLOYMENTACT_DMN_DEPLOYMENT_RESOURCE ACT_DMN_HI_DECISION_EXECUTION 表单引擎数据表(6)ACT_FO_DATABASECHANGELOGACT_FO_FORM_DEFINITIONACT_FO_FORM_DEPLOYMENTACT_FO_FORM_INSTANCE ACT_FO_FORM_RESOURCE ACT_FO_DATABASECHANGELOGLOCK内容引擎数据表(3)ACT_CO_DATABASECHANGELOGLOCKACT_CO_DATABASECHANGELOGACT_CO_CONTENT_ITEM其他表(2)ACT_EVT_LOG事件日志记录ACT_PROCDEF_INFO流程定义信息Flowable引擎
流程引擎 (流程部署、待办查询、历史任务、身份认证、流程展示) *CMMN引擎(支持异步服务、手动激活规则、自动完成、用户事件监听器)DMN引擎(决策引擎, 如排他网关设置条件可以进行识别并决定走向) *IDM身份识别引擎 (自带一套身份认证体系)表单引擎 (自带一套组件编辑器,可在界面画表单,实现流程流转)内容引擎 (在mybatis上封装获取表元数据、流程模型等)自定义sql 如果你的业务对于flowable的api接口还不满足,那么你可以写一些自定义的sql。如下代码示例:
//查询待办信息 public List<Task> QueryTask(String businessType, List<String> businessIds) { if(StringUtils.isEmpty(businessType) || ListUtil.isEmpty(businessIds)) { logger.warn("QueryTaskRecord请求参数为空. businessType:"+businessType+" businessIds:"+businessIds); return new ArrayList<>(); } StringBuffer businessIdsSb = new StringBuffer(); for(int i = 0;i < businessIds.size(); i++) { String businessId = businessIds.get(i); if (i == businessIds.size() - 1) { businessIdsSb.append("'" + businessId + "'"); } else { businessIdsSb.append("'" + businessId + "',"); } } StringBuffer sb = new StringBuffer(); sb.append("SELECT RES.*FROM ACT_RU_TASK RES " + "LEFT JOIN ACT_RU_VARIABLE VAR ON RES.`PROC_INST_ID_` = VAR.`PROC_INST_ID_`" + "WHERE RES.`PROC_INST_ID_` IN (" + "SELECT `PROC_INST_ID_` FROM ACT_RU_VARIABLE " + "WHERE `PROC_INST_ID_` IN (" + "SELECT `PROC_INST_ID_` FROM ACT_RU_VARIABLE " + "WHERE `NAME_` = 'business_id' AND `TEXT_` IN (" + businessIdsSb.toString() + ")" + ") AND `NAME_` = 'business_type' AND `TEXT_` = '" + businessType + "'" + ") group by VAR.`PROC_INST_ID_`" + "order by RES.`CREATE_TIME_` desc"); //通过传入自定义sql,返回的结果必须是ACT_RU_TASK的结果集,因为Task对应的表是ACT_RU_TASK,传入 List<Task> tasks = taskService.createNativeTaskQuery().sql(sb.toString()).parameter("business_type", businessType).list(); return tasks; }当然你也可以使用mybatis等orm框架去实现自定义sql,但是使用TaskService和HistoryService服务提供自定义sql的api是有一定区别。这点需要注意,flowable框架的自定义sql的api在执行方法的时候有事务的隔离性,它允许节点执行跳转到下一个节点操作的时候查询待办可以到下一节点的待办信息,而使用orm框架则无法做到在同一方法里执行节点跳转再去查询下一节点的待办信息,因为此时的下一节点的数据还没存入到db中所以查询不到。 所以,我认为flowable框架是对自定义sql的api做了一些处理,做了事务的隔离性读取到未提交的数据,这里推荐使用它的api,虽然字符串拼接的sql属于硬编码,也不太雅观。