feat:2.4.0

1. 完成任务的新增更新删除
This commit is contained in:
byteblogs168 2023-10-16 23:13:26 +08:00
parent cdd9748399
commit 01eefc2774
22 changed files with 111 additions and 78 deletions

View File

@ -220,7 +220,7 @@ CREATE TABLE `job` (
`group_name` varchar(64) NOT NULL COMMENT '组名称', `group_name` varchar(64) NOT NULL COMMENT '组名称',
`job_name` varchar(64) NOT NULL COMMENT '名称', `job_name` varchar(64) NOT NULL COMMENT '名称',
`args_str` text NOT NULL COMMENT '执行方法参数', `args_str` text NOT NULL COMMENT '执行方法参数',
`args_type` tinyint(4) NOT NULL DEFAULT '' COMMENT '参数类型 ', `args_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '参数类型 ',
`next_trigger_at` datetime NOT NULL COMMENT '下次触发时间', `next_trigger_at` datetime NOT NULL COMMENT '下次触发时间',
`job_status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '重试状态 0、关闭、1、开启', `job_status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '重试状态 0、关闭、1、开启',
`task_type` varchar(255) DEFAULT NULL COMMENT '任务类型 1、集群 2、广播 3、切片', `task_type` varchar(255) DEFAULT NULL COMMENT '任务类型 1、集群 2、广播 3、切片',

View File

@ -16,9 +16,9 @@ import lombok.Getter;
public enum JobOperationReasonEnum { public enum JobOperationReasonEnum {
NONE(0, StrUtil.EMPTY), NONE(0, StrUtil.EMPTY),
EXECUTE_TIMEOUT(1, "执行超时"), EXECUTE_TIMEOUT(1, "任务执行超时"),
NOT_CLIENT(2, "无客户端节点"), NOT_CLIENT(2, "无客户端节点"),
JOB_CLOSED(3, "任务已关闭"), JOB_CLOSED(3, "JOB已关闭"),
; ;
private final int reason; private final int reason;

View File

@ -30,9 +30,9 @@ public class JobBatchResponseDO {
private Long jobId; private Long jobId;
/** /**
* 任务状态 0失败 1成功 * 任务状态
*/ */
private Integer taskStatus; private Integer taskBatchStatus;
/** /**
* 创建时间 * 创建时间

View File

@ -10,6 +10,7 @@ import java.time.LocalDateTime;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
/** /**
* <p> * <p>
* 调度任务 * 调度任务

View File

@ -7,7 +7,7 @@
<id column="id" property="id" /> <id column="id" property="id" />
<result column="group_name" property="groupName" /> <result column="group_name" property="groupName" />
<result column="job_id" property="jobId" /> <result column="job_id" property="jobId" />
<result column="task_status" property="taskStatus" /> <result column="task_batch_status" property="taskBatchStatus" />
<result column="create_dt" property="createDt" /> <result column="create_dt" property="createDt" />
<result column="update_dt" property="updateDt" /> <result column="update_dt" property="updateDt" />
<result column="deleted" property="deleted" /> <result column="deleted" property="deleted" />
@ -31,6 +31,7 @@
and job_name like #{queryDO.jobName} and job_name like #{queryDO.jobName}
</if> </if>
and a.deleted = 0 and a.deleted = 0
order by id desc
</where> </where>
</select> </select>

View File

@ -14,7 +14,7 @@ import java.util.List;
public class ClientInfoUtils { public class ClientInfoUtils {
public static String generate(RegisterNodeInfo registerNodeInfo) { public static String generate(RegisterNodeInfo registerNodeInfo) {
return registerNodeInfo + StrUtil.AT + registerNodeInfo.address(); return registerNodeInfo.getHostId() + StrUtil.AT + registerNodeInfo.address();
} }
public static String clientId(String clientInfo) { public static String clientId(String clientInfo) {

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.job.task.dto; package com.aizuda.easy.retry.server.job.task.dto;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import lombok.Data; import lombok.Data;
/** /**
@ -26,4 +27,7 @@ public class JobExecutorResultDTO {
private Object result; private Object result;
private JobOperationReasonEnum jobOperationReasonEnum;
} }

View File

@ -48,7 +48,7 @@ public class RealJobExecutorDTO extends BaseDTO {
private Integer executorType; private Integer executorType;
private String executorName; private String executorInfo;
private String clientId; private String clientId;

View File

@ -60,7 +60,7 @@ public class JobExecutorResultActor extends AbstractActor {
()-> new EasyRetryServerException("更新任务实例失败")); ()-> new EasyRetryServerException("更新任务实例失败"));
// 更新批次上的状态 // 更新批次上的状态
jobTaskBatchHandler.complete(result.getTaskBatchId()); jobTaskBatchHandler.complete(result.getTaskBatchId(), result.getJobOperationReasonEnum());
} }
}); });

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.executor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum;
@ -32,7 +33,9 @@ public class ClusterJobExecutor extends AbstractJobExecutor {
// 调度客户端 // 调度客户端
List<JobTask> taskList = context.getTaskList(); List<JobTask> taskList = context.getTaskList();
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, taskList.get(0)); JobTask jobTask = taskList.get(0);
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef); actorRef.tell(realJobExecutor, actorRef);

View File

@ -61,7 +61,7 @@ public class JobExecutorContext {
private Integer executorType; private Integer executorType;
private String executorName; private String executorInfo;
/** /**

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.executor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum;
@ -32,7 +33,9 @@ public class ShardingJobExecutor extends AbstractJobExecutor {
protected void doExecute(JobExecutorContext context) { protected void doExecute(JobExecutorContext context) {
List<JobTask> taskList = context.getTaskList(); List<JobTask> taskList = context.getTaskList();
for (int i = 0; i < taskList.size(); i++) { for (int i = 0; i < taskList.size(); i++) {
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, taskList.get(i)); JobTask jobTask = taskList.get(i);
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));
realJobExecutor.setShardingIndex(i); realJobExecutor.setShardingIndex(i);
realJobExecutor.setShardingTotal(taskList.size()); realJobExecutor.setShardingTotal(taskList.size());
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.job.task.support.handler; package com.aizuda.easy.retry.server.job.task.support.handler;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
@ -11,6 +12,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.List; import java.util.List;
import java.util.Objects;
/** /**
* @author: www.byteblogs.com * @author: www.byteblogs.com
@ -24,7 +26,7 @@ public class JobTaskBatchHandler {
@Autowired @Autowired
private JobTaskBatchMapper jobTaskBatchMapper; private JobTaskBatchMapper jobTaskBatchMapper;
public boolean complete(Long taskBatchId) { public boolean complete(Long taskBatchId, JobOperationReasonEnum jobOperationReasonEnum) {
List<JobTask> jobTasks = jobTaskMapper.selectList( List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>().select(JobTask::getTaskStatus) new LambdaQueryWrapper<JobTask>().select(JobTask::getTaskStatus)
@ -47,6 +49,9 @@ public class JobTaskBatchHandler {
jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus()); jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
} }
if (Objects.nonNull(jobOperationReasonEnum)) {
jobTaskBatch.setOperationReason(jobOperationReasonEnum.getReason());
}
jobTaskBatchMapper.updateById(jobTaskBatch); jobTaskBatchMapper.updateById(jobTaskBatch);
return true; return true;

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.job.task.support.prepare; package com.aizuda.easy.retry.server.job.task.support.prepare;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.job.task.support.BlockStrategy; import com.aizuda.easy.retry.server.job.task.support.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
@ -38,7 +39,8 @@ public class RunningJobPrepareHandler extends AbstractJobPrePareHandler {
// 若存在所有的任务都是完成但是批次上的状态为运行中则是并发导致的未把批次状态变成为终态此处做一次兜底处理 // 若存在所有的任务都是完成但是批次上的状态为运行中则是并发导致的未把批次状态变成为终态此处做一次兜底处理
int blockStrategy = prepare.getBlockStrategy(); int blockStrategy = prepare.getBlockStrategy();
if (jobTaskBatchHandler.complete(prepare.getTaskBatchId())) { JobOperationReasonEnum jobOperationReasonEnum = JobOperationReasonEnum.NONE;
if (jobTaskBatchHandler.complete(prepare.getTaskBatchId(), jobOperationReasonEnum)) {
blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy(); blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy();
} else { } else {
// 计算超时时间 // 计算超时时间
@ -48,11 +50,13 @@ public class RunningJobPrepareHandler extends AbstractJobPrePareHandler {
if (delay > prepare.getExecutorTimeout() * 1000) { if (delay > prepare.getExecutorTimeout() * 1000) {
log.info("任务执行超时.taskBatchId:[{}] delay:[{}] executorTimeout:[{}]", prepare.getTaskBatchId(), delay, prepare.getExecutorTimeout() * 1000); log.info("任务执行超时.taskBatchId:[{}] delay:[{}] executorTimeout:[{}]", prepare.getTaskBatchId(), delay, prepare.getExecutorTimeout() * 1000);
blockStrategy = BlockStrategies.BlockStrategyEnum.OVERLAY.getBlockStrategy(); blockStrategy = BlockStrategies.BlockStrategyEnum.OVERLAY.getBlockStrategy();
jobOperationReasonEnum = JobOperationReasonEnum.EXECUTE_TIMEOUT;
} }
} }
BlockStrategies.BlockStrategyContext blockStrategyContext = JobTaskConverter.INSTANCE.toBlockStrategyContext(prepare); BlockStrategies.BlockStrategyContext blockStrategyContext = JobTaskConverter.INSTANCE.toBlockStrategyContext(prepare);
blockStrategyContext.setOperationReason(jobOperationReasonEnum);
BlockStrategy blockStrategyInterface = BlockStrategies.BlockStrategyEnum.getBlockStrategy(blockStrategy); BlockStrategy blockStrategyInterface = BlockStrategies.BlockStrategyEnum.getBlockStrategy(blockStrategy);
blockStrategyInterface.block(blockStrategyContext); blockStrategyInterface.block(blockStrategyContext);

View File

@ -47,6 +47,7 @@ public abstract class AbstractJobTaskStopHandler implements JobTaskStopHandler,
JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(jobTask); JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(jobTask);
jobExecutorResultDTO.setTaskStatus(JobTaskStatusEnum.STOP.getStatus()); jobExecutorResultDTO.setTaskStatus(JobTaskStatusEnum.STOP.getStatus());
jobExecutorResultDTO.setMessage("任务停止成功"); jobExecutorResultDTO.setMessage("任务停止成功");
jobExecutorResultDTO.setJobOperationReasonEnum(context.getJobOperationReasonEnum());
jobExecutorResultDTO.setTaskType(getTaskType().getType()); jobExecutorResultDTO.setTaskType(getTaskType().getType());
ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor(); ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor();
actorRef.tell(jobExecutorResultDTO, actorRef); actorRef.tell(jobExecutorResultDTO, actorRef);

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.job.task.support.stop; package com.aizuda.easy.retry.server.job.task.support.stop;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import lombok.Data; import lombok.Data;
@ -46,4 +47,6 @@ public class TaskStopJobContext {
private List<JobTask> jobTasks; private List<JobTask> jobTasks;
private JobOperationReasonEnum jobOperationReasonEnum;
} }

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.job.task.support.strategy; package com.aizuda.easy.retry.server.job.task.support.strategy;
import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.support.BlockStrategy; import com.aizuda.easy.retry.server.job.task.support.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
@ -64,6 +65,8 @@ public class BlockStrategies {
*/ */
private LocalDateTime nextTriggerAt; private LocalDateTime nextTriggerAt;
private JobOperationReasonEnum operationReason;
} }
private static final class DiscardBlockStrategy implements BlockStrategy { private static final class DiscardBlockStrategy implements BlockStrategy {
@ -83,6 +86,7 @@ public class BlockStrategies {
// 停止任务 // 停止任务
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(context.taskType); JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(context.taskType);
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(context); TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(context);
stopJobContext.setJobOperationReasonEnum(context.getOperationReason());
stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE); stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
instanceInterrupt.stop(stopJobContext); instanceInterrupt.stop(stopJobContext);

View File

@ -4,6 +4,9 @@ import lombok.Data;
import java.time.LocalDateTime; import java.time.LocalDateTime;
/** /**
* @author: www.byteblogs.com * @author: www.byteblogs.com
* @date : 2023-10-12 10:18 * @date : 2023-10-12 10:18
@ -30,9 +33,9 @@ public class JobBatchResponseVO {
private Long jobId; private Long jobId;
/** /**
* 任务状态 0失败 1成功 * 任务状态
*/ */
private Integer taskStatus; private Integer taskBatchStatus;
/** /**
* 创建时间 * 创建时间
@ -43,9 +46,9 @@ public class JobBatchResponseVO {
* 任务执行时间 * 任务执行时间
*/ */
private LocalDateTime executionAt; private LocalDateTime executionAt;
/** /**
* 操作原因 * 操作原因
*/ */
private Integer operationReason; private Integer operationReason;
@ -57,5 +60,5 @@ public class JobBatchResponseVO {
/** /**
* 执行器名称 * 执行器名称
*/ */
private String executorName; private String executorInfo;
} }

View File

@ -40,6 +40,7 @@ public class JobLogServiceImpl implements JobLogService {
queryWrapper.eq(JobLogMessage::getTaskBatchId, queryVO.getTaskBatchId()); queryWrapper.eq(JobLogMessage::getTaskBatchId, queryVO.getTaskBatchId());
} }
queryWrapper.orderByDesc(JobLogMessage::getId);
PageDTO<JobLogMessage> selectPage = jobLogMessageMapper.selectPage(pageDTO, queryWrapper); PageDTO<JobLogMessage> selectPage = jobLogMessageMapper.selectPage(pageDTO, queryWrapper);
return new PageResult<>(pageDTO, JobLogResponseVOConverter.INSTANCE.toJobLogResponseVOs(selectPage.getRecords())); return new PageResult<>(pageDTO, JobLogResponseVOConverter.INSTANCE.toJobLogResponseVOs(selectPage.getRecords()));

View File

@ -12,8 +12,8 @@
{{ jobBatchInfo.jobName }} {{ jobBatchInfo.jobName }}
</a-descriptions-item> </a-descriptions-item>
<a-descriptions-item label="状态"> <a-descriptions-item label="状态">
<a-tag :color="taskStatus[jobBatchInfo.taskStatus].color"> <a-tag :color="taskBatchStatus[jobBatchInfo.taskBatchStatus].color">
{{ taskStatus[jobBatchInfo.taskStatus].name }} {{ taskBatchStatus[jobBatchInfo.taskBatchStatus].name }}
</a-tag> </a-tag>
</a-descriptions-item> </a-descriptions-item>
<a-descriptions-item label="执行器类型"> <a-descriptions-item label="执行器类型">
@ -30,7 +30,7 @@
{{ jobBatchInfo.executionAt }} {{ jobBatchInfo.executionAt }}
</a-descriptions-item> </a-descriptions-item>
<a-descriptions-item label="执行器名称" span="4"> <a-descriptions-item label="执行器名称" span="4">
{{ jobBatchInfo.executorName }} {{ jobBatchInfo.executorInfo }}
</a-descriptions-item> </a-descriptions-item>
<a-descriptions-item label="创建时间"> <a-descriptions-item label="创建时间">
{{ jobBatchInfo.createDt }} {{ jobBatchInfo.createDt }}
@ -59,7 +59,7 @@ export default {
data () { data () {
return { return {
jobBatchInfo: null, jobBatchInfo: null,
taskStatus: enums.taskBatchStatus, taskBatchStatus: enums.taskBatchStatus,
operationReason: enums.operationReason, operationReason: enums.operationReason,
taskType: enums.taskType, taskType: enums.taskType,
triggerType: enums.triggerType, triggerType: enums.triggerType,

View File

@ -163,7 +163,7 @@ export default {
advanced: false, advanced: false,
// //
queryParam: {}, queryParam: {},
taskStatus: enums.taskBatchStatus, taskBatchStatus: enums.taskBatchStatus,
operationReason: enums.operationReason, operationReason: enums.operationReason,
// //
columns: [ columns: [

View File

@ -95,10 +95,10 @@
<span slot="clientInfo" slot-scope="text"> <span slot="clientInfo" slot-scope="text">
{{ text !== '' ? text.split('@')[1] : '' }} {{ text !== '' ? text.split('@')[1] : '' }}
</span> </span>
<p slot="expandedRowRender" style="margin: 0" slot-scope="record"> <!-- <p slot="expandedRowRender" style="margin: 0" slot-scope="record">-->
执行结果: {{ record.resultMessage }}<br/> <!-- 执行结果: {{ record.resultMessage }}<br/>-->
参数: {{ record.argsStr }} <!-- 参数: {{ record.argsStr }}-->
</p> <!-- </p>-->
<span slot="action" slot-scope="text, record"> <span slot="action" slot-scope="text, record">
<template> <template>
<a @click="handleLog(record)">日志</a> <a @click="handleLog(record)">日志</a>
@ -234,7 +234,7 @@ export default {
this.advanced = !this.advanced this.advanced = !this.advanced
}, },
handleLog (record) { handleLog (record) {
this.$router.push({ path: '/job/log/list', query: { taskBatchId: record.id, jobId: record.jobId } }) this.$router.push({ path: '/job/log/list', query: { taskBatchId: record.taskBatchId, jobId: record.jobId } })
}, },
handleOk (record) {}, handleOk (record) {},
handleSuspend (record) { handleSuspend (record) {