feat: 2.4.0

1. 完成任务调度流程设计
This commit is contained in:
byteblogs168 2023-10-11 18:46:02 +08:00
parent ce94194148
commit 18b952234c
26 changed files with 302 additions and 180 deletions

View File

@ -18,6 +18,7 @@ public enum JobOperationReasonEnum {
NONE(0, StrUtil.EMPTY),
EXECUTE_TIMEOUT(1, "执行超时"),
NOT_CLIENT(2, "无客户端节点"),
JOB_CLOSED(3, "任务已关闭"),
;
private final int reason;

View File

@ -4,21 +4,18 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* @author: www.byteblogs.com
* @date : 2023-09-26 14:26
* @since : 2.4.0
*/
@AllArgsConstructor
@Getter
public enum JobTaskStatusEnum {
/**
* 待处理
*/
WAITING(1),
/**
* 处理中
*/
@ -35,7 +32,7 @@ public enum JobTaskStatusEnum {
FAIL(4),
/**
* 任务停止成功
* 任务停止
*/
STOP(5),
@ -44,7 +41,7 @@ public enum JobTaskStatusEnum {
private final int status;
public static final List<Integer> NOT_COMPLETE = Arrays.asList(WAITING.status, RUNNING.status);
public static final List<Integer> NOT_COMPLETE = Collections.singletonList(RUNNING.status);
public static final List<Integer> COMPLETED = Arrays.asList(SUCCESS.status, FAIL.status, STOP.status);
}

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.common.client;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.github.rholder.retry.RetryListener;
import java.lang.reflect.Proxy;
import java.util.Objects;
@ -21,6 +22,11 @@ public class RequestBuilder<T, R> {
private String hostIp;
private Integer hostPort;
private String contextPath;
private boolean failRetry;
private int retryTimes = 3;
private int retryInterval = 1;
private RetryListener retryListener = new SimpleRetryListener();
public static <T, R> RequestBuilder<T, R> newBuilder() {
return new RequestBuilder<>();
@ -56,6 +62,27 @@ public class RequestBuilder<T, R> {
return this;
}
public RequestBuilder<T, R> failRetry(boolean failRetry) {
this.failRetry = failRetry;
return this;
}
public RequestBuilder<T, R> retryTimes(int retryTimes) {
this.retryTimes = retryTimes;
return this;
}
public RequestBuilder<T, R> retryInterval(int retryInterval) {
this.retryInterval = retryInterval;
return this;
}
public RequestBuilder<T, R> retryListener(RetryListener retryListener) {
this.retryListener = retryListener;
return this;
}
public T build() {
if (Objects.isNull(clintInterface)) {
throw new EasyRetryServerException("clintInterface cannot be null");
@ -73,7 +100,8 @@ public class RequestBuilder<T, R> {
throw new EasyRetryServerException("class not found exception to: [{}]", clintInterface.getName());
}
RpcClientInvokeHandler clientInvokeHandler = new RpcClientInvokeHandler(groupName, hostId, hostIp, hostPort, contextPath);
RpcClientInvokeHandler clientInvokeHandler = new RpcClientInvokeHandler(
groupName, hostId, hostIp, hostPort, contextPath, failRetry, retryTimes, retryInterval, retryListener);
return (T) Proxy.newProxyInstance(clintInterface.getClassLoader(),
new Class[]{clintInterface}, clientInvokeHandler);

View File

@ -4,7 +4,6 @@ import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.URLUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.HostUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
@ -16,14 +15,13 @@ import com.aizuda.easy.retry.server.common.client.annotation.Param;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
@ -32,7 +30,6 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.client.ResourceAccessException;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
@ -45,7 +42,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
@ -56,28 +52,20 @@ import java.util.concurrent.TimeUnit;
* @since 2.0.0
*/
@Slf4j
@AllArgsConstructor
public class RpcClientInvokeHandler implements InvocationHandler {
public static final String URL = "http://{0}:{1}/{2}";
private final String groupName;
private String groupName;
private String hostId;
private String hostIp;
private Integer hostPort;
private String contextPath;
public RpcClientInvokeHandler(
final String groupName,
final String hostId,
final String hostIp,
final Integer hostPort,
final String contextPath) {
this.groupName = groupName;
this.hostId = hostId;
this.hostIp = hostIp;
this.hostPort = hostPort;
this.contextPath = contextPath;
}
private boolean failRetry;
private int retryTimes;
private int retryInterval;
private RetryListener retryListener;
@Override
public Result invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
@ -124,7 +112,7 @@ public class RpcClientInvokeHandler implements InvocationHandler {
RestTemplate restTemplate = SpringContext.CONTEXT.getBean(RestTemplate.class);
Retryer<Result> retryer = buildResultRetryer(mapping);
Retryer<Result> retryer = buildResultRetryer();
Result result = retryer.call(() -> {
ResponseEntity<Result> response = restTemplate.exchange(
@ -188,21 +176,12 @@ public class RpcClientInvokeHandler implements InvocationHandler {
return null;
}
private Retryer<Result> buildResultRetryer(Mapping mapping) throws InstantiationException, IllegalAccessException, NoSuchMethodException {
Class<? extends RetryListener> retryListenerClazz = mapping.retryListener();
RetryListener retryListener = retryListenerClazz.newInstance();
Method method = retryListenerClazz.getMethod("onRetry", Attempt.class);
private Retryer<Result> buildResultRetryer() {
Retryer<Result> retryer = RetryerBuilder.<Result>newBuilder()
.retryIfException(throwable -> mapping.failRetry())
.withStopStrategy(StopStrategies.stopAfterAttempt(mapping.retryTimes()))
.withWaitStrategy(WaitStrategies.fixedWait(mapping.retryInterval(), TimeUnit.SECONDS))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
ReflectionUtils.invokeMethod(method, retryListener, attempt);
}
})
.retryIfException(throwable -> failRetry)
.withStopStrategy(StopStrategies.stopAfterAttempt(retryTimes))
.withWaitStrategy(WaitStrategies.fixedWait(retryInterval, TimeUnit.SECONDS))
.withRetryListener(retryListener)
.build();
return retryer;
}

View File

@ -1,8 +1,6 @@
package com.aizuda.easy.retry.server.common.client.annotation;
import com.aizuda.easy.retry.server.common.client.RequestMethod;
import com.aizuda.easy.retry.server.common.client.SimpleRetryListener;
import com.github.rholder.retry.RetryListener;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
@ -26,16 +24,10 @@ public @interface Mapping {
String path() default "";
/**
* 是否支持失败转移
* @return false or trur
*/
boolean failover() default false;
boolean failRetry() default false;
int retryTimes() default 3;
int retryInterval() default 1;
Class<? extends RetryListener> retryListener() default SimpleRetryListener.class;
}

View File

@ -18,4 +18,8 @@ public class ScanTask {
private String groupName;
private Set<Integer> buckets;
private long size;
private long startId;
}

View File

@ -0,0 +1,46 @@
package com.aizuda.easy.retry.server.common.util;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* @author: www.byteblogs.com
* @date : 2023-10-11 10:58
* @since : 2.4.0
*/
public class PartitionTaskUtils {
private PartitionTaskUtils() {
}
public static long process(
Function<Long, List<? extends PartitionTask>> dataSource, Consumer<List<? extends PartitionTask>> task,
long startId) {
int total = 0;
do {
List<? extends PartitionTask> products = dataSource.apply(startId);
if (CollectionUtils.isEmpty(products)) {
// 没有查询到数据直接退出
break;
}
total += products.size();
task.accept(products);
startId = maxId(products);
} while (startId > 0);
return total;
}
private static long maxId(List<? extends PartitionTask> products) {
Optional<Long> max = products.stream().map(PartitionTask::getId).max(Long::compareTo);
return max.orElse(-1L) + 1;
}
}

View File

@ -33,6 +33,11 @@ public interface JobTaskConverter {
)
JobTaskPrepareDTO toJobTaskPrepare(JobPartitionTask job);
@Mappings(
@Mapping(source = "id", target = "jobId")
)
JobTaskPrepareDTO toJobTaskPrepare(Job job);
JobTaskBatchGeneratorContext toJobTaskGeneratorContext(JobTaskPrepareDTO jobTaskPrepareDTO);
JobTaskBatchGeneratorContext toJobTaskGeneratorContext(BlockStrategies.BlockStrategyContext context);
@ -70,6 +75,9 @@ public interface JobTaskConverter {
JobExecutorResultDTO toJobExecutorResultDTO(ClientCallbackContext context);
@Mappings(
@Mapping(source = "id", target = "taskId")
)
JobExecutorResultDTO toJobExecutorResultDTO(JobTask jobTask);
RealStopTaskInstanceDTO toRealStopTaskInstanceDTO(TaskStopJobContext context);

View File

@ -0,0 +1,25 @@
package com.aizuda.easy.retry.server.job.task.client;
import com.aizuda.easy.retry.client.model.StopJobDTO;
import com.aizuda.easy.retry.client.model.request.DispatchJobRequest;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.common.client.RequestMethod;
import com.aizuda.easy.retry.server.common.client.annotation.Body;
import com.aizuda.easy.retry.server.common.client.annotation.Mapping;
/**
* 调用客户端接口
*
* @author: www.byteblogs.com
* @date : 2023-06-19 15:40
* @since 2.0.0
*/
public interface JobRpcClient {
@Mapping(path = "/job/stop/v1", method = RequestMethod.POST)
Result<Boolean> stop(@Body StopJobDTO stopJobDTO);
@Mapping(path = "/job/dispatch/v1", method = RequestMethod.POST)
Result<Boolean> dispatch(@Body DispatchJobRequest dispatchJobRequest);
}

View File

@ -3,17 +3,29 @@ package com.aizuda.easy.retry.server.job.task.dispatch;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheGroupScanActor;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.WaitStrategy;
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.handler.helper.JobTaskBatchHelper;
import com.aizuda.easy.retry.server.job.task.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.easy.retry.server.job.task.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@ -23,6 +35,7 @@ import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDateTime;
import java.util.Objects;
/**
@ -41,6 +54,8 @@ public class JobExecutorResultActor extends AbstractActor {
private TransactionTemplate transactionTemplate;
@Autowired
private JobTaskBatchHelper jobTaskBatchHelper;
@Autowired
private JobTaskBatchMapper jobTaskBatchMapper;
@Override
public Receive createReceive() {
@ -65,6 +80,20 @@ public class JobExecutorResultActor extends AbstractActor {
}
});
// TODO 60秒内的任务直接丢入时间轮中
if (Integer.parseInt("30") <= 60) {
if (jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, result.getTaskBatchId())
.in(JobTaskBatch::getTaskStatus, JobTaskBatchStatusEnum.NOT_COMPLETE)) <= 0) {
ActorRef scanActorRef = CacheGroupScanActor.get("DEFAULT_JOB_KEY", TaskTypeEnum.JOB);
ScanTask scanTask = new ScanTask();
scanTask.setBuckets(Sets.newHashSet(0));
scanTask.setSize(1);
scanTask.setStartId(result.getJobId());
scanActorRef.tell(scanTask, scanActorRef);
}
}
JobLogDTO jobLogDTO = JobTaskConverter.INSTANCE.toJobLogDTO(result);
jobLogDTO.setMessage(result.getMessage());
jobLogDTO.setClientId(result.getClientId());

View File

@ -10,6 +10,7 @@ import com.aizuda.easy.retry.server.common.cache.CacheConsumerGroup;
import com.aizuda.easy.retry.server.common.dto.PartitionTask;
import com.aizuda.easy.retry.server.common.dto.ScanTask;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.PartitionTaskUtils;
import com.aizuda.easy.retry.server.job.task.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.WaitStrategy;
import com.aizuda.easy.retry.server.job.task.dto.JobPartitionTask;
@ -70,7 +71,7 @@ public class ScanJobTaskActor extends AbstractActor {
private void doScan(final ScanTask scanTask) {
log.info("job scan start");
long total = process(startId -> listAvailableJobs(startId, scanTask), partitionTasks -> {
long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask), partitionTasks -> {
for (final JobPartitionTask partitionTask : (List<JobPartitionTask>) partitionTasks) {
CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName());
JobTaskPrepareDTO jobTaskPrepare = JobTaskConverter.INSTANCE.toJobTaskPrepare(partitionTask);
@ -92,18 +93,18 @@ public class ScanJobTaskActor extends AbstractActor {
ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
actorRef.tell(jobTaskPrepare, actorRef);
}
}, 0);
}, scanTask.getStartId());
log.info("job scan end. total:[{}]", total);
}
private List<JobPartitionTask> listAvailableJobs(Long startId, ScanTask scanTask) {
List<Job> jobs = jobMapper.selectPage(new PageDTO<Job>(0, 1000),
List<Job> jobs = jobMapper.selectPage(new PageDTO<Job>(0, scanTask.getSize()),
new LambdaQueryWrapper<Job>()
.eq(Job::getJobStatus, StatusEnum.YES.getStatus())
.in(Job::getBucketIndex, scanTask.getBuckets())
.le(Job::getNextTriggerAt, LocalDateTime.now().plusSeconds(6))
.le(Job::getNextTriggerAt, LocalDateTime.now().plusSeconds(60))
.eq(Job::getDeleted, StatusEnum.NO.getStatus())
.gt(Job::getId, startId)
).getRecords();
@ -111,27 +112,5 @@ public class ScanJobTaskActor extends AbstractActor {
return JobTaskConverter.INSTANCE.toJobPartitionTasks(jobs);
}
public long process(
Function<Long, List<? extends PartitionTask>> dataSource, Consumer<List<? extends PartitionTask>> task, long startId) {
int total = 0;
do {
List<? extends PartitionTask> products = dataSource.apply(startId);
if (CollectionUtils.isEmpty(products)) {
// 没有查询到数据直接退出
break;
}
total += products.size();
task.accept(products);
startId = maxId(products);
} while (startId > 0);
return total;
}
private static long maxId(List<? extends PartitionTask> products) {
Optional<Long> max = products.stream().map(PartitionTask::getId).max(Long::compareTo);
return max.orElse(-1L) + 1;
}
}

View File

@ -11,4 +11,6 @@ import lombok.Data;
public class JobTimerTaskDTO {
private Long taskBatchId;
private String groupName;
private Long jobId;
}

View File

@ -56,7 +56,8 @@ public class JobTaskBatchGenerator {
- System.currentTimeMillis();
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setTaskBatchId(jobTaskBatch.getId());
jobTimerTaskDTO.setGroupName(context.getGroupName());
jobTimerTaskDTO.setJobId(context.getJobId());
JobTimerWheelHandler.register(context.getGroupName(), jobTaskBatch.getId(),
new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS);

View File

@ -10,9 +10,9 @@ import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.client.RequestBuilder;
import com.aizuda.easy.retry.server.common.client.RpcClient;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.job.task.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.client.JobRpcClient;
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO;
@ -21,10 +21,6 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@ -32,7 +28,6 @@ import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* @author www.byteblogs.com
@ -60,7 +55,8 @@ public class RealJobExecutorActor extends AbstractActor {
private void doExecute(RealJobExecutorDTO realJobExecutorDTO) {
// 检查客户端是否存在
RegisterNodeInfo registerNodeInfo = CacheRegisterTable.getServerNode(realJobExecutorDTO.getGroupName(), realJobExecutorDTO.getClientId());
RegisterNodeInfo registerNodeInfo = CacheRegisterTable.getServerNode(realJobExecutorDTO.getGroupName(),
realJobExecutorDTO.getClientId());
if (Objects.isNull(registerNodeInfo)) {
taskExecuteFailure(realJobExecutorDTO, "无可执行的客户端");
return;
@ -69,14 +65,12 @@ public class RealJobExecutorActor extends AbstractActor {
JobLogDTO jobLogDTO = JobTaskConverter.INSTANCE.toJobLogDTO(realJobExecutorDTO);
DispatchJobRequest dispatchJobRequest = JobTaskConverter.INSTANCE.toDispatchJobRequest(realJobExecutorDTO);
// 构建重试组件
Retryer<Result<Boolean>> retryer = buildResultRetryer(realJobExecutorDTO);
try {
// 构建请求客户端对象
RpcClient rpcClient = buildRpcClient(registerNodeInfo);
Result<Boolean> dispatch = retryer.call(() -> rpcClient.dispatch(dispatchJobRequest));
if (dispatch.getStatus() == StatusEnum.YES.getStatus() && Objects.equals(dispatch.getData(), Boolean.TRUE)) {
JobRpcClient rpcClient = buildRpcClient(registerNodeInfo, realJobExecutorDTO);
Result<Boolean> dispatch = rpcClient.dispatch(dispatchJobRequest);
if (dispatch.getStatus() == StatusEnum.YES.getStatus() && Objects.equals(dispatch.getData(),
Boolean.TRUE)) {
jobLogDTO.setMessage("任务调度成功");
} else {
jobLogDTO.setMessage(dispatch.getMessage());
@ -96,14 +90,19 @@ public class RealJobExecutorActor extends AbstractActor {
}
private Retryer<Result<Boolean>> buildResultRetryer(RealJobExecutorDTO realJobExecutorDTO) {
Retryer<Result<Boolean>> retryer = RetryerBuilder.<Result<Boolean>>newBuilder()
.retryIfException(throwable -> true)
.withStopStrategy(StopStrategies.stopAfterAttempt(realJobExecutorDTO.getMaxRetryTimes()))
.withWaitStrategy(WaitStrategies.fixedWait(realJobExecutorDTO.getRetryInterval(), TimeUnit.SECONDS))
.withRetryListener(new RetryListener() {
public static class JobExecutorRetryListener implements RetryListener {
private RealJobExecutorDTO realJobExecutorDTO;
private JobTaskMapper jobTaskMapper;
public JobExecutorRetryListener(final RealJobExecutorDTO realJobExecutorDTO,
final JobTaskMapper jobTaskMapper) {
this.realJobExecutorDTO = realJobExecutorDTO;
this.jobTaskMapper = jobTaskMapper;
}
@Override
public <V> void onRetry(Attempt<V> attempt) {
public <V> void onRetry(final Attempt<V> attempt) {
if (attempt.hasException()) {
LogUtils.error(log, "任务调度失败. taskInstanceId:[{}] count:[{}]",
realJobExecutorDTO.getTaskBatchId(), attempt.getAttemptNumber(), attempt.getExceptionCause());
@ -112,27 +111,27 @@ public class RealJobExecutorActor extends AbstractActor {
jobTaskMapper.updateById(jobTask);
}
}
})
.build();
return retryer;
}
private RpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo) {
RpcClient rpcClient = RequestBuilder.<RpcClient, Result>newBuilder()
private JobRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo, RealJobExecutorDTO realJobExecutorDTO) {
return RequestBuilder.<JobRpcClient, Result>newBuilder()
.hostPort(registerNodeInfo.getHostPort())
.groupName(registerNodeInfo.getGroupName())
.hostId(registerNodeInfo.getHostId())
.hostIp(registerNodeInfo.getHostIp())
.contextPath(registerNodeInfo.getContextPath())
.client(RpcClient.class)
.failRetry(Boolean.TRUE)
.retryTimes(realJobExecutorDTO.getMaxRetryTimes())
.retryInterval(realJobExecutorDTO.getRetryInterval())
.retryListener(new JobExecutorRetryListener(realJobExecutorDTO, jobTaskMapper))
.client(JobRpcClient.class)
.build();
return rpcClient;
}
private static void taskExecuteFailure(RealJobExecutorDTO realJobExecutorDTO, String message) {
ActorRef actorRef = ActorGenerator.jobTaskExecutorResultActor();
JobExecutorResultDTO jobExecutorResultDTO = new JobExecutorResultDTO();
jobExecutorResultDTO.setTaskId(realJobExecutorDTO.getTaskBatchId());
jobExecutorResultDTO.setTaskId(realJobExecutorDTO.getTaskId());
jobExecutorResultDTO.setJobId(realJobExecutorDTO.getJobId());
jobExecutorResultDTO.setTaskBatchId(realJobExecutorDTO.getTaskBatchId());
jobExecutorResultDTO.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus());

View File

@ -24,14 +24,14 @@ public class JobTaskBatchHelper {
@Autowired
private JobTaskBatchMapper jobTaskBatchMapper;
public void complete(Long taskBatchId) {
public boolean complete(Long taskBatchId) {
List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>().select(JobTask::getExecuteStatus)
.eq(JobTask::getTaskBatchId, taskBatchId));
if (jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getExecuteStatus()))) {
return;
return false;
}
long failCount = jobTasks.stream().filter(jobTask -> jobTask.getExecuteStatus() == JobTaskBatchStatusEnum.FAIL.getStatus()).count();
@ -49,5 +49,7 @@ public class JobTaskBatchHelper {
jobTaskBatchMapper.updateById(jobTaskBatch);
return true;
}
}

View File

@ -6,6 +6,7 @@ import com.aizuda.easy.retry.server.job.task.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.handler.helper.JobTaskBatchHelper;
import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies;
import com.aizuda.easy.retry.server.job.task.strategy.BlockStrategies.BlockStrategyEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
@ -41,18 +42,21 @@ public class RunningJobPrepareHandler extends AbstractJobPrePareHandler {
log.info("存在运行中的任务. taskBatchId:[{}]", prepare.getTaskBatchId());
// 若存在所有的任务都是完成但是批次上的状态为运行中则是并发导致的未把批次状态变成为终态此处做一次兜底处理
jobTaskBatchHelper.complete(prepare.getTaskBatchId());
int blockStrategy = prepare.getBlockStrategy();
if (jobTaskBatchHelper.complete(prepare.getTaskBatchId())) {
blockStrategy = BlockStrategyEnum.CONCURRENCY.getBlockStrategy();
} else {
// 计算超时时间
long delay = System.currentTimeMillis() - prepare.getExecutionAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
int blockStrategy = prepare.getBlockStrategy();
// 计算超时时间到达超时时间覆盖任务
if (delay > prepare.getExecutorTimeout() * 1000) {
log.info("任务执行超时.taskBatchId:[{}] delay:[{}] executorTimeout:[{}]", prepare.getTaskBatchId(), delay, prepare.getExecutorTimeout() * 1000);
blockStrategy = BlockStrategies.BlockStrategyEnum.OVERLAY.getBlockStrategy();
}
}
BlockStrategies.BlockStrategyContext blockStrategyContext = JobTaskConverter.INSTANCE.toBlockStrategyContext(prepare);
BlockStrategy blockStrategyInterface = BlockStrategies.BlockStrategyEnum.getBlockStrategy(blockStrategy);
blockStrategyInterface.block(blockStrategyContext);

View File

@ -39,6 +39,8 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler {
- System.currentTimeMillis();
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setTaskBatchId(jobPrepareDTO.getTaskBatchId());
jobTimerTaskDTO.setJobId(jobPrepareDTO.getJobId());
jobTimerTaskDTO.setGroupName(jobPrepareDTO.getGroupName());
JobTimerWheelHandler.register(jobPrepareDTO.getGroupName(), jobPrepareDTO.getTaskBatchId(),
new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS);

View File

@ -30,7 +30,7 @@ public abstract class AbstractJobTaskStopHandler implements JobTaskStopHandler,
List<JobTask> jobTasks = jobTaskMapper.selectList(
new LambdaQueryWrapper<JobTask>()
.eq(JobTask::getTaskBatchId, context.getTaskBatchId())
.eq(JobTask::getExecuteStatus, JobTaskStatusEnum.NOT_COMPLETE)
.in(JobTask::getExecuteStatus, JobTaskStatusEnum.NOT_COMPLETE)
);
if (CollectionUtils.isEmpty(jobTasks)) {

View File

@ -6,8 +6,8 @@ import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.client.RequestBuilder;
import com.aizuda.easy.retry.server.common.client.RpcClient;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.job.task.client.JobRpcClient;
import com.aizuda.easy.retry.server.job.task.dto.RealStopTaskInstanceDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@ -48,13 +48,16 @@ public class RealStopTaskActor extends AbstractActor {
}
private Result<Boolean> requestClient(RealStopTaskInstanceDTO realStopTaskInstanceDTO, RegisterNodeInfo registerNodeInfo) {
RpcClient rpcClient = RequestBuilder.<RpcClient, Result>newBuilder()
JobRpcClient rpcClient = RequestBuilder.<JobRpcClient, Result>newBuilder()
.hostPort(registerNodeInfo.getHostPort())
.groupName(realStopTaskInstanceDTO.getGroupName())
.hostId(registerNodeInfo.getHostId())
.hostIp(registerNodeInfo.getHostIp())
.contextPath(registerNodeInfo.getContextPath())
.client(RpcClient.class)
.failRetry(Boolean.TRUE)
.retryTimes(3)
.retryInterval(1)
.client(JobRpcClient.class)
.build();
StopJobDTO stopJobDTO = new StopJobDTO();

View File

@ -3,12 +3,21 @@ package com.aizuda.easy.retry.server.job.task.handler.timer;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.WaitStrategy;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.strategy.WaitStrategies.WaitStrategyContext;
import com.aizuda.easy.retry.server.job.task.strategy.WaitStrategies.WaitStrategyEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.util.Timeout;
@ -42,36 +51,48 @@ public class JobTimerTask implements TimerTask {
log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId());
executor.execute(() -> {
Long jobId = 0L;
String groupName = "";
try {
JobTaskBatchMapper jobTaskBatchMapper = SpringContext.getBeanByType(JobTaskBatchMapper.class);
JobTaskBatch jobTaskBatch = jobTaskBatchMapper.selectOne(new LambdaQueryWrapper<JobTaskBatch>()
.select(JobTaskBatch::getJobId, JobTaskBatch::getGroupName, JobTaskBatch::getId)
.eq(JobTaskBatch::getId, jobTimerTaskDTO.getTaskBatchId())
.eq(JobTaskBatch::getTaskStatus, JobTaskBatchStatusEnum.WAITING.getStatus()));
if (Objects.isNull(jobTaskBatch)) {
return;
// 清除时间轮的缓存
JobTimerWheelHandler.clearCache(jobTimerTaskDTO.getGroupName(), jobTimerTaskDTO.getTaskBatchId());
JobMapper jobMapper = SpringContext.getBeanByType(JobMapper.class);
Job job = jobMapper.selectOne(new LambdaQueryWrapper<Job>()
.eq(Job::getJobStatus, StatusEnum.YES.getStatus())
.eq(Job::getId, jobTimerTaskDTO.getJobId())
);
int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus();
int operationReason = JobOperationReasonEnum.NONE.getReason();
if (Objects.isNull(job)) {
log.warn("任务已经关闭不允许执行. jobId:[{}]", jobTimerTaskDTO.getJobId());
taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
operationReason = JobOperationReasonEnum.JOB_CLOSED.getReason();
}
jobId = jobTaskBatch.getJobId();
groupName = jobTaskBatch.getGroupName();
JobTaskBatchMapper jobTaskBatchMapper = SpringContext.getBeanByType(JobTaskBatchMapper.class);
JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setId(jobTimerTaskDTO.getTaskBatchId());
jobTaskBatch.setExecutionAt(LocalDateTime.now());
jobTaskBatch.setTaskStatus(JobTaskBatchStatusEnum.RUNNING.getStatus());
jobTaskBatch.setTaskStatus(taskStatus);
jobTaskBatch.setOperationReason(operationReason);
Assert.isTrue(1 == jobTaskBatchMapper.updateById(jobTaskBatch),
() -> new EasyRetryServerException("更新任务失败"));
// 如果任务已经关闭则不需要执行
if (Objects.isNull(job)) {
return;
}
TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO();
taskExecuteDTO.setTaskBatchId(jobTimerTaskDTO.getTaskBatchId());
taskExecuteDTO.setGroupName(groupName);
taskExecuteDTO.setJobId(jobId);
taskExecuteDTO.setGroupName(jobTimerTaskDTO.getGroupName());
taskExecuteDTO.setJobId(jobTimerTaskDTO.getJobId());
ActorRef actorRef = ActorGenerator.jobTaskExecutorActor();
actorRef.tell(taskExecuteDTO, actorRef);
} catch (Exception e) {
log.error("任务调度执行失败", e);
} finally {
// 清除时间轮的缓存
JobTimerWheelHandler.clearCache(groupName, jobId);
}
});

View File

@ -34,7 +34,7 @@ public class JobTimerWheelHandler implements Lifecycle {
// tickDuration timeUnit 一格的时间长度
// ticksPerWheel 一圈有多少格
timer = new HashedWheelTimer(
new CustomizableThreadFactory("job-task-timer-wheel-"), 100,
new CustomizableThreadFactory("job-task-timer-wheel-"), 1000,
TimeUnit.MILLISECONDS, 1024);
timer.start();

View File

@ -8,6 +8,7 @@ import com.aizuda.easy.retry.server.job.task.generator.batch.JobTaskBatchGenerat
import com.aizuda.easy.retry.server.job.task.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.handler.stop.JobTaskStopHandler;
import com.aizuda.easy.retry.server.job.task.handler.stop.JobTaskStopFactory;
import com.aizuda.easy.retry.server.job.task.handler.stop.TaskStopJobContext;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
@ -49,7 +50,7 @@ public class BlockStrategies {
private Long jobId;
private Long taskId;
private Long taskBatchId;
private String groupName;
@ -69,7 +70,7 @@ public class BlockStrategies {
@Override
public void block(final BlockStrategyContext context) {
log.warn("阻塞策略为丢弃此次执行. jobId:[{}]", context.getJobId());
log.warn("阻塞策略为丢弃此次执行. taskBatchId:[{}]", context.getTaskBatchId());
}
}
@ -77,11 +78,13 @@ public class BlockStrategies {
@Override
public void block(final BlockStrategyContext context) {
log.warn("阻塞策略为覆盖. jobId:[{}]", context.getJobId());
log.warn("阻塞策略为覆盖. taskBatchId:[{}]", context.getTaskBatchId());
// 停止任务
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(context.taskType);
instanceInterrupt.stop(JobTaskConverter.INSTANCE.toStopJobContext(context));
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(context);
stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
instanceInterrupt.stop(stopJobContext);
// 重新生成任务
JobTaskBatchGenerator jobTaskBatchGenerator = SpringContext.getBeanByType(JobTaskBatchGenerator.class);
@ -94,7 +97,7 @@ public class BlockStrategies {
@Override
public void block(final BlockStrategyContext context) {
log.warn("阻塞策略为并行执行. jobId:[{}]", context.getJobId());
log.warn("阻塞策略为并行执行. taskBatchId:[{}]", context.getTaskBatchId());
// 重新生成任务
JobTaskBatchGenerator jobTaskBatchGenerator = SpringContext.getBeanByType(JobTaskBatchGenerator.class);

View File

@ -1,12 +1,13 @@
package com.aizuda.easy.retry.server.common.client;
package com.aizuda.easy.retry.server.retry.task.client;
import com.aizuda.easy.retry.client.model.request.DispatchJobRequest;
import com.aizuda.easy.retry.client.model.DispatchRetryDTO;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.client.model.StopJobDTO;
import com.aizuda.easy.retry.client.model.RetryCallbackDTO;
import com.aizuda.easy.retry.client.model.StopJobDTO;
import com.aizuda.easy.retry.client.model.request.DispatchJobRequest;
import com.aizuda.easy.retry.common.core.model.EasyRetryHeaders;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.server.common.client.RequestMethod;
import com.aizuda.easy.retry.server.common.client.annotation.Body;
import com.aizuda.easy.retry.server.common.client.annotation.Header;
import com.aizuda.easy.retry.server.common.client.annotation.Mapping;
@ -18,18 +19,12 @@ import com.aizuda.easy.retry.server.common.client.annotation.Mapping;
* @date : 2023-06-19 15:40
* @since 2.0.0
*/
public interface RpcClient {
public interface RetryRpcClient {
@Mapping(path = "/retry/dispatch/v1", method = RequestMethod.POST)
@Mapping(path = "/retry/dispatch/v1", method = RequestMethod.POST, failover = true)
Result<DispatchRetryResultDTO> dispatch(@Body DispatchRetryDTO dispatchRetryDTO, @Header EasyRetryHeaders headers);
@Mapping(path = "/retry/callback/v1", method = RequestMethod.POST)
@Mapping(path = "/retry/callback/v1", method = RequestMethod.POST, failover = true)
Result callback(@Body RetryCallbackDTO retryCallbackDTO);
@Mapping(path = "/job/stop/v1", method = RequestMethod.POST)
Result<Boolean> stop(@Body StopJobDTO stopJobDTO);
@Mapping(path = "/job/dispatch/v1", method = RequestMethod.POST)
Result<Boolean> dispatch(@Body DispatchJobRequest dispatchJobRequest);
}

View File

@ -11,9 +11,9 @@ import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.client.RequestBuilder;
import com.aizuda.easy.retry.server.common.client.RpcClient;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.retry.task.client.RetryRpcClient;
import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.retry.task.support.context.CallbackRetryContext;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
@ -135,13 +135,13 @@ public class ExecCallbackUnitActor extends AbstractActor {
retryCallbackDTO.setExecutorName(callbackTask.getExecutorName());
retryCallbackDTO.setUniqueId(callbackTask.getUniqueId());
RpcClient rpcClient = RequestBuilder.<RpcClient, Result>newBuilder()
RetryRpcClient rpcClient = RequestBuilder.<RetryRpcClient, Result>newBuilder()
.hostPort(serverNode.getHostPort())
.groupName(serverNode.getGroupName())
.hostId(serverNode.getHostId())
.hostIp(serverNode.getHostIp())
.contextPath(serverNode.getContextPath())
.client(RpcClient.class)
.client(RetryRpcClient.class)
.build();
return rpcClient.callback(retryCallbackDTO);

View File

@ -14,8 +14,8 @@ import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.client.RequestBuilder;
import com.aizuda.easy.retry.server.common.client.RpcClient;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.retry.task.client.RetryRpcClient;
import com.aizuda.easy.retry.server.retry.task.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.retry.task.support.context.MaxAttemptsPersistenceRetryContext;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.actor.log.RetryTaskLogDTO;
@ -145,13 +145,13 @@ public class ExecUnitActor extends AbstractActor {
easyRetryHeaders.setEasyRetryId(retryTask.getUniqueId());
requestHeaders.add(SystemConstants.EASY_RETRY_HEAD_KEY, JsonUtil.toJsonString(easyRetryHeaders));
RpcClient rpcClient = RequestBuilder.<RpcClient, Result>newBuilder()
RetryRpcClient rpcClient = RequestBuilder.<RetryRpcClient, Result>newBuilder()
.hostPort(serverNode.getHostPort())
.groupName(serverNode.getGroupName())
.hostId(serverNode.getHostId())
.hostIp(serverNode.getHostIp())
.contextPath(serverNode.getContextPath())
.client(RpcClient.class)
.client(RetryRpcClient.class)
.build();
return rpcClient.dispatch(dispatchRetryDTO, easyRetryHeaders);

View File

@ -88,6 +88,8 @@ public class ConsumerBucketActor extends AbstractActor {
// 扫描回调数据
ScanTask scanTask = new ScanTask();
scanTask.setBuckets(consumerBucket.getBuckets());
scanTask.setSize(1000);
scanTask.setStartId(0);
ActorRef scanJobActorRef = cacheActorRef("DEFAULT_JOB_KEY", TaskTypeEnum.JOB);
scanJobActorRef.tell(scanTask, scanJobActorRef);
}