任务提交 taskCommit

1. 方法入口

  • 注解说明
    @PostMapping("/task/commit"):HTTP POST 请求映射,接收任务提交请求。
    @Validated(Insert.class):基于分组校验规则对 RpaCommitDTO 进行参数校验。
    @RequestBody:绑定请求体中的 JSON 数据到 RpaCommitDTO 对象。
  • 参数
    • rpaCommitDTO:任务提交的请求参数(如应用编码、提交人、文件地址等)。
    • httpServletRequest:用于获取请求头中的 appKey
  • 返回值AjaxResult 统一响应结构(包含任务ID或错误信息)。

2. 动态设置 AppCode

java
reSetAppCode(rpaCommitDTO);
  • 动作:根据策略表动态修改 rpaCommitDTO 中的 appCode
  • 目的:实现灵活的业务路由(例如灰度发布或环境切换)。

3. 特殊业务路由

java
// 长春1.0特殊处理
if (AppCodeEnum.hasAppCode(rpaCommitDTO.getAppCode())) {
return sbsbVersion1Commit(rpaCommitDTO, httpServletRequest);
}

// 社保士兵任务处理
if (StrUtil.startWith(rpaCommitDTO.getAppCode(), APP_CODE_SBSB_PRE)) {
return sbSTaskCommit(rpaCommitDTO, httpServletRequest);
}
  • 路由逻辑
    1. 长春1.0:若 appCode 匹配预设枚举值,走独立的老版本逻辑。
    2. 社保士兵任务:若 appCode 以特定前缀开头,执行专用分支。
  • 特点:通过 appCode 实现业务解耦,分离新旧版本逻辑。

4. 通用流程处理

4.1 查询应用信息
java
RpaApp rpaApp = rpaAppMapper.selectByAppCode(rpaCommitDTO.getAppCode());
if (Objects.isNull(rpaApp)) {
return AjaxResult.error("未查询到应用相关信息");
}
  • 动作:根据 appCode 查询 RpaApp 表,校验应用是否存在。
  • 异常处理:应用不存在时直接返回错误。

4.2 验证缴纳账号
java
Integer accountId = rpaPaymentAccountMapper.selectAccountIdByNameAndAppId(
rpaCommitDTO.getAccountName(),
rpaApp.getId()
);
if (Objects.isNull(accountId)) {
return AjaxResult.error("该应用缴纳账号不存在");
}
  • 动作:根据账号名称和应用ID查询缴纳账号ID。
  • 目的:确保任务绑定的账号有效。

4.3 构建任务业务对象
java
RpaTaskBO rpaTaskBO = new RpaTaskBO();
rpaTaskBO.setAppendix(rpaCommitDTO.getFileUrl()); // 附件地址
rpaTaskBO.setAppCode(rpaCommitDTO.getAppCode()); // 应用编码
rpaTaskBO.setSubmitter(rpaCommitDTO.getSubmitter()); // 提交人
rpaTaskBO.setField1(accountId.toString()); // 缴纳账号ID
rpaTaskBO.setIsAuth(RpaTaskConstant.TASK_AUTH_NOT_NEED); // 无需权限校验
  • 目的:将 DTO 转换为业务对象,传递策略执行所需参数。

4.4 确定运行节点
java
// 1. 优先查询指定节点关系(按应用+账号)
RpaAppNodeRelation rpaAppNodeRelation = rpaAppNodeRelationMapper
.judgeAppointNodeRelation(rpaCommitDTO.getAppCode(), accountId);

// 2. 若未找到,查询默认节点关系(按应用)
if (Objects.isNull(rpaAppNodeRelation)) {
rpaAppNodeRelation = rpaAppNodeRelationMapper
.selectDefaultNodeByAppCode(rpaCommitDTO.getAppCode());
}

// 3. 最终校验节点是否存在
if (Objects.isNull(rpaAppNodeRelation)) {
return AjaxResult.error("该应用未绑定节点!");
}
  • 策略
    • 高优先级:按应用和账号精确绑定节点。
    • 低优先级:按应用绑定默认节点。
  • 目的:动态分配任务执行节点,支持多租户隔离。

4.5 策略模式生成任务
java
RpaTask rpaTask = BizStrategyFactory
.getBizStrategy(rpaApp.getBusinessStrategy())
.execute(rpaTaskBO);
  • 动作:通过工厂模式根据 businessStrategy 字段选择具体策略类,生成任务实体。
  • 扩展性:支持不同业务类型(如社保增减员、税务申报)的任务差异化处理。

4.6 任务持久化
java
// 设置任务基础属性
rpaTask.setTransactionNo(snowflake.nextId()); // 雪花算法生成唯一ID
rpaTask.setSubmitter(rpaTaskBO.getSubmitter());
rpaTask.setSubmitTime(new Date());
rpaTask.setStatus(RpaTaskConstant.TASK_STATUS_WAIT_EXECUTE); // 初始状态:等待执行
rpaTask.setCreateBy(rpaTaskBO.getSubmitter());
rpaTask.setUpdateBy(rpaTaskBO.getSubmitter());

// 设置 appKey(从请求头获取)
String appKey = httpServletRequest.getHeader(RpaTaskConstant.TASK_APP_KEY_HEADER);
rpaTask.setAppKey(appKey);

// 插入数据库
rpaTaskMapper.insertRpaTask(rpaTask);
  • 关键字段
    • transactionNo:全局唯一任务流水号。
    • appKey:标识调用方身份,用于权限控制。

4.7 发送超时检查消息
java
producer.send(TopicConstant.RPA_LONG_TIME_WAIT_TASK, rpaTask.getId(), 16);
  • 动作:向消息队列发送延迟消息(如 16 分钟)。
  • 目的:若任务长时间未执行,触发超时告警或补偿机制。

4.8 触发任务推送
java
SpringUtils.getBean(this.getClass()).pushTask(rpaTask.getId());
  • 实现:通过 Spring 工具类获取代理对象,调用异步方法 pushTask
  • 作用:立即触发任务调度,寻找可用客户端节点执行任务。

4.9 保存推送记录
java
Long recordId = saveSendRecord(
rpaCommitDTO,
appKey,
rpaTask.getId(),
TASK_TYPE_RPA,
"",
""
);
  • 隐含逻辑:记录任务派发日志,用于后续审计或对账。

4.10 更新统计信息
java
dataAnalyseService.updateDataAnalyseDailyServiceNum(rpaCommitDTO.getSubmitter());
  • 业务意义:统计每日服务人数,支持运营数据分析。

推送任务 pushTask (核心)

1. 方法入口

  • 注解说明
    @Async:异步执行,任务推送操作不会阻塞主线程。
    @Transactional:事务管理,确保数据库操作原子性(需注意异步线程的事务传播)。
  • 参数taskId 任务唯一标识。

2. 任务状态校验

java
RpaTaskVO rpaTaskVO = rpaTaskMapper.selectRpaTaskById(taskId);
if (rpaTaskVO.getStatus() == RpaTaskConstant.TASK_STATUS_WAIT_EXECUTE) {
// 后续逻辑...
}
  • 动作:通过任务ID查询任务详情,检查是否为「等待执行」状态。
  • 目的:仅处理待执行任务,避免重复调度。

3. 获取可用客户端节点

java
List<RpaNodeVO> clients = getExecuteTaskClientIds(rpaTaskVO);
  • 动作:调用 getExecuteTaskClientIds 方法,根据任务属性(如应用类型、牌照等)筛选符合条件的客户端节点。
  • 典型实现:可能基于任务绑定的运行节点配置或默认策略选择节点。

4. 客户端在线状态过滤

java
List<RpaNodeVO> onlineRpaNodes = clients.stream()
.filter(item -> item.getOnlineStatus() == RpaNodeConstant.NODE_ONLINE_STATUS_YES)
.collect(Collectors.toList());
  • 动作:过滤出在线状态的客户端节点。
  • 目的:确保任务仅派发到活跃节点,避免无效调度。

5. 选择空闲客户端

java
Optional<RpaNodeVO> first = onlineRpaNodes.stream()
.filter(item -> item.getBusyStatus() == RpaNodeConstant.NODE_BUSY_STATUS_NO)
.findFirst();
  • 动作:选择第一个「非繁忙」状态的在线节点。
  • 策略特点:简单但可能不均衡,适合低并发场景。

6. 客户端繁忙处理

java
if (!first.isPresent()) {
log.info("客户端繁忙,等待空闲后拉取任务:taskId:{}", taskId);
return;
}
  • 动作:若无可用节点,记录日志并退出,依赖客户端主动拉取任务。
  • 优势:避免频繁轮询,减少服务端压力。

7. 策略执行与任务派发

java
RpaApp rpaApp = rpaAppMapper.selectByAppCode(rpaTaskVO.getAppCode());
String executorParam = PrgmStrategyFactory.getPrgmStrategy(rpaApp.getAppStrategy()).execute(rpaTaskVO);
RpaTaskExecuteDTO dto = buildExecuteDTO(taskId, rpaNodeVO, executorParam);
boolean isOk = this.executeTask(dto);
  • 关键步骤
    1. 查询应用配置:获取任务关联的应用策略。
    2. 策略模式:通过工厂生成具体策略,动态生成执行参数(如URL、表单数据等)。
    3. 任务派发:调用 executeTask 方法(可能通过RPC或消息队列)触发客户端执行。

8. 任务状态更新

java
if (isOk) {
RpaTask task = buildTaskUpdate(taskId, rpaNodeVO);
int update = rpaTaskMapper.updateRpaTask(task);
if (update == 0) {
log.warn("任务状态更新失败: taskId={}", taskId);
}
}
  • 动作:若派发成功,更新任务状态为「调度中」并记录执行节点。
  • 事务性:依托 @Transactional 确保数据一致性。

9. 异常场景通知

java
// 客户端不在线
notifySubmitter(rpaTaskVO, RpaTaskConstant.TASK_NODE_OFFLINE_NOTICE);

// 节点不可用
notifySubmitter(rpaTaskVO, RpaTaskConstant.TASK_NODE_DISABLE_NOTICE);
  • 动作:通过邮件、站内信等方式通知提交者任务异常状态。
  • 用户体验:明确反馈问题原因,减少用户困惑。

流程总结

  1. 状态校验 → 2. 节点筛选 → 3. 在线检查 → 4. 空闲选择 → 5. 策略执行 → 6. 任务派发 → 7. 状态更新
  • 分支处理:涵盖客户端离线、繁忙、节点不可用等异常场景,保证流程健壮性。