feat: 2.4.0

1. 优化客户端负载均衡
This commit is contained in:
byteblogs168 2023-10-16 19:06:58 +08:00
parent 27fffc4d89
commit cdd9748399
51 changed files with 326 additions and 205 deletions

View File

@ -221,7 +221,6 @@ CREATE TABLE `job` (
`job_name` varchar(64) NOT NULL COMMENT '名称', `job_name` varchar(64) NOT NULL COMMENT '名称',
`args_str` text NOT NULL COMMENT '执行方法参数', `args_str` text NOT NULL COMMENT '执行方法参数',
`args_type` tinyint(4) NOT NULL DEFAULT '' COMMENT '参数类型 ', `args_type` tinyint(4) NOT NULL DEFAULT '' COMMENT '参数类型 ',
`ext_attrs` text COMMENT '扩展字段',
`next_trigger_at` datetime NOT NULL COMMENT '下次触发时间', `next_trigger_at` datetime NOT NULL COMMENT '下次触发时间',
`job_status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '重试状态 0、关闭、1、开启', `job_status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '重试状态 0、关闭、1、开启',
`task_type` varchar(255) DEFAULT NULL COMMENT '任务类型 1、集群 2、广播 3、切片', `task_type` varchar(255) DEFAULT NULL COMMENT '任务类型 1、集群 2、广播 3、切片',
@ -237,6 +236,7 @@ CREATE TABLE `job` (
`retry_interval` int(11) NOT NULL DEFAULT '0' COMMENT '重试间隔(s)', `retry_interval` int(11) NOT NULL DEFAULT '0' COMMENT '重试间隔(s)',
`bucket_index` int(11) NOT NULL DEFAULT '0' COMMENT 'bucket', `bucket_index` int(11) NOT NULL DEFAULT '0' COMMENT 'bucket',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述', `description` varchar(256) NOT NULL DEFAULT '' COMMENT '描述',
`ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除', `deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '逻辑删除 1、删除',
@ -268,7 +268,7 @@ CREATE TABLE `job_task` (
`result_message` text NOT NULL COMMENT '执行结果', `result_message` text NOT NULL COMMENT '执行结果',
`args_str` text NOT NULL COMMENT '执行方法参数', `args_str` text NOT NULL COMMENT '执行方法参数',
`args_type` varchar(16) NOT NULL DEFAULT '' COMMENT '参数类型 text/json', `args_type` varchar(16) NOT NULL DEFAULT '' COMMENT '参数类型 text/json',
`ext_attrs` text NOT NULL COMMENT '扩展字段', `ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`) PRIMARY KEY (`id`)

View File

@ -38,12 +38,12 @@ public class JobEndPoint {
jobContext.setTaskId(dispatchJob.getTaskId()); jobContext.setTaskId(dispatchJob.getTaskId());
jobContext.setTaskBatchId(dispatchJob.getTaskBatchId()); jobContext.setTaskBatchId(dispatchJob.getTaskBatchId());
jobContext.setGroupName(dispatchJob.getGroupName()); jobContext.setGroupName(dispatchJob.getGroupName());
jobContext.setExecutorName(dispatchJob.getExecutorName()); jobContext.setExecutorInfo(dispatchJob.getExecutorInfo());
jobContext.setParallelNum(dispatchJob.getParallelNum()); jobContext.setParallelNum(dispatchJob.getParallelNum());
jobContext.setTaskType(dispatchJob.getTaskType()); jobContext.setTaskType(dispatchJob.getTaskType());
jobContext.setExecutorTimeout(dispatchJob.getExecutorTimeout()); jobContext.setExecutorTimeout(dispatchJob.getExecutorTimeout());
JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorName()); JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorInfo());
if (jobExecutorInfo.isAnnotation()) { if (jobExecutorInfo.isAnnotation()) {
IJobExecutor iJobExecutor = SpringContext.getBeanByType(AnnotationJobExecutor.class); IJobExecutor iJobExecutor = SpringContext.getBeanByType(AnnotationJobExecutor.class);
iJobExecutor.jobExecute(jobContext); iJobExecutor.jobExecute(jobContext);

View File

@ -17,7 +17,7 @@ public class JobContext {
private String groupName; private String groupName;
private String executorName; private String executorInfo;
/** /**
* 任务类型 * 任务类型

View File

@ -19,7 +19,7 @@ public class AnnotationJobExecutor extends AbstractJobExecutor {
@Override @Override
protected ExecuteResult doJobExecute(final JobContext jobContext) { protected ExecuteResult doJobExecute(final JobContext jobContext) {
JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorName()); JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorInfo());
return (ExecuteResult) ReflectionUtils.invokeMethod(jobExecutorInfo.getMethod(), jobExecutorInfo.getExecutor(), jobContext); return (ExecuteResult) ReflectionUtils.invokeMethod(jobExecutorInfo.getMethod(), jobExecutorInfo.getExecutor(), jobContext);
} }
} }

View File

@ -33,13 +33,14 @@ public class DispatchJobRequest {
@NotNull(message = "executorType 不能为空") @NotNull(message = "executorType 不能为空")
private Integer executorType; private Integer executorType;
@NotBlank(message = "executorName 不能为空") @NotBlank(message = "executorInfo 不能为空")
private String executorName; private String executorInfo;
private Integer shardingTotal; private Integer shardingTotal;
private Integer shardingIndex; private Integer shardingIndex;
@NotBlank(message = "executorTimeout 不能为空")
private Integer executorTimeout; private Integer executorTimeout;
} }

View File

@ -0,0 +1,18 @@
package com.aizuda.easy.retry.template.datasource.persistence.dataobject;
import lombok.Data;
/**
* @author www.byteblogs.com
* @date 2023-10-15 23:03:01
* @since 2.4.0
*/
@Data
public class JobBatchQueryDO {
private String groupName;
private Integer taskBatchStatus;
private String jobName;
private Long jobId;
}

View File

@ -2,6 +2,8 @@ package com.aizuda.easy.retry.template.datasource.persistence.dataobject;
import lombok.Data; import lombok.Data;
import java.time.LocalDateTime;
/** /**
* @author www.byteblogs.com * @author www.byteblogs.com
* @date 2023-10-15 23:03:01 * @date 2023-10-15 23:03:01
@ -9,4 +11,58 @@ import lombok.Data;
*/ */
@Data @Data
public class JobBatchResponseDO { public class JobBatchResponseDO {
private Long id;
/**
* 组名称
*/
private String groupName;
/**
* 名称
*/
private String jobName;
/**
* 任务信息id
*/
private Long jobId;
/**
* 任务状态 0失败 1成功
*/
private Integer taskStatus;
/**
* 创建时间
*/
private LocalDateTime createDt;
/**
* 任务执行时间
*/
private LocalDateTime executionAt;
/**
* 操作原因
*/
private Integer operationReason;
/**
* 执行器类型 1Java
*/
private Integer executorType;
/**
* 执行器名称
*/
private String executorName;
private Integer taskType;
private Integer blockStrategy;
private Integer triggerType;
} }

View File

@ -1,8 +1,10 @@
package com.aizuda.easy.retry.template.datasource.persistence.mapper; package com.aizuda.easy.retry.template.datasource.persistence.mapper;
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchQueryDO;
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO; import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Param;
@ -19,5 +21,5 @@ import java.util.List;
@Mapper @Mapper
public interface JobTaskBatchMapper extends BaseMapper<JobTaskBatch> { public interface JobTaskBatchMapper extends BaseMapper<JobTaskBatch> {
List<JobBatchResponseDO> selectJobBatchList(); List<JobBatchResponseDO> selectJobBatchList(IPage<JobTaskBatch> iPage, @Param("queryDO") JobBatchQueryDO queryDO);
} }

View File

@ -19,9 +19,6 @@ public class GroupConfig implements Serializable {
private Integer groupPartition; private Integer groupPartition;
@Deprecated
private Integer routeKey;
private Integer idGeneratorMode; private Integer idGeneratorMode;
private Integer version; private Integer version;

View File

@ -75,9 +75,9 @@ public class Job implements Serializable {
private Integer executorType; private Integer executorType;
/** /**
* 执行器名称 * 执行器信息
*/ */
private String executorName; private String executorInfo;
/** /**
* 触发类型 1.CRON 表达式 2. 固定时间 * 触发类型 1.CRON 表达式 2. 固定时间

View File

@ -56,11 +56,6 @@ public class JobLogMessage implements Serializable {
*/ */
private LocalDateTime createDt; private LocalDateTime createDt;
/**
* 客户端信息
*/
private String clientAddress;
/** /**
* 调度信息 * 调度信息
*/ */

View File

@ -52,9 +52,9 @@ public class JobTask implements Serializable {
private Long parentId; private Long parentId;
/** /**
* 执行状态 0失败 1成功 * 执行状态
*/ */
private Integer executeStatus; private Integer taskStatus;
/** /**
* 重试次数 * 重试次数
@ -69,7 +69,7 @@ public class JobTask implements Serializable {
/** /**
* 客户端ID * 客户端ID
*/ */
private String clientId; private String clientInfo;
/** /**
* 执行方法参数 * 执行方法参数

View File

@ -42,9 +42,9 @@ public class JobTaskBatch implements Serializable {
private Long jobId; private Long jobId;
/** /**
* 任务状态 0失败 1成功 * 任务批次状态
*/ */
private Integer taskStatus; private Integer taskBatchStatus;
/** /**
* 创建时间 * 创建时间

View File

@ -12,8 +12,26 @@
<result column="update_dt" property="updateDt" /> <result column="update_dt" property="updateDt" />
<result column="deleted" property="deleted" /> <result column="deleted" property="deleted" />
</resultMap> </resultMap>
<select id="selectJobBatchList" resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO"> <select id="selectJobBatchList"
parameterType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchQueryDO"
resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO">
SELECT a.*, b.job_name, b.task_type, b.block_strategy, b.trigger_type SELECT a.*, b.job_name, b.task_type, b.block_strategy, b.trigger_type
FROM job_task_batch a join job b on a.job_id = b.id FROM job_task_batch a join job b on a.job_id = b.id
<where>
<if test="queryDO.jobId != null">
and job_id = #{queryDO.jobId}
</if>
<if test="queryDO.groupName != null">
and a.group_name = #{queryDO.groupName}
</if>
<if test="queryDO.taskBatchStatus != null">
and task_status = #{queryDO.taskBatchStatus}
</if>
<if test="queryDO.jobName != null">
and job_name like #{queryDO.jobName}
</if>
and a.deleted = 0
</where>
</select> </select>
</mapper> </mapper>

View File

@ -0,0 +1,32 @@
package com.aizuda.easy.retry.server.common.util;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.google.common.base.Splitter;
import java.util.List;
/**
* @author: www.byteblogs.com
* @date : 2023-10-16 15:20
* @since : 2.4.0
*/
public class ClientInfoUtils {
public static String generate(RegisterNodeInfo registerNodeInfo) {
return registerNodeInfo + StrUtil.AT + registerNodeInfo.address();
}
public static String clientId(String clientInfo) {
return split(clientInfo).get(0);
}
public static String address(String clientInfo) {
return split(clientInfo).get(1);
}
public static List<String> split(String clientInfo) {
return Splitter.on(StrUtil.AT).trimResults().splitToList(clientInfo);
}
}

View File

@ -55,6 +55,10 @@
<groupId>com.github.rholder</groupId> <groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId> <artifactId>guava-retrying</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>easy-retry-server-retry-task</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -22,8 +22,6 @@ public class JobExecutorResultDTO {
private String message; private String message;
private String clientId;
private Integer taskType; private Integer taskType;
private Object result; private Object result;

View File

@ -30,10 +30,6 @@ public class JobLogDTO {
*/ */
private Long taskId; private Long taskId;
/**
* 执行的客户端信息
*/
private String clientId;
/** /**
* 调度信息 * 调度信息

View File

@ -68,7 +68,6 @@ public interface JobTaskConverter {
@Mapping(source = "jobTask.groupName", target = "groupName"), @Mapping(source = "jobTask.groupName", target = "groupName"),
@Mapping(source = "jobTask.jobId", target = "jobId"), @Mapping(source = "jobTask.jobId", target = "jobId"),
@Mapping(source = "jobTask.taskBatchId", target = "taskBatchId"), @Mapping(source = "jobTask.taskBatchId", target = "taskBatchId"),
@Mapping(source = "jobTask.clientId", target = "clientId"),
@Mapping(source = "jobTask.id", target = "taskId"), @Mapping(source = "jobTask.id", target = "taskId"),
@Mapping(source = "jobTask.argsStr", target = "argsStr"), @Mapping(source = "jobTask.argsStr", target = "argsStr"),
@Mapping(source = "jobTask.argsType", target = "argsType"), @Mapping(source = "jobTask.argsType", target = "argsType"),

View File

@ -50,7 +50,7 @@ public class JobExecutorResultActor extends AbstractActor {
@Override @Override
protected void doInTransactionWithoutResult(final TransactionStatus status) { protected void doInTransactionWithoutResult(final TransactionStatus status) {
JobTask jobTask = new JobTask(); JobTask jobTask = new JobTask();
jobTask.setExecuteStatus(result.getTaskStatus()); jobTask.setTaskStatus(result.getTaskStatus());
if (Objects.nonNull(result.getResult())) { if (Objects.nonNull(result.getResult())) {
jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult())); jobTask.setResultMessage(JsonUtil.toJsonString(result.getResult()));
} }
@ -66,7 +66,6 @@ public class JobExecutorResultActor extends AbstractActor {
JobLogDTO jobLogDTO = JobTaskConverter.INSTANCE.toJobLogDTO(result); JobLogDTO jobLogDTO = JobTaskConverter.INSTANCE.toJobLogDTO(result);
jobLogDTO.setMessage(result.getMessage()); jobLogDTO.setMessage(result.getMessage());
jobLogDTO.setClientId(result.getClientId());
jobLogDTO.setTaskId(result.getTaskId()); jobLogDTO.setTaskId(result.getTaskId());
ActorRef actorRef = ActorGenerator.jobLogActor(); ActorRef actorRef = ActorGenerator.jobLogActor();
actorRef.tell(jobLogDTO, actorRef); actorRef.tell(jobLogDTO, actorRef);

View File

@ -47,12 +47,6 @@ public class JobLogActor extends AbstractActor {
private void saveLogMessage(JobLogDTO jobLogDTO) { private void saveLogMessage(JobLogDTO jobLogDTO) {
JobLogMessage jobLogMessage = JobTaskConverter.INSTANCE.toJobLogMessage(jobLogDTO); JobLogMessage jobLogMessage = JobTaskConverter.INSTANCE.toJobLogMessage(jobLogDTO);
if (Objects.nonNull(jobLogDTO.getClientId())) {
Optional.ofNullable(CacheRegisterTable.getServerNode(jobLogDTO.getGroupName(), jobLogDTO.getClientId())).ifPresent(registerNodeInfo -> {
jobLogMessage.setClientAddress(registerNodeInfo.address());
});
}
jobLogMessage.setCreateDt(LocalDateTime.now()); jobLogMessage.setCreateDt(LocalDateTime.now());
jobLogMessage.setMessage(Optional.ofNullable(jobLogDTO.getMessage()).orElse(StrUtil.EMPTY)); jobLogMessage.setMessage(Optional.ofNullable(jobLogDTO.getMessage()).orElse(StrUtil.EMPTY));
jobLogMessage.setTaskId(Optional.ofNullable(jobLogMessage.getTaskId()).orElse(0L)); jobLogMessage.setTaskId(Optional.ofNullable(jobLogMessage.getTaskId()).orElse(0L));

View File

@ -53,7 +53,7 @@ public class JobTaskPrepareActor extends AbstractActor {
List<JobTaskBatch> notCompleteJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>() List<JobTaskBatch> notCompleteJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getJobId, prepare.getJobId()) .eq(JobTaskBatch::getJobId, prepare.getJobId())
.in(JobTaskBatch::getTaskStatus, NOT_COMPLETE)); .in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE));
// 说明所以任务已经完成 // 说明所以任务已经完成
if (CollectionUtils.isEmpty(notCompleteJobTaskBatchList)) { if (CollectionUtils.isEmpty(notCompleteJobTaskBatchList)) {
@ -64,7 +64,7 @@ public class JobTaskPrepareActor extends AbstractActor {
prepare.setExecutionAt(jobTaskBatch.getExecutionAt()); prepare.setExecutionAt(jobTaskBatch.getExecutionAt());
prepare.setTaskBatchId(jobTaskBatch.getId()); prepare.setTaskBatchId(jobTaskBatch.getId());
for (JobPrePareHandler prePareHandler : prePareHandlers) { for (JobPrePareHandler prePareHandler : prePareHandlers) {
if (prePareHandler.matches(jobTaskBatch.getTaskStatus())) { if (prePareHandler.matches(jobTaskBatch.getTaskBatchStatus())) {
prePareHandler.handler(prepare); prePareHandler.handler(prepare);
break; break;
} }

View File

@ -11,9 +11,9 @@ import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils; import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTask; import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTask;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.WaitStrategy;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.executor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum;
@ -33,6 +34,7 @@ public class BroadcastTaskJobExecutor extends AbstractJobExecutor {
for (JobTask jobTask : taskList) { for (JobTask jobTask : taskList) {
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, jobTask); RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(context, jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor(); ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef); actorRef.tell(realJobExecutor, actorRef);
} }

View File

@ -63,7 +63,6 @@ public class JobExecutorContext {
private String executorName; private String executorName;
private String clientId;
/** /**
* 最大重试次数 * 最大重试次数

View File

@ -107,6 +107,7 @@ public class RealJobExecutorActor extends AbstractActor {
LogUtils.error(log, "任务调度失败. taskInstanceId:[{}] count:[{}]", LogUtils.error(log, "任务调度失败. taskInstanceId:[{}] count:[{}]",
realJobExecutorDTO.getTaskBatchId(), attempt.getAttemptNumber(), attempt.getExceptionCause()); realJobExecutorDTO.getTaskBatchId(), attempt.getAttemptNumber(), attempt.getExceptionCause());
JobTask jobTask = new JobTask(); JobTask jobTask = new JobTask();
jobTask.setId(realJobExecutorDTO.getTaskId());
jobTask.setRetryCount((int) attempt.getAttemptNumber()); jobTask.setRetryCount((int) attempt.getAttemptNumber());
jobTaskMapper.updateById(jobTask); jobTaskMapper.updateById(jobTask);
} }

View File

@ -41,14 +41,14 @@ public class JobTaskBatchGenerator {
// 无执行的节点 // 无执行的节点
if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName()))) { if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(context.getGroupName()))) {
jobTaskBatch.setTaskStatus(JobTaskBatchStatusEnum.CANCEL.getStatus()); jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
jobTaskBatch.setOperationReason(JobOperationReasonEnum.NOT_CLIENT.getReason()); jobTaskBatch.setOperationReason(JobOperationReasonEnum.NOT_CLIENT.getReason());
Assert.isTrue(1 == jobTaskBatchMapper.insert(jobTaskBatch), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId())); Assert.isTrue(1 == jobTaskBatchMapper.insert(jobTaskBatch), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId()));
return; return;
} }
// 生成一个新的任务 // 生成一个新的任务
jobTaskBatch.setTaskStatus(JobTaskBatchStatusEnum.WAITING.getStatus()); jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.WAITING.getStatus());
Assert.isTrue(1 == jobTaskBatchMapper.insert(jobTaskBatch), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId())); Assert.isTrue(1 == jobTaskBatchMapper.insert(jobTaskBatch), () -> new EasyRetryServerException("新增调度任务失败.jobId:[{}]", context.getJobId()));
// 进入时间轮 // 进入时间轮

View File

@ -6,11 +6,11 @@ import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -55,10 +55,10 @@ public class BroadcastTaskGenerator extends AbstractJobTaskGenerator {
List<JobTask> jobTasks = new ArrayList<>(serverNodes.size()); List<JobTask> jobTasks = new ArrayList<>(serverNodes.size());
for (RegisterNodeInfo serverNode : serverNodes) { for (RegisterNodeInfo serverNode : serverNodes) {
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientId(serverNode.getHostId()); jobTask.setClientInfo(ClientInfoUtils.generate(serverNode));
jobTask.setArgsType(context.getArgsType()); jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(context.getArgsStr()); jobTask.setArgsStr(context.getArgsStr());
jobTask.setExecuteStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败")); Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));
jobTasks.add(jobTask); jobTasks.add(jobTask);

View File

@ -6,11 +6,11 @@ import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -54,10 +54,10 @@ public class ClusterTaskGenerator extends AbstractJobTaskGenerator {
// 新增任务实例 // 新增任务实例
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientId(serverNode.getHostId()); jobTask.setClientInfo(ClientInfoUtils.generate(serverNode));
jobTask.setArgsType(context.getArgsType()); jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(context.getArgsStr()); jobTask.setArgsStr(context.getArgsStr());
jobTask.setExecuteStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败")); Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));

View File

@ -7,11 +7,10 @@ import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.google.common.base.Splitter; import com.google.common.base.Splitter;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -62,10 +61,10 @@ public class ShardingTaskGenerator extends AbstractJobTaskGenerator {
RegisterNodeInfo registerNodeInfo = nodeInfoList.get(Integer.parseInt(key) % serverNodes.size()); RegisterNodeInfo registerNodeInfo = nodeInfoList.get(Integer.parseInt(key) % serverNodes.size());
// 新增任务实例 // 新增任务实例
JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context); JobTask jobTask = JobTaskConverter.INSTANCE.toJobTaskInstance(context);
jobTask.setClientId(registerNodeInfo.getHostId()); jobTask.setClientInfo(ClientInfoUtils.generate(registerNodeInfo));
jobTask.setArgsType(context.getArgsType()); jobTask.setArgsType(context.getArgsType());
jobTask.setArgsStr(value); jobTask.setArgsStr(value);
jobTask.setExecuteStatus(JobTaskStatusEnum.RUNNING.getStatus()); jobTask.setTaskStatus(JobTaskStatusEnum.RUNNING.getStatus());
jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY)); jobTask.setResultMessage(Optional.ofNullable(jobTask.getResultMessage()).orElse(StrUtil.EMPTY));
Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败")); Assert.isTrue(1 == jobTaskMapper.insert(jobTask), () -> new EasyRetryServerException("新增任务实例失败"));
}); });

View File

@ -27,24 +27,24 @@ public class JobTaskBatchHandler {
public boolean complete(Long taskBatchId) { public boolean complete(Long taskBatchId) {
List<JobTask> jobTasks = jobTaskMapper.selectList( List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>().select(JobTask::getExecuteStatus) new LambdaQueryWrapper<JobTask>().select(JobTask::getTaskStatus)
.eq(JobTask::getTaskBatchId, taskBatchId)); .eq(JobTask::getTaskBatchId, taskBatchId));
if (jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getExecuteStatus()))) { if (jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
return false; return false;
} }
long failCount = jobTasks.stream().filter(jobTask -> jobTask.getExecuteStatus() == JobTaskBatchStatusEnum.FAIL.getStatus()).count(); long failCount = jobTasks.stream().filter(jobTask -> jobTask.getTaskStatus() == JobTaskBatchStatusEnum.FAIL.getStatus()).count();
long stopCount = jobTasks.stream().filter(jobTask -> jobTask.getExecuteStatus() == JobTaskBatchStatusEnum.STOP.getStatus()).count(); long stopCount = jobTasks.stream().filter(jobTask -> jobTask.getTaskStatus() == JobTaskBatchStatusEnum.STOP.getStatus()).count();
JobTaskBatch jobTaskBatch = new JobTaskBatch(); JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setId(taskBatchId); jobTaskBatch.setId(taskBatchId);
if (failCount > 0) { if (failCount > 0) {
jobTaskBatch.setTaskStatus(JobTaskBatchStatusEnum.FAIL.getStatus()); jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.FAIL.getStatus());
} else if (stopCount > 0) { } else if (stopCount > 0) {
jobTaskBatch.setTaskStatus(JobTaskBatchStatusEnum.STOP.getStatus()); jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.STOP.getStatus());
} else { } else {
jobTaskBatch.setTaskStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus()); jobTaskBatch.setTaskBatchStatus(JobTaskBatchStatusEnum.SUCCESS.getStatus());
} }
jobTaskBatchMapper.updateById(jobTaskBatch); jobTaskBatchMapper.updateById(jobTaskBatch);

View File

@ -31,7 +31,7 @@ public abstract class AbstractJobTaskStopHandler implements JobTaskStopHandler,
List<JobTask> jobTasks = jobTaskMapper.selectList( List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>() new LambdaQueryWrapper<JobTask>()
.eq(JobTask::getTaskBatchId, context.getTaskBatchId()) .eq(JobTask::getTaskBatchId, context.getTaskBatchId())
.in(JobTask::getExecuteStatus, JobTaskStatusEnum.NOT_COMPLETE) .in(JobTask::getTaskStatus, JobTaskStatusEnum.NOT_COMPLETE)
); );
if (CollectionUtils.isEmpty(jobTasks)) { if (CollectionUtils.isEmpty(jobTasks)) {

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.stop;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.RealStopTaskInstanceDTO; import com.aizuda.easy.retry.server.job.task.dto.RealStopTaskInstanceDTO;
import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum;
@ -27,9 +28,8 @@ public class BroadcastTaskStopHandler extends AbstractJobTaskStopHandler {
public void doStop(TaskStopJobContext context) { public void doStop(TaskStopJobContext context) {
for (final JobTask jobTask : context.getJobTasks()) { for (final JobTask jobTask : context.getJobTasks()) {
String clientId = jobTask.getClientId();
RealStopTaskInstanceDTO taskInstanceDTO = JobTaskConverter.INSTANCE.toRealStopTaskInstanceDTO(context); RealStopTaskInstanceDTO taskInstanceDTO = JobTaskConverter.INSTANCE.toRealStopTaskInstanceDTO(context);
taskInstanceDTO.setClientId(clientId); taskInstanceDTO.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));
ActorRef actorRef = ActorGenerator.jobRealStopTaskInstanceActor(); ActorRef actorRef = ActorGenerator.jobRealStopTaskInstanceActor();
actorRef.tell(taskInstanceDTO, actorRef); actorRef.tell(taskInstanceDTO, actorRef);

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.stop;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.RealStopTaskInstanceDTO; import com.aizuda.easy.retry.server.job.task.dto.RealStopTaskInstanceDTO;
import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum;
@ -30,9 +31,8 @@ public class ClusterTaskStopHandler extends AbstractJobTaskStopHandler {
public void doStop(TaskStopJobContext context) { public void doStop(TaskStopJobContext context) {
List<JobTask> jobTasks = context.getJobTasks(); List<JobTask> jobTasks = context.getJobTasks();
String clientId = jobTasks.get(0).getClientId();
RealStopTaskInstanceDTO taskInstanceDTO = JobTaskConverter.INSTANCE.toRealStopTaskInstanceDTO(context); RealStopTaskInstanceDTO taskInstanceDTO = JobTaskConverter.INSTANCE.toRealStopTaskInstanceDTO(context);
taskInstanceDTO.setClientId(clientId); taskInstanceDTO.setClientId(ClientInfoUtils.clientId(jobTasks.get(0).getClientInfo()));
ActorRef actorRef = ActorGenerator.jobRealStopTaskInstanceActor(); ActorRef actorRef = ActorGenerator.jobRealStopTaskInstanceActor();
actorRef.tell(taskInstanceDTO, actorRef); actorRef.tell(taskInstanceDTO, actorRef);

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.stop;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.RealStopTaskInstanceDTO; import com.aizuda.easy.retry.server.job.task.dto.RealStopTaskInstanceDTO;
import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum; import com.aizuda.easy.retry.server.job.task.enums.TaskTypeEnum;
@ -27,9 +28,8 @@ public class ShardingTaskStopHandler extends AbstractJobTaskStopHandler {
public void doStop(TaskStopJobContext context) { public void doStop(TaskStopJobContext context) {
for (final JobTask jobTask : context.getJobTasks()) { for (final JobTask jobTask : context.getJobTasks()) {
String clientId = jobTask.getClientId();
RealStopTaskInstanceDTO taskInstanceDTO = JobTaskConverter.INSTANCE.toRealStopTaskInstanceDTO(context); RealStopTaskInstanceDTO taskInstanceDTO = JobTaskConverter.INSTANCE.toRealStopTaskInstanceDTO(context);
taskInstanceDTO.setClientId(clientId); taskInstanceDTO.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));
ActorRef actorRef = ActorGenerator.jobRealStopTaskInstanceActor(); ActorRef actorRef = ActorGenerator.jobRealStopTaskInstanceActor();
actorRef.tell(taskInstanceDTO, actorRef); actorRef.tell(taskInstanceDTO, actorRef);

View File

@ -69,7 +69,7 @@ public class JobTimerTask implements TimerTask {
JobTaskBatch jobTaskBatch = new JobTaskBatch(); JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setId(jobTimerTaskDTO.getTaskBatchId()); jobTaskBatch.setId(jobTimerTaskDTO.getTaskBatchId());
jobTaskBatch.setExecutionAt(LocalDateTime.now()); jobTaskBatch.setExecutionAt(LocalDateTime.now());
jobTaskBatch.setTaskStatus(taskStatus); jobTaskBatch.setTaskBatchStatus(taskStatus);
jobTaskBatch.setOperationReason(operationReason); jobTaskBatch.setOperationReason(operationReason);
Assert.isTrue(1 == jobTaskBatchMapper.updateById(jobTaskBatch), Assert.isTrue(1 == jobTaskBatchMapper.updateById(jobTaskBatch),
() -> new EasyRetryServerException("更新任务失败")); () -> new EasyRetryServerException("更新任务失败"));

View File

@ -66,6 +66,9 @@ public class ConsumerBucketActor extends AbstractActor {
} }
private void doDispatch(final ConsumerBucket consumerBucket) { private void doDispatch(final ConsumerBucket consumerBucket) {
if (CollectionUtils.isEmpty(consumerBucket.getBuckets())) {
return;
}
// 查询桶对应组信息 // 查询桶对应组信息
List<GroupConfig> groupConfigs = accessTemplate.getGroupConfigAccess().list( List<GroupConfig> groupConfigs = accessTemplate.getGroupConfigAccess().list(

View File

@ -69,6 +69,10 @@
<groupId>com.aizuda</groupId> <groupId>com.aizuda</groupId>
<artifactId>easy-retry-server-retry-task</artifactId> <artifactId>easy-retry-server-retry-task</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>easy-retry-server-job-task</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -13,4 +13,7 @@ import lombok.EqualsAndHashCode;
@Data @Data
public class JobBatchQueryVO extends BaseQueryVO { public class JobBatchQueryVO extends BaseQueryVO {
private Long jobId ; private Long jobId ;
private String jobName;
private Integer taskBatchStatus;
private String groupName;
} }

View File

@ -61,8 +61,8 @@ public class JobRequestVO {
/** /**
* 执行器名称 * 执行器名称
*/ */
@NotBlank(message = "executorName 不能为空") @NotBlank(message = "executorInfo 不能为空")
private String executorName; private String executorInfo;
/** /**
* 触发类型 1.CRON 表达式 2. 固定时间 * 触发类型 1.CRON 表达式 2. 固定时间

View File

@ -37,7 +37,7 @@ public class JobTaskResponseVO {
/** /**
* 执行的状态 0失败 1成功 * 执行的状态 0失败 1成功
*/ */
private Integer executeStatus; private Integer taskStatus;
/** /**
* 重试次数 * 重试次数
@ -52,7 +52,7 @@ public class JobTaskResponseVO {
/** /**
* 客户端ID * 客户端ID
*/ */
private String clientId; private String clientInfo;
/** /**
* 执行方法参数 * 执行方法参数

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.web.service.convert; package com.aizuda.easy.retry.server.web.service.convert;
import com.aizuda.easy.retry.server.web.model.response.JobBatchResponseVO; import com.aizuda.easy.retry.server.web.model.response.JobBatchResponseVO;
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import org.mapstruct.Mapper; import org.mapstruct.Mapper;
@ -20,7 +21,7 @@ public interface JobBatchResponseVOConverter {
JobBatchResponseVOConverter INSTANCE = Mappers.getMapper(JobBatchResponseVOConverter.class); JobBatchResponseVOConverter INSTANCE = Mappers.getMapper(JobBatchResponseVOConverter.class);
List<JobBatchResponseVO> toJobBatchResponseVOs(List<JobTaskBatch> jobBatches); List<JobBatchResponseVO> toJobBatchResponseVOs(List<JobBatchResponseDO> jobBatches);
@Mappings({ @Mappings({
@Mapping(source = "jobBatch.groupName", target = "groupName"), @Mapping(source = "jobBatch.groupName", target = "groupName"),

View File

@ -1,11 +1,14 @@
package com.aizuda.easy.retry.server.web.service.impl; package com.aizuda.easy.retry.server.web.service.impl;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.web.model.base.PageResult; import com.aizuda.easy.retry.server.web.model.base.PageResult;
import com.aizuda.easy.retry.server.web.model.request.JobBatchQueryVO; import com.aizuda.easy.retry.server.web.model.request.JobBatchQueryVO;
import com.aizuda.easy.retry.server.web.model.response.JobBatchResponseVO; import com.aizuda.easy.retry.server.web.model.response.JobBatchResponseVO;
import com.aizuda.easy.retry.server.web.service.JobBatchService; import com.aizuda.easy.retry.server.web.service.JobBatchService;
import com.aizuda.easy.retry.server.web.service.convert.JobBatchResponseVOConverter; import com.aizuda.easy.retry.server.web.service.convert.JobBatchResponseVOConverter;
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchQueryDO;
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchResponseDO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
@ -36,17 +39,18 @@ public class JobBatchServiceImpl implements JobBatchService {
PageDTO<JobTaskBatch> pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize()); PageDTO<JobTaskBatch> pageDTO = new PageDTO<>(queryVO.getPage(), queryVO.getSize());
LambdaQueryWrapper<JobTaskBatch> queryWrapper = new LambdaQueryWrapper<>(); JobBatchQueryDO jobBatchQueryDO = new JobBatchQueryDO();
queryWrapper.eq(JobTaskBatch::getDeleted, StatusEnum.NO.getStatus()); if (StrUtil.isNotBlank(queryVO.getJobName())) {
jobBatchQueryDO.setJobName("%" + queryVO.getJobName() + "%");
if (Objects.nonNull(queryVO.getJobId())) {
queryWrapper.eq(JobTaskBatch::getJobId, queryVO.getJobId());
} }
PageDTO<JobTaskBatch> selectPage = jobTaskBatchMapper.selectPage(pageDTO, queryWrapper); jobBatchQueryDO.setJobId(queryVO.getJobId());
jobBatchQueryDO.setTaskBatchStatus(queryVO.getTaskBatchStatus());
jobBatchQueryDO.setGroupName(queryVO.getGroupName());
List<JobBatchResponseDO> batchResponseDOList = jobTaskBatchMapper.selectJobBatchList(pageDTO, jobBatchQueryDO);
List<JobBatchResponseVO> batchResponseVOList = JobBatchResponseVOConverter.INSTANCE.toJobBatchResponseVOs( List<JobBatchResponseVO> batchResponseVOList = JobBatchResponseVOConverter.INSTANCE.toJobBatchResponseVOs(
selectPage.getRecords()); batchResponseDOList);
return new PageResult<>(pageDTO, batchResponseVOList); return new PageResult<>(pageDTO, batchResponseVOList);
} }

View File

@ -4,7 +4,9 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.retry.task.support.strategy.WaitStrategies; import com.aizuda.easy.retry.server.job.task.support.WaitStrategy;
import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.easy.retry.server.job.task.support.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.easy.retry.server.web.model.base.PageResult; import com.aizuda.easy.retry.server.web.model.base.PageResult;
import com.aizuda.easy.retry.server.web.model.request.JobQueryVO; import com.aizuda.easy.retry.server.web.model.request.JobQueryVO;
import com.aizuda.easy.retry.server.web.model.request.JobRequestVO; import com.aizuda.easy.retry.server.web.model.request.JobRequestVO;
@ -20,6 +22,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -47,13 +50,14 @@ public class JobServiceImpl implements JobService {
} }
if (StrUtil.isNotBlank(queryVO.getJobName())) { if (StrUtil.isNotBlank(queryVO.getJobName())) {
queryWrapper.eq(Job::getJobName, queryVO.getJobName()); queryWrapper.like(Job::getJobName, "%" + queryVO.getJobName() + "%");
} }
if (Objects.nonNull(queryVO.getJobStatus())) { if (Objects.nonNull(queryVO.getJobStatus())) {
queryWrapper.eq(Job::getJobStatus, queryVO.getJobStatus()); queryWrapper.eq(Job::getJobStatus, queryVO.getJobStatus());
} }
queryWrapper.orderByDesc(Job::getId);
PageDTO<Job> selectPage = jobMapper.selectPage(pageDTO, queryWrapper); PageDTO<Job> selectPage = jobMapper.selectPage(pageDTO, queryWrapper);
List<JobResponseVO> jobResponseList = JobResponseVOConverter.INSTANCE.toJobResponseVOs(selectPage.getRecords()); List<JobResponseVO> jobResponseList = JobResponseVOConverter.INSTANCE.toJobResponseVOs(selectPage.getRecords());
@ -71,7 +75,12 @@ public class JobServiceImpl implements JobService {
@Override @Override
public boolean saveJob(JobRequestVO jobRequestVO) { public boolean saveJob(JobRequestVO jobRequestVO) {
Job job = JobConverter.INSTANCE.toJob(jobRequestVO); Job job = JobConverter.INSTANCE.toJob(jobRequestVO);
job.setNextTriggerAt(WaitStrategies.randomWait(1, TimeUnit.SECONDS, 60, TimeUnit.SECONDS).computeRetryTime(null)); WaitStrategy waitStrategy = WaitStrategyEnum.getWaitStrategy(jobRequestVO.getTriggerType());
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setTriggerType(jobRequestVO.getTriggerType());
waitStrategyContext.setTriggerInterval(jobRequestVO.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(LocalDateTime.now());
job.setNextTriggerAt(waitStrategy.computeRetryTime(waitStrategyContext));
return 1 == jobMapper.insert(job); return 1 == jobMapper.insert(job);
} }

View File

@ -71,7 +71,7 @@ const enums = {
'color': '#e1f52d' 'color': '#e1f52d'
} }
}, },
taskStatus: { taskBatchStatus: {
'1': { '1': {
'name': '待处理', 'name': '待处理',
'color': '#64a6ea' 'color': '#64a6ea'
@ -113,20 +113,8 @@ const enums = {
'name': '任务已关闭', 'name': '任务已关闭',
'color': '#087da1' 'color': '#087da1'
} }
// '4': {
// 'name': '失败',
// 'color': '#f52d80'
// },
// '5': {
// 'name': '停止',
// 'color': '#ac2df5'
// },
// '6': {
// 'name': '取消',
// 'color': '#f5732d'
// }
}, },
executeStatus: { taskStatus: {
'2': { '2': {
'name': '运行中', 'name': '运行中',
'color': '#1b7ee5' 'color': '#1b7ee5'

View File

@ -50,22 +50,22 @@
</a-form-item> </a-form-item>
</a-col> </a-col>
<a-col :lg="6" :md="12" :sm="24"> <a-col :lg="6" :md="12" :sm="24">
<a-form-item> <!-- <a-form-item>-->
<span slot="label"> <!-- <span slot="label">-->
路由策略&nbsp; <!-- 路由策略&nbsp;-->
<a :href="officialWebsite + '/pages/32e4a0/#什么是路由策略'" target="_blank"> <!-- <a :href="officialWebsite + '/pages/32e4a0/#什么是路由策略'" target="_blank">-->
<a-icon type="question-circle-o" /> <!-- <a-icon type="question-circle-o" />-->
</a> <!-- </a>-->
</span> <!-- </span>-->
<a-select <!-- <a-select-->
placeholder="请选择路由策略" <!-- placeholder="请选择路由策略"-->
v-decorator="[ <!-- v-decorator="[-->
'routeKey', <!-- 'routeKey',-->
{rules: [{ required: true, message: '请选择路由策略'}]} <!-- {rules: [{ required: true, message: '请选择路由策略'}]}-->
]" > <!-- ]" >-->
<a-select-option :value="key" v-for="(value, key) in routeKey" :key="key">{{ value }}</a-select-option> <!-- <a-select-option :value="key" v-for="(value, key) in routeKey" :key="key">{{ value }}</a-select-option>-->
</a-select> <!-- </a-select>-->
</a-form-item> <!-- </a-form-item>-->
</a-col> </a-col>
<a-col :lg="6" :md="12" :sm="24"> <a-col :lg="6" :md="12" :sm="24">
<a-form-item label="描述"> <a-form-item label="描述">

View File

@ -59,7 +59,7 @@ export default {
data () { data () {
return { return {
jobBatchInfo: null, jobBatchInfo: null,
taskStatus: enums.taskStatus, taskStatus: enums.taskBatchStatus,
operationReason: enums.operationReason, operationReason: enums.operationReason,
taskType: enums.taskType, taskType: enums.taskType,
triggerType: enums.triggerType, triggerType: enums.triggerType,

View File

@ -15,39 +15,35 @@
</a-form-item> </a-form-item>
</a-col> </a-col>
<a-col :md="8" :sm="24"> <a-col :md="8" :sm="24">
<a-form-item label="场景名称"> <a-form-item label="任务名称">
<a-select v-model="queryParam.sceneName" placeholder="请选择场景名称" allowClear> <a-input v-model="queryParam.jobName" placeholder="请输入任务名称" allowClear />
<a-select-option v-for="item in sceneList" :value="item.sceneName" :key="item.sceneName">
{{ item.sceneName }}</a-select-option
>
</a-select>
</a-form-item> </a-form-item>
</a-col> </a-col>
<a-col :md="8" :sm="24"> <a-col :md="8" :sm="24">
<a-form-item label="状态"> <a-form-item label="状态">
<a-select v-model="queryParam.jobStatus" placeholder="请选择状态" allowClear> <a-select v-model="queryParam.taskBatchStatus" placeholder="请选择状态" allowClear>
<a-select-option v-for="(index, value) in jobStatus" :value="value" :key="value"> <a-select-option v-for="(item, index) in taskStatus" :value="index" :key="index">
{{ index.name }}</a-select-option {{ item.name }}</a-select-option
> >
</a-select> </a-select>
</a-form-item> </a-form-item>
</a-col> </a-col>
<template v-if="advanced"> <template v-if="advanced">
<a-col :md="8" :sm="24"> <!-- <a-col :md="8" :sm="24">-->
<a-form-item label="业务编号"> <!-- <a-form-item label="业务编号">-->
<a-input v-model="queryParam.bizNo" placeholder="请输入业务编号" allowClear /> <!-- <a-input v-model="queryParam.bizNo" placeholder="请输入业务编号" allowClear />-->
</a-form-item> <!-- </a-form-item>-->
</a-col> <!-- </a-col>-->
<a-col :md="8" :sm="24"> <!-- <a-col :md="8" :sm="24">-->
<a-form-item label="幂等id"> <!-- <a-form-item label="幂等id">-->
<a-input v-model="queryParam.idempotentId" placeholder="请输入幂等id" allowClear /> <!-- <a-input v-model="queryParam.idempotentId" placeholder="请输入幂等id" allowClear />-->
</a-form-item> <!-- </a-form-item>-->
</a-col> <!-- </a-col>-->
<a-col :md="8" :sm="24"> <!-- <a-col :md="8" :sm="24">-->
<a-form-item label="UniqueId"> <!-- <a-form-item label="UniqueId">-->
<a-input v-model="queryParam.uniqueId" placeholder="请输入唯一id" allowClear/> <!-- <a-input v-model="queryParam.uniqueId" placeholder="请输入唯一id" allowClear/>-->
</a-form-item> <!-- </a-form-item>-->
</a-col> <!-- </a-col>-->
</template> </template>
<a-col :md="(!advanced && 8) || 24" :sm="24"> <a-col :md="(!advanced && 8) || 24" :sm="24">
<span <span
@ -87,9 +83,9 @@
<span slot="serial" slot-scope="text, record"> <span slot="serial" slot-scope="text, record">
{{ record.id }} {{ record.id }}
</span> </span>
<span slot="taskStatus" slot-scope="text"> <span slot="taskBatchStatus" slot-scope="text">
<a-tag :color="taskStatus[text].color"> <a-tag :color="taskBatchStatus[text].color">
{{ taskStatus[text].name }} {{ taskBatchStatus[text].name }}
</a-tag> </a-tag>
</span> </span>
<span slot="operationReason" slot-scope="text"> <span slot="operationReason" slot-scope="text">
@ -99,41 +95,41 @@
<template> <template>
<a @click="handleInfo(record)">详情</a> <a @click="handleInfo(record)">详情</a>
<a-divider type="vertical" /> <a-divider type="vertical" />
<a-popconfirm <!-- <a-popconfirm-->
title="是否暂停?" <!-- title="是否暂停?"-->
ok-text="恢复" <!-- ok-text="恢复"-->
cancel-text="取消" <!-- cancel-text="取消"-->
@confirm="handleSuspend(record)" <!-- @confirm="handleSuspend(record)"-->
> <!-- >-->
<a href="javascript:;" v-if="record.retryStatus === 0">暂停</a> <!-- <a href="javascript:;" v-if="record.retryStatus === 0">暂停</a>-->
</a-popconfirm> <!-- </a-popconfirm>-->
<a-divider type="vertical" v-if="record.retryStatus === 0" /> <!-- <a-divider type="vertical" v-if="record.retryStatus === 0" />-->
<a-popconfirm <!-- <a-popconfirm-->
title="是否恢复?" <!-- title="是否恢复?"-->
ok-text="恢复" <!-- ok-text="恢复"-->
cancel-text="取消" <!-- cancel-text="取消"-->
@confirm="handleRecovery(record)" <!-- @confirm="handleRecovery(record)"-->
> <!-- >-->
<a href="javascript:;" v-if="record.retryStatus === 3">恢复</a> <!-- <a href="javascript:;" v-if="record.retryStatus === 3">恢复</a>-->
</a-popconfirm> <!-- </a-popconfirm>-->
<a-divider type="vertical" v-if="record.retryStatus === 3" /> <!-- <a-divider type="vertical" v-if="record.retryStatus === 3" />-->
<a-popconfirm <!-- <a-popconfirm-->
title="是否完成?" <!-- title="是否完成?"-->
ok-text="完成" <!-- ok-text="完成"-->
cancel-text="取消" <!-- cancel-text="取消"-->
@confirm="handleFinish(record)" <!-- @confirm="handleFinish(record)"-->
> <!-- >-->
<a href="javascript:;" v-if="record.retryStatus !== 1 && record.retryStatus !== 2">完成</a> <!-- <a href="javascript:;" v-if="record.retryStatus !== 1 && record.retryStatus !== 2">完成</a>-->
</a-popconfirm> <!-- </a-popconfirm>-->
<a-divider type="vertical" v-if="record.retryStatus !== 1 && record.retryStatus !== 2" /> <!-- <a-divider type="vertical" v-if="record.retryStatus !== 1 && record.retryStatus !== 2" />-->
<a-popconfirm <!-- <a-popconfirm-->
title="是否执行任务?" <!-- title="是否执行任务?"-->
ok-text="执行" <!-- ok-text="执行"-->
cancel-text="取消" <!-- cancel-text="取消"-->
@confirm="handleTrigger(record)" <!-- @confirm="handleTrigger(record)"-->
> <!-- >-->
<a href="javascript:;" v-if="record.retryStatus !== 1 && record.retryStatus !== 2">执行</a> <!-- <a href="javascript:;" v-if="record.retryStatus !== 1 && record.retryStatus !== 2">执行</a>-->
</a-popconfirm> <!-- </a-popconfirm>-->
</template> </template>
</span> </span>
@ -167,7 +163,7 @@ export default {
advanced: false, advanced: false,
// //
queryParam: {}, queryParam: {},
taskStatus: enums.taskStatus, taskStatus: enums.taskBatchStatus,
operationReason: enums.operationReason, operationReason: enums.operationReason,
// //
columns: [ columns: [
@ -180,15 +176,15 @@ export default {
dataIndex: 'groupName', dataIndex: 'groupName',
ellipsis: true ellipsis: true
}, },
// { {
// title: '', title: '任务名称',
// dataIndex: 'jobName', dataIndex: 'jobName',
// ellipsis: true ellipsis: true
// }, },
{ {
title: '状态', title: '状态',
dataIndex: 'taskStatus', dataIndex: 'taskBatchStatus',
scopedSlots: { customRender: 'taskStatus' } scopedSlots: { customRender: 'taskBatchStatus' }
}, },
{ {
title: '开始执行时间', title: '开始执行时间',

View File

@ -36,11 +36,6 @@ export default {
scopedSlots: { customRender: 'serial' }, scopedSlots: { customRender: 'serial' },
width: '5%' width: '5%'
}, },
{
title: '客户端地址',
dataIndex: 'clientAddress',
width: '10%'
},
{ {
title: '信息', title: '信息',
dataIndex: 'message', dataIndex: 'message',

View File

@ -87,11 +87,14 @@
<span slot="serial" slot-scope="text, record"> <span slot="serial" slot-scope="text, record">
{{ record.id }} {{ record.id }}
</span> </span>
<span slot="executeStatus" slot-scope="text"> <span slot="taskStatus" slot-scope="text">
<a-tag :color="executeStatus[text].color"> <a-tag :color="taskStatus[text].color">
{{ executeStatus[text].name }} {{ taskStatus[text].name }}
</a-tag> </a-tag>
</span> </span>
<span slot="clientInfo" slot-scope="text">
{{ text !== '' ? text.split('@')[1] : '' }}
</span>
<p slot="expandedRowRender" style="margin: 0" slot-scope="record"> <p slot="expandedRowRender" style="margin: 0" slot-scope="record">
执行结果: {{ record.resultMessage }}<br/> 执行结果: {{ record.resultMessage }}<br/>
参数: {{ record.argsStr }} 参数: {{ record.argsStr }}
@ -132,7 +135,7 @@ export default {
advanced: false, advanced: false,
// //
queryParam: {}, queryParam: {},
executeStatus: enums.executeStatus, taskStatus: enums.taskStatus,
// //
columns: [ columns: [
{ {
@ -143,6 +146,11 @@ export default {
title: '组名称', title: '组名称',
dataIndex: 'groupName' dataIndex: 'groupName'
}, },
{
title: '地址',
dataIndex: 'clientInfo',
scopedSlots: { customRender: 'clientInfo' }
},
{ {
title: '参数', title: '参数',
dataIndex: 'argsStr', dataIndex: 'argsStr',
@ -155,8 +163,8 @@ export default {
}, },
{ {
title: '状态', title: '状态',
dataIndex: 'executeStatus', dataIndex: 'taskStatus',
scopedSlots: { customRender: 'executeStatus' } scopedSlots: { customRender: 'taskStatus' }
}, },
{ {
title: '重试次数', title: '重试次数',

View File

@ -4,7 +4,7 @@
<div></div> <div></div>
</page-header-wrapper> </page-header-wrapper>
<a-card :body-style="{padding: '24px 32px'}" :bordered="false" :loading="loading"> <a-card :body-style="{padding: '24px 32px'}" :bordered="false" :loading="loading">
<a-form @submit="handleSubmit" :form="form" class="form-row" layout="vertical" style="width: 35%;margin: auto;"> <a-form @submit="handleSubmit" :form="form" class="form-row" layout="vertical" style="width: 40%;margin: auto;">
<a-row class="form-row" :gutter="16"> <a-row class="form-row" :gutter="16">
<a-col :lg="24" :md="24" :sm="24"> <a-col :lg="24" :md="24" :sm="24">
<a-form-item> <a-form-item>
@ -132,7 +132,7 @@
type="textarea" type="textarea"
:rows="1" :rows="1"
v-decorator="[ v-decorator="[
'executorName', 'executorInfo',
{rules: [{ required: true, message: '请输入执行器名称', whitespace: true}]} {rules: [{ required: true, message: '请输入执行器名称', whitespace: true}]}
]" /> ]" />
</a-form-item> </a-form-item>
@ -169,7 +169,7 @@
</a-col> </a-col>
</a-row> </a-row>
<a-row class="form-row" :gutter="16"> <a-row class="form-row" :gutter="16">
<a-col :lg="6" :md="12" :sm="12"> <a-col :lg="6" :md="12" :sm="24">
<a-form-item label="超时时间(秒)"> <a-form-item label="超时时间(秒)">
<a-input-number <a-input-number
id="inputNumber" id="inputNumber"
@ -184,7 +184,7 @@
]" /> ]" />
</a-form-item> </a-form-item>
</a-col> </a-col>
<a-col :lg="6" :md="12" :sm="12"> <a-col :lg="6" :md="12" :sm="24">
<a-form-item label="最大重试次数"> <a-form-item label="最大重试次数">
<a-input-number <a-input-number
:min="1" :min="1"
@ -197,7 +197,7 @@
]" /> ]" />
</a-form-item> </a-form-item>
</a-col> </a-col>
<a-col :lg="6" :md="12" :sm="12"> <a-col :lg="6" :md="12" :sm="24">
<a-form-item label="重试间隔(秒)"> <a-form-item label="重试间隔(秒)">
<a-input-number <a-input-number
:min="1" :min="1"
@ -211,7 +211,7 @@
]" /> ]" />
</a-form-item> </a-form-item>
</a-col> </a-col>
<a-col :lg="6" :md="6" :sm="12"> <a-col :lg="6" :md="12" :sm="24">
<a-form-item label="并行数"> <a-form-item label="并行数">
<a-input-number <a-input-number
:min="1" :min="1"