feat:2.4.0

1. 修复客户端提交时taskBatchId值错误导致更新状态异常问题
This commit is contained in:
byteblogs168 2023-10-10 23:00:20 +08:00
parent a5ea2a6651
commit 943532412c
7 changed files with 33 additions and 26 deletions

View File

@ -35,34 +35,35 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
public void onSuccess(final ExecuteResult result) {
// 上报执行成功
log.info("任务执行成功 [{}]", JsonUtil.toJsonString(result));
DispatchJobResultRequest dispatchJobRequest = new DispatchJobResultRequest();
int taskStatus;
if (result.getStatus() == StatusEnum.NO.getStatus()) {
dispatchJobRequest.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
taskStatus = JobTaskStatusEnum.FAIL.getStatus();
} else {
dispatchJobRequest.setTaskStatus(JobTaskStatusEnum.SUCCESS.getStatus());
taskStatus = JobTaskStatusEnum.SUCCESS.getStatus();
}
dispatchJobRequest.setTaskBatchId(jobContext.getTaskId());
dispatchJobRequest.setGroupName(jobContext.getGroupName());
dispatchJobRequest.setJobId(jobContext.getJobId());
dispatchJobRequest.setTaskId(jobContext.getTaskId());
dispatchJobRequest.setTaskType(jobContext.getTaskType());
dispatchJobRequest.setExecuteResult(result);
CLIENT.dispatchResult(dispatchJobRequest);
CLIENT.dispatchResult(buildDispatchJobResultRequest(result, taskStatus));
}
@Override
public void onFailure(final Throwable t) {
// 上报执行失败
log.error("任务执行失败 jobTask:[{}]", jobContext.getTaskId(), t);
CLIENT.dispatchResult(
buildDispatchJobResultRequest(ExecuteResult.failure(t.getMessage()), JobTaskStatusEnum.FAIL.getStatus())
);
}
private DispatchJobResultRequest buildDispatchJobResultRequest(ExecuteResult executeResult, int status) {
DispatchJobResultRequest dispatchJobRequest = new DispatchJobResultRequest();
dispatchJobRequest.setTaskBatchId(jobContext.getTaskId());
dispatchJobRequest.setTaskBatchId(jobContext.getTaskBatchId());
dispatchJobRequest.setGroupName(jobContext.getGroupName());
dispatchJobRequest.setJobId(jobContext.getJobId());
dispatchJobRequest.setTaskId(jobContext.getTaskId());
dispatchJobRequest.setTaskStatus(JobTaskStatusEnum.FAIL.getStatus());
dispatchJobRequest.setTaskType(jobContext.getTaskType());
dispatchJobRequest.setExecuteResult(ExecuteResult.failure(t.getMessage()));
CLIENT.dispatchResult(dispatchJobRequest);
dispatchJobRequest.setExecuteResult(executeResult);
dispatchJobRequest.setTaskStatus(status);
return dispatchJobRequest;
}
}

View File

@ -45,6 +45,7 @@ public class JobExecutorResultActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().match(JobExecutorResultDTO.class, result -> {
log.info("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result));
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override

View File

@ -80,7 +80,7 @@ public class ScanJobTaskActor extends AbstractActor {
WaitStrategyContext waitStrategyContext = new WaitStrategyContext();
waitStrategyContext.setTriggerType(partitionTask.getTriggerType());
waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval());
waitStrategyContext.setNextTriggerAt(partitionTask.getNextTriggerAt());
waitStrategyContext.setNextTriggerAt(LocalDateTime.now());
Job job = new Job();
job.setId(partitionTask.getId());

View File

@ -49,4 +49,9 @@ public class JobPartitionTask extends PartitionTask {
*/
private Integer executorTimeout;
/**
* 任务类型
*/
private Integer taskType;
}

View File

@ -57,7 +57,7 @@ public class JobTaskBatchGenerator {
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setTaskBatchId(jobTaskBatch.getId());
JobTimerWheelHandler.register(context.getGroupName(), context.getJobId(),
JobTimerWheelHandler.register(context.getGroupName(), jobTaskBatch.getId(),
new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
}

View File

@ -32,7 +32,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler {
log.info("存在待处理任务. taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId());
// 若时间轮中数据不存在则重新加入
if (!JobTimerWheelHandler.isExisted(jobPrepareDTO.getGroupName(), jobPrepareDTO.getJobId())) {
if (!JobTimerWheelHandler.isExisted(jobPrepareDTO.getGroupName(), jobPrepareDTO.getTaskBatchId())) {
// 进入时间轮
long delay = jobPrepareDTO.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
@ -40,7 +40,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler {
JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
jobTimerTaskDTO.setTaskBatchId(jobPrepareDTO.getTaskBatchId());
JobTimerWheelHandler.register(jobPrepareDTO.getGroupName(), jobPrepareDTO.getJobId(),
JobTimerWheelHandler.register(jobPrepareDTO.getGroupName(), jobPrepareDTO.getTaskBatchId(),
new JobTimerTask(jobTimerTaskDTO), delay, TimeUnit.MILLISECONDS);
}
}

View File

@ -45,7 +45,7 @@ public class JobTimerWheelHandler implements Lifecycle {
.build();
}
public static void register(String groupName, Long taskId, TimerTask task, long delay, TimeUnit unit) {
public static void register(String groupName, Long taskBatchId, TimerTask task, long delay, TimeUnit unit) {
if (delay < 0) {
delay = 0;
@ -54,19 +54,19 @@ public class JobTimerWheelHandler implements Lifecycle {
// TODO 支持可配置
if (delay > 60 * 1000) {
LogUtils.warn(log, "距离下次执行时间过久, 不满足进入时间轮的条件. groupName:[{}] uniqueId:[{}] delay:[{}ms]",
groupName, taskId, delay);
groupName, taskBatchId, delay);
return;
}
Timeout timeout = getTimeout(groupName, taskId);
Timeout timeout = getTimeout(groupName, taskBatchId);
if (Objects.isNull(timeout)) {
try {
log.info("加入时间轮. delay:[{}ms] taskId:[{}]", delay, taskId);
log.info("加入时间轮. delay:[{}ms] taskId:[{}]", delay, taskBatchId);
timeout = timer.newTimeout(task, delay, unit);
cache.put(getKey(groupName, taskId), timeout);
cache.put(getKey(groupName, taskBatchId), timeout);
} catch (Exception e) {
LogUtils.error(log, "加入时间轮失败. groupName:[{}] uniqueId:[{}]",
groupName, taskId, e);
groupName, taskBatchId, e);
}
}