feat: 2.6.0

1. 新增组关闭 重试任务、定时任务、工作流任务不支持手动触发
2. 修复任务完成事务未提交,工作流完成异常
This commit is contained in:
byteblogs168 2024-01-17 09:41:23 +08:00
parent c6e30e88cd
commit 209aa20e3c
4 changed files with 85 additions and 39 deletions

View File

@ -18,6 +18,8 @@ import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@ -43,7 +45,7 @@ public class JobTaskBatchHandler {
List<JobTask> jobTasks = jobTaskMapper.selectList( List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>() new LambdaQueryWrapper<JobTask>()
.select(JobTask::getTaskStatus, JobTask::getResultMessage) .select(JobTask::getTaskStatus, JobTask::getResultMessage)
.eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId())); .eq(JobTask::getTaskBatchId, completeJobBatchDTO.getTaskBatchId()));
JobTaskBatch jobTaskBatch = new JobTaskBatch(); JobTaskBatch jobTaskBatch = new JobTaskBatch();
@ -76,27 +78,34 @@ public class JobTaskBatchHandler {
jobTaskBatch.setOperationReason(completeJobBatchDTO.getJobOperationReason()); jobTaskBatch.setOperationReason(completeJobBatchDTO.getJobOperationReason());
} }
if (Objects.nonNull(completeJobBatchDTO.getWorkflowNodeId()) && Objects.nonNull(completeJobBatchDTO.getWorkflowTaskBatchId())) { if (Objects.nonNull(completeJobBatchDTO.getWorkflowNodeId()) && Objects.nonNull(
// 若是工作流则开启下一个任务 completeJobBatchDTO.getWorkflowTaskBatchId())) {
try { TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO(); @Override
taskExecuteDTO.setWorkflowTaskBatchId(completeJobBatchDTO.getWorkflowTaskBatchId()); public void afterCompletion(int status) {
taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType()); // 若是工作流则开启下一个任务
taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId()); try {
// 这里取第一个的任务执行结果 WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
taskExecuteDTO.setTaskBatchId(completeJobBatchDTO.getTaskBatchId()); taskExecuteDTO.setWorkflowTaskBatchId(completeJobBatchDTO.getWorkflowTaskBatchId());
ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor(); taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType());
actorRef.tell(taskExecuteDTO, actorRef); taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId());
} catch (Exception e) { // 这里取第一个的任务执行结果
log.error("任务调度执行失败", e); taskExecuteDTO.setTaskBatchId(completeJobBatchDTO.getTaskBatchId());
} ActorRef actorRef = ActorGenerator.workflowTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {
log.error("任务调度执行失败", e);
}
}
});
} }
jobTaskBatch.setUpdateDt(LocalDateTime.now()); jobTaskBatch.setUpdateDt(LocalDateTime.now());
return 1 == jobTaskBatchMapper.update(jobTaskBatch, return 1 == jobTaskBatchMapper.update(jobTaskBatch,
new LambdaUpdateWrapper<JobTaskBatch>() new LambdaUpdateWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId()) .eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())
.in(JobTaskBatch::getTaskBatchStatus, JobTaskStatusEnum.NOT_COMPLETE) .in(JobTaskBatch::getTaskBatchStatus, JobTaskStatusEnum.NOT_COMPLETE)
); );
} }

View File

@ -26,10 +26,13 @@ import com.aizuda.easy.retry.server.web.service.JobService;
import com.aizuda.easy.retry.server.web.service.convert.JobConverter; import com.aizuda.easy.retry.server.web.service.convert.JobConverter;
import com.aizuda.easy.retry.server.web.service.convert.JobResponseVOConverter; import com.aizuda.easy.retry.server.web.service.convert.JobResponseVOConverter;
import com.aizuda.easy.retry.server.web.util.UserSessionUtils; import com.aizuda.easy.retry.server.web.util.UserSessionUtils;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
@ -47,16 +50,14 @@ import java.util.Optional;
*/ */
@Service @Service
@Slf4j @Slf4j
@RequiredArgsConstructor
public class JobServiceImpl implements JobService { public class JobServiceImpl implements JobService {
@Autowired private final SystemProperties systemProperties;
private SystemProperties systemProperties; private final JobMapper jobMapper;
@Autowired
private JobMapper jobMapper;
@Autowired
@Qualifier("terminalJobPrepareHandler")
@Lazy @Lazy
private JobPrePareHandler jobPrePareHandler; private final JobPrePareHandler terminalJobPrepareHandler;
private final AccessTemplate accessTemplate;
@Override @Override
public PageResult<List<JobResponseVO>> getJobPage(JobQueryVO queryVO) { public PageResult<List<JobResponseVO>> getJobPage(JobQueryVO queryVO) {
@ -108,7 +109,7 @@ public class JobServiceImpl implements JobService {
public List<JobResponseVO> getJobNameList(String keywords, Long jobId) { public List<JobResponseVO> getJobNameList(String keywords, Long jobId) {
LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<Job>() LambdaQueryWrapper<Job> queryWrapper = new LambdaQueryWrapper<Job>()
.select(Job::getId, Job::getJobName); .select(Job::getId, Job::getJobName);
if (StrUtil.isNotBlank(keywords)) { if (StrUtil.isNotBlank(keywords)) {
queryWrapper.like(Job::getJobName, keywords.trim() + "%"); queryWrapper.like(Job::getJobName, keywords.trim() + "%");
} }
@ -128,7 +129,7 @@ public class JobServiceImpl implements JobService {
// 判断常驻任务 // 判断常驻任务
Job job = updateJobResident(jobRequestVO); Job job = updateJobResident(jobRequestVO);
job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName()) job.setBucketIndex(HashUtil.bkdrHash(jobRequestVO.getGroupName() + jobRequestVO.getJobName())
% systemProperties.getBucketTotal()); % systemProperties.getBucketTotal());
job.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, DateUtils.toNowMilli())); job.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, DateUtils.toNowMilli()));
job.setNamespaceId(UserSessionUtils.currentUserSession().getNamespaceId()); job.setNamespaceId(UserSessionUtils.currentUserSession().getNamespaceId());
return 1 == jobMapper.insert(job); return 1 == jobMapper.insert(job);
@ -149,18 +150,19 @@ public class JobServiceImpl implements JobService {
if (Objects.equals(jobRequestVO.getTriggerType(), TriggerTypeEnum.WORKFLOW.getType())) { if (Objects.equals(jobRequestVO.getTriggerType(), TriggerTypeEnum.WORKFLOW.getType())) {
job.setNextTriggerAt(0L); job.setNextTriggerAt(0L);
// 非常驻任务 > 非常驻任务 // 非常驻任务 > 非常驻任务
} else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(updateJob.getResident(), } else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(
StatusEnum.NO.getStatus())) { updateJob.getResident(),
StatusEnum.NO.getStatus())) {
updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, DateUtils.toNowMilli())); updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, DateUtils.toNowMilli()));
} else if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals( } else if (Objects.equals(job.getResident(), StatusEnum.YES.getStatus()) && Objects.equals(
updateJob.getResident(), StatusEnum.NO.getStatus())) { updateJob.getResident(), StatusEnum.NO.getStatus())) {
// 常驻任务的触发时间 // 常驻任务的触发时间
long time = Optional.ofNullable(ResidentTaskCache.get(jobRequestVO.getId())) long time = Optional.ofNullable(ResidentTaskCache.get(jobRequestVO.getId()))
.orElse(DateUtils.toNowMilli()); .orElse(DateUtils.toNowMilli());
updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, time)); updateJob.setNextTriggerAt(calculateNextTriggerAt(jobRequestVO, time));
// 老的是不是常驻任务 新的是常驻任务 需要使用当前时间计算下次触发时间 // 老的是不是常驻任务 新的是常驻任务 需要使用当前时间计算下次触发时间
} else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals( } else if (Objects.equals(job.getResident(), StatusEnum.NO.getStatus()) && Objects.equals(
updateJob.getResident(), StatusEnum.YES.getStatus())) { updateJob.getResident(), StatusEnum.YES.getStatus())) {
updateJob.setNextTriggerAt(DateUtils.toNowMilli()); updateJob.setNextTriggerAt(DateUtils.toNowMilli());
} }
@ -227,12 +229,19 @@ public class JobServiceImpl implements JobService {
Job job = jobMapper.selectById(jobId); Job job = jobMapper.selectById(jobId);
Assert.notNull(job, () -> new EasyRetryServerException("job can not be null.")); Assert.notNull(job, () -> new EasyRetryServerException("job can not be null."));
long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getGroupName, job.getGroupName())
.eq(GroupConfig::getNamespaceId, job.getNamespaceId())
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
);
Assert.isTrue(count > 0, () -> new EasyRetryServerException("组:[{}]已经关闭,不支持手动执行.", job.getGroupName()));
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job); JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(job);
// 设置now表示立即执行 // 设置now表示立即执行
jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli()); jobTaskPrepare.setNextTriggerAt(DateUtils.toNowMilli());
jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_JOB.getType()); jobTaskPrepare.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_JOB.getType());
// 创建批次 // 创建批次
jobPrePareHandler.handler(jobTaskPrepare); terminalJobPrepareHandler.handler(jobTaskPrepare);
return Boolean.TRUE; return Boolean.TRUE;
} }
@ -241,10 +250,10 @@ public class JobServiceImpl implements JobService {
public List<JobResponseVO> getJobList(String groupName) { public List<JobResponseVO> getJobList(String groupName) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
List<Job> jobs = jobMapper.selectList(new LambdaQueryWrapper<Job>() List<Job> jobs = jobMapper.selectList(new LambdaQueryWrapper<Job>()
.select(Job::getId, Job::getJobName) .select(Job::getId, Job::getJobName)
.eq(Job::getNamespaceId, namespaceId) .eq(Job::getNamespaceId, namespaceId)
.eq(Job::getGroupName, groupName) .eq(Job::getGroupName, groupName)
.orderByDesc(Job::getCreateDt)); .orderByDesc(Job::getCreateDt));
List<JobResponseVO> jobResponseList = JobResponseVOConverter.INSTANCE.toJobResponseVOs(jobs); List<JobResponseVO> jobResponseList = JobResponseVOConverter.INSTANCE.toJobResponseVOs(jobs);
return jobResponseList; return jobResponseList;
} }

View File

@ -4,6 +4,7 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.client.model.GenerateRetryIdempotentIdDTO; import com.aizuda.easy.retry.client.model.GenerateRetryIdempotentIdDTO;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum; import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.WaitStrategy; import com.aizuda.easy.retry.server.common.WaitStrategy;
@ -41,6 +42,7 @@ import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.access.TaskAccess; import com.aizuda.easy.retry.template.datasource.access.TaskAccess;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMessageMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMessageMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLogMessage; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLogMessage;
@ -349,10 +351,19 @@ public class RetryTaskServiceImpl implements RetryTaskService {
@Override @Override
public boolean manualTriggerRetryTask(ManualTriggerTaskRequestVO requestVO) { public boolean manualTriggerRetryTask(ManualTriggerTaskRequestVO requestVO) {
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getGroupName, requestVO.getGroupName())
.eq(GroupConfig::getNamespaceId, namespaceId)
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
);
Assert.isTrue(count > 0, () -> new EasyRetryServerException("组:[{}]已经关闭,不支持手动执行.", requestVO.getGroupName()));
List<String> uniqueIds = requestVO.getUniqueIds(); List<String> uniqueIds = requestVO.getUniqueIds();
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
List<RetryTask> list = accessTemplate.getRetryTaskAccess().list( List<RetryTask> list = accessTemplate.getRetryTaskAccess().list(
requestVO.getGroupName(), namespaceId, requestVO.getGroupName(), namespaceId,
new LambdaQueryWrapper<RetryTask>() new LambdaQueryWrapper<RetryTask>()
@ -377,6 +388,13 @@ public class RetryTaskServiceImpl implements RetryTaskService {
List<String> uniqueIds = requestVO.getUniqueIds(); List<String> uniqueIds = requestVO.getUniqueIds();
String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId(); String namespaceId = UserSessionUtils.currentUserSession().getNamespaceId();
long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getGroupName, requestVO.getGroupName())
.eq(GroupConfig::getNamespaceId, namespaceId)
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
);
Assert.isTrue(count > 0, () -> new EasyRetryServerException("组:[{}]已经关闭,不支持手动执行.", requestVO.getGroupName()));
List<RetryTask> list = accessTemplate.getRetryTaskAccess().list(requestVO.getGroupName(), namespaceId, List<RetryTask> list = accessTemplate.getRetryTaskAccess().list(requestVO.getGroupName(), namespaceId,
new LambdaQueryWrapper<RetryTask>() new LambdaQueryWrapper<RetryTask>()

View File

@ -31,9 +31,11 @@ import com.aizuda.easy.retry.server.web.service.WorkflowService;
import com.aizuda.easy.retry.server.web.service.convert.WorkflowConverter; import com.aizuda.easy.retry.server.web.service.convert.WorkflowConverter;
import com.aizuda.easy.retry.server.web.service.handler.WorkflowHandler; import com.aizuda.easy.retry.server.web.service.handler.WorkflowHandler;
import com.aizuda.easy.retry.server.web.util.UserSessionUtils; import com.aizuda.easy.retry.server.web.util.UserSessionUtils;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.WorkflowNodeMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.GroupConfig;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow; import com.aizuda.easy.retry.template.datasource.persistence.po.Workflow;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
@ -70,7 +72,7 @@ public class WorkflowServiceImpl implements WorkflowService {
@Lazy @Lazy
private final WorkflowPrePareHandler terminalWorkflowPrepareHandler; private final WorkflowPrePareHandler terminalWorkflowPrepareHandler;
private final JobMapper jobMapper; private final JobMapper jobMapper;
private final AccessTemplate accessTemplate;
@Override @Override
@Transactional @Transactional
@ -280,6 +282,14 @@ public class WorkflowServiceImpl implements WorkflowService {
Workflow workflow = workflowMapper.selectById(id); Workflow workflow = workflowMapper.selectById(id);
Assert.notNull(workflow, () -> new EasyRetryServerException("workflow can not be null.")); Assert.notNull(workflow, () -> new EasyRetryServerException("workflow can not be null."));
long count = accessTemplate.getGroupConfigAccess().count(new LambdaQueryWrapper<GroupConfig>()
.eq(GroupConfig::getGroupName, workflow.getGroupName())
.eq(GroupConfig::getNamespaceId, workflow.getNamespaceId())
.eq(GroupConfig::getGroupStatus, StatusEnum.YES.getStatus())
);
Assert.isTrue(count > 0, () -> new EasyRetryServerException("组:[{}]已经关闭,不支持手动执行.", workflow.getGroupName()));
WorkflowTaskPrepareDTO prepareDTO = WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(workflow); WorkflowTaskPrepareDTO prepareDTO = WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(workflow);
// 设置now表示立即执行 // 设置now表示立即执行
prepareDTO.setNextTriggerAt(DateUtils.toNowMilli()); prepareDTO.setNextTriggerAt(DateUtils.toNowMilli());