feat: 2.6.0

1. 优化定时任务回调重试
2. 优化工作流前端显示问题
This commit is contained in:
byteblogs168 2024-01-13 22:53:09 +08:00
parent ee5dfa962c
commit c8a238b91c
20 changed files with 247 additions and 160 deletions

View File

@ -11,6 +11,7 @@ import com.aizuda.easy.retry.client.model.request.DispatchJobRequest;
import com.aizuda.easy.retry.common.core.context.SpringContext; import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.model.JobContext; import com.aizuda.easy.retry.common.core.model.JobContext;
import com.aizuda.easy.retry.common.core.model.Result; import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
@ -36,19 +37,26 @@ public class JobEndPoint {
JobContext jobContext = buildJobContext(dispatchJob); JobContext jobContext = buildJobContext(dispatchJob);
JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorInfo()); JobExecutorInfo jobExecutorInfo = JobExecutorInfoCache.get(jobContext.getExecutorInfo());
if (Objects.isNull(jobExecutorInfo)) { if (Objects.isNull(jobExecutorInfo)) {
EasyRetryLog.REMOTE.error("执行器配置有误. executorInfo:[{}]", dispatchJob.getExecutorInfo());
return new Result<>("执行器配置有误", Boolean.FALSE); return new Result<>("执行器配置有误", Boolean.FALSE);
} }
// 选择执行器 try {
Object executor = jobExecutorInfo.getExecutor(); // 选择执行器
IJobExecutor jobExecutor; Object executor = jobExecutorInfo.getExecutor();
if (IJobExecutor.class.isAssignableFrom(executor.getClass())) { IJobExecutor jobExecutor;
jobExecutor = (AbstractJobExecutor) executor; if (IJobExecutor.class.isAssignableFrom(executor.getClass())) {
} else { jobExecutor = (AbstractJobExecutor) executor;
jobExecutor = SpringContext.getBeanByType(AnnotationJobExecutor.class); } else {
jobExecutor = SpringContext.getBeanByType(AnnotationJobExecutor.class);
}
jobExecutor.jobExecute(jobContext);
} catch (Exception e) {
EasyRetryLog.REMOTE.error("客户端发生非预期异常. taskBatchId:[{}]", dispatchJob.getTaskBatchId());
throw e;
} }
jobExecutor.jobExecute(jobContext);
return new Result<>(Boolean.TRUE); return new Result<>(Boolean.TRUE);
} }

View File

@ -30,7 +30,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
private static final JobNettyClient CLIENT = RequestBuilder.<JobNettyClient, NettyResult>newBuilder() private static final JobNettyClient CLIENT = RequestBuilder.<JobNettyClient, NettyResult>newBuilder()
.client(JobNettyClient.class) .client(JobNettyClient.class)
.callback(nettyResult -> LogUtils.info(log, "Data report successfully requestId:[{}]", nettyResult.getRequestId())).build(); .callback(nettyResult -> EasyRetryLog.LOCAL.info("Job execute result report successfully requestId:[{}]", nettyResult.getRequestId())).build();
private final JobContext jobContext; private final JobContext jobContext;
@ -41,7 +41,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
@Override @Override
public void onSuccess(ExecuteResult result) { public void onSuccess(ExecuteResult result) {
// 上报执行成功 // 上报执行成功
EasyRetryLog.LOCAL.warn("任务执行成功 taskBatchId:[{}] [{}]", jobContext.getTaskBatchId(), JsonUtil.toJsonString(result)); EasyRetryLog.REMOTE.info("任务执行成功 taskBatchId:[{}] [{}]", jobContext.getTaskBatchId(), JsonUtil.toJsonString(result));
if (Objects.isNull(result)) { if (Objects.isNull(result)) {
result = ExecuteResult.success(); result = ExecuteResult.success();
@ -57,7 +57,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
try { try {
CLIENT.dispatchResult(buildDispatchJobResultRequest(result, taskStatus)); CLIENT.dispatchResult(buildDispatchJobResultRequest(result, taskStatus));
} catch (Exception e) { } catch (Exception e) {
EasyRetryLog.LOCAL.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e); EasyRetryLog.REMOTE.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e);
} finally { } finally {
stopThreadPool(); stopThreadPool();
ThreadLocalLogUtil.removeContext(); ThreadLocalLogUtil.removeContext();
@ -67,7 +67,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
@Override @Override
public void onFailure(final Throwable t) { public void onFailure(final Throwable t) {
// 上报执行失败 // 上报执行失败
log.error("任务执行失败 任务执行成功 taskBatchId:[{}]", jobContext.getTaskBatchId(), t); EasyRetryLog.REMOTE.error("任务执行失败 taskBatchId:[{}]", jobContext.getTaskBatchId(), t);
try { try {
ExecuteResult failure = ExecuteResult.failure(); ExecuteResult failure = ExecuteResult.failure();
@ -81,7 +81,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
buildDispatchJobResultRequest(failure, JobTaskStatusEnum.FAIL.getStatus()) buildDispatchJobResultRequest(failure, JobTaskStatusEnum.FAIL.getStatus())
); );
} catch (Exception e) { } catch (Exception e) {
EasyRetryLog.LOCAL.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e); EasyRetryLog.REMOTE.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e);
} finally { } finally {
stopThreadPool(); stopThreadPool();
ThreadLocalLogUtil.removeContext(); ThreadLocalLogUtil.removeContext();

View File

@ -3,6 +3,8 @@ package com.aizuda.easy.retry.server.model.dto;
import lombok.Data; import lombok.Data;
/** /**
* 工作流回调节点参数模型
*
* @author: xiaowoniu * @author: xiaowoniu
* @date : 2024-01-02 * @date : 2024-01-02
* @since : 2.6.0 * @since : 2.6.0

View File

@ -1,13 +1,13 @@
package com.aizuda.easy.retry.server.common.handler; package com.aizuda.easy.retry.server.common.handler;
import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.ClientLoadBalance; import com.aizuda.easy.retry.server.common.ClientLoadBalance;
import com.aizuda.easy.retry.server.common.allocate.client.ClientLoadBalanceManager; import com.aizuda.easy.retry.server.common.allocate.client.ClientLoadBalanceManager;
import com.aizuda.easy.retry.server.common.allocate.client.ClientLoadBalanceManager.AllocationAlgorithmEnum; import com.aizuda.easy.retry.server.common.allocate.client.ClientLoadBalanceManager.AllocationAlgorithmEnum;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.template.datasource.access.AccessTemplate; import com.aizuda.easy.retry.template.datasource.access.AccessTemplate;
import lombok.extern.slf4j.Slf4j; import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@ -22,11 +22,9 @@ import java.util.stream.Stream;
* @date : 2023-01-10 14:18 * @date : 2023-01-10 14:18
*/ */
@Component @Component
@Slf4j @RequiredArgsConstructor
public class ClientNodeAllocateHandler { public class ClientNodeAllocateHandler {
private final AccessTemplate accessTemplate;
@Autowired
protected AccessTemplate accessTemplate;
/** /**
* 获取分配的节点 * 获取分配的节点
@ -39,7 +37,7 @@ public class ClientNodeAllocateHandler {
Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(groupName, namespaceId); Set<RegisterNodeInfo> serverNodes = CacheRegisterTable.getServerNodeSet(groupName, namespaceId);
if (CollectionUtils.isEmpty(serverNodes)) { if (CollectionUtils.isEmpty(serverNodes)) {
LogUtils.warn(log, "client node is null. groupName:[{}]", groupName); EasyRetryLog.LOCAL.warn("client node is null. groupName:[{}]", groupName);
return null; return null;
} }

View File

@ -63,7 +63,7 @@ public interface JobTaskConverter {
JobLogMessage toJobLogMessage(LogTaskDTO logTaskDTO); JobLogMessage toJobLogMessage(LogTaskDTO logTaskDTO);
JobLogDTO toJobLogDTO(JobExecutorContext context); LogMetaDTO toJobLogDTO(ClientCallbackContext context);
LogMetaDTO toJobLogDTO(JobExecutorResultDTO resultDTO); LogMetaDTO toJobLogDTO(JobExecutorResultDTO resultDTO);

View File

@ -1,9 +1,28 @@
package com.aizuda.easy.retry.server.job.task.support.callback; package com.aizuda.easy.retry.server.job.task.support.callback;
import akka.actor.ActorRef;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.dto.LogMetaDTO;
import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.easy.retry.server.job.task.support.ClientCallbackHandler; import com.aizuda.easy.retry.server.job.task.support.ClientCallbackHandler;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.toolkit.SqlHelper;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.util.Objects;
/** /**
* @author www.byteblogs.com * @author www.byteblogs.com
* @date 2023-10-03 23:12:33 * @date 2023-10-03 23:12:33
@ -11,12 +30,107 @@ import org.springframework.transaction.annotation.Transactional;
*/ */
public abstract class AbstractClientCallbackHandler implements ClientCallbackHandler, InitializingBean { public abstract class AbstractClientCallbackHandler implements ClientCallbackHandler, InitializingBean {
@Autowired
private JobTaskMapper jobTaskMapper;
@Autowired
private JobMapper jobMapper;
@Override @Override
@Transactional @Transactional
public void callback(ClientCallbackContext context) { public void callback(ClientCallbackContext context) {
// 判定是否需要重试
boolean needRetry = isNeedRetry(context);
if (needRetry) {
// 更新重试次数
if (updateRetryCount(context)) {
Job job = context.getJob();
JobTask jobTask = context.getJobTask();
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(context.getClientInfo()));
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef);
LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(context);
EasyRetryLog.REMOTE.info("任务执行/调度失败执行重试. 重试次数:[{}] <|>{}<|>",
jobTask.getRetryCount() + 1, logMetaDTO);
return;
}
}
// 不需要重试执行回调
doCallback(context); doCallback(context);
} }
private boolean updateRetryCount(ClientCallbackContext context) {
JobTask updateJobTask = new JobTask();
updateJobTask.setRetryCount(1);
String newClient = chooseNewClient(context);
if (StrUtil.isNotBlank(newClient)) {
updateJobTask.setClientInfo(newClient);
// 覆盖老的的客户端信息
context.setClientInfo(newClient);
}
Job job = context.getJob();
return SqlHelper.retBool(jobTaskMapper.update(updateJobTask, Wrappers.<JobTask>lambdaUpdate()
.lt(JobTask::getRetryCount, job.getMaxRetryTimes())
.eq(JobTask::getId, context.getTaskId())
));
}
private boolean isNeedRetry(ClientCallbackContext context) {
if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) {
JobTask jobTask = jobTaskMapper.selectById(context.getTaskId());
Job job = jobMapper.selectById(context.getJobId());
if (Objects.isNull(jobTask) || Objects.isNull(job)) {
return Boolean.FALSE;
}
if (jobTask.getRetryCount() < job.getMaxRetryTimes()) {
context.setClientInfo(jobTask.getClientInfo());
context.setJob(job);
context.setJobTask(jobTask);
return Boolean.TRUE;
}
}
return Boolean.FALSE;
}
protected String chooseNewClient(ClientCallbackContext context) {
return null;
}
private void failRetry(ClientCallbackContext context) {
if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) {
JobTask jobTask = jobTaskMapper.selectById(context.getTaskId());
Job job = jobMapper.selectById(context.getJobId());
if (jobTask == null || job == null) {
return;
}
if (jobTask.getRetryCount() < job.getMaxRetryTimes()) {
// 更新重试次数
JobTask updateJobTask = new JobTask();
updateJobTask.setRetryCount(1);
boolean success = SqlHelper.retBool(jobTaskMapper.update(updateJobTask, Wrappers.<JobTask>lambdaUpdate()
.lt(JobTask::getRetryCount, job.getMaxRetryTimes())
.eq(JobTask::getId, context.getTaskId())
));
if (success) {
}
return;
}
}
}
protected abstract void doCallback(ClientCallbackContext context); protected abstract void doCallback(ClientCallbackContext context);
@Override @Override

View File

@ -2,8 +2,10 @@ package com.aizuda.easy.retry.server.job.task.support.callback;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils; import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.dto.LogMetaDTO;
import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO; import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
@ -39,30 +41,7 @@ public class BroadcastClientCallbackHandler extends AbstractClientCallbackHandle
@Override @Override
protected void doCallback(final ClientCallbackContext context) { protected void doCallback(final ClientCallbackContext context) {
if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) {
JobTask jobTask = jobTaskMapper.selectById(context.getTaskId());
Job job = jobMapper.selectById(context.getJobId());
if (jobTask == null || job == null) {
return;
}
if (jobTask.getRetryCount() < job.getMaxRetryTimes()) {
// 更新重试次数
JobTask updateJobTask = new JobTask();
updateJobTask.setRetryCount(1);
boolean success = SqlHelper.retBool(jobTaskMapper.update(updateJobTask, Wrappers.<JobTask>lambdaUpdate()
.lt(JobTask::getRetryCount, job.getMaxRetryTimes())
.eq(JobTask::getId, context.getTaskId())
));
if (success) {
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(jobTask.getClientInfo()));
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef);
// TODO 记录日志
}
return;
}
}
JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context); JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context);
jobExecutorResultDTO.setTaskId(context.getTaskId()); jobExecutorResultDTO.setTaskId(context.getTaskId());
jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage()); jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage());

View File

@ -1,6 +1,8 @@
package com.aizuda.easy.retry.server.job.task.support.callback; package com.aizuda.easy.retry.server.job.task.support.callback;
import com.aizuda.easy.retry.client.model.ExecuteResult; import com.aizuda.easy.retry.client.model.ExecuteResult;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import lombok.Data; import lombok.Data;
/** /**
@ -35,5 +37,11 @@ public class ClientCallbackContext {
private ExecuteResult executeResult; private ExecuteResult executeResult;
private String clientInfo;
private Job job;
private JobTask jobTask;
} }

View File

@ -3,11 +3,14 @@ package com.aizuda.easy.retry.server.job.task.support.callback;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskStatusEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskTypeEnum;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler; import com.aizuda.easy.retry.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.common.util.ClientInfoUtils; import com.aizuda.easy.retry.server.common.util.ClientInfoUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO; import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.easy.retry.server.job.task.dto.LogMetaDTO;
import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO; import com.aizuda.easy.retry.server.job.task.dto.RealJobExecutorDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobMapper;
@ -21,6 +24,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
/** /**
* @author www.byteblogs.com * @author www.byteblogs.com
@ -43,43 +47,23 @@ public class ClusterClientCallbackHandler extends AbstractClientCallbackHandler
return JobTaskTypeEnum.CLUSTER; return JobTaskTypeEnum.CLUSTER;
} }
@Override
protected String chooseNewClient(ClientCallbackContext context) {
// 选择重试的节点
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(context.getJobId().toString(),
context.getGroupName(), context.getNamespaceId(), context.getJob().getRouteKey());
if (Objects.isNull(serverNode)) {
log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
return null;
}
return ClientInfoUtils.generate(serverNode);
}
@Override @Override
protected void doCallback(ClientCallbackContext context) { protected void doCallback(ClientCallbackContext context) {
if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) {
JobTask jobTask = jobTaskMapper.selectById(context.getTaskId());
Job job = jobMapper.selectById(context.getJobId());
if (jobTask == null || job == null) {
return;
}
if (jobTask.getRetryCount() < job.getMaxRetryTimes()) {
// 选择重试的节点
RegisterNodeInfo serverNode = clientNodeAllocateHandler.getServerNode(context.getJobId().toString(),
context.getGroupName(), context.getNamespaceId(), job.getRouteKey());
if (Objects.isNull(serverNode)) {
log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
return;
}
String newClient = ClientInfoUtils.generate(serverNode);
// 更新重试次数
JobTask updateJobTask = new JobTask();
updateJobTask.setClientInfo(newClient);
updateJobTask.setRetryCount(1);
boolean success = SqlHelper.retBool(jobTaskMapper.update(updateJobTask, Wrappers.<JobTask>lambdaUpdate()
.lt(JobTask::getRetryCount, job.getMaxRetryTimes())
.eq(JobTask::getId, context.getTaskId())
));
// 更新成功执行重试
if (success) {
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient));
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef);
// TODO 记录日志
}
return;
}
}
JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context); JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context);
jobExecutorResultDTO.setTaskId(context.getTaskId()); jobExecutorResultDTO.setTaskId(context.getTaskId());
jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage()); jobExecutorResultDTO.setMessage(context.getExecuteResult().getMessage());

View File

@ -45,40 +45,6 @@ public class ShardingClientCallbackHandler extends AbstractClientCallbackHandler
@Override @Override
protected void doCallback(final ClientCallbackContext context) { protected void doCallback(final ClientCallbackContext context) {
if (context.getTaskStatus().equals(JobTaskStatusEnum.FAIL.getStatus())) {
JobTask jobTask = jobTaskMapper.selectById(context.getTaskId());
Job job = jobMapper.selectById(context.getJobId());
if (jobTask == null || job == null) {
return;
}
if (jobTask.getRetryCount() < job.getMaxRetryTimes()) {
Set<RegisterNodeInfo> nodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId());
if (CollUtil.isEmpty(nodes)) {
log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
return;
}
RegisterNodeInfo serverNode = RandomUtil.randomEle(nodes.toArray(new RegisterNodeInfo[0]));
String newClient = ClientInfoUtils.generate(serverNode);
// 更新重试次数
JobTask updateJobTask = new JobTask();
updateJobTask.setClientInfo(newClient);
updateJobTask.setRetryCount(1);
boolean success = SqlHelper.retBool(
jobTaskMapper.update(updateJobTask, Wrappers.<JobTask>lambdaUpdate()
.lt(JobTask::getRetryCount, job.getMaxRetryTimes())
.eq(JobTask::getId, context.getTaskId())
)
);
if (success) {
RealJobExecutorDTO realJobExecutor = JobTaskConverter.INSTANCE.toRealJobExecutorDTO(JobTaskConverter.INSTANCE.toJobExecutorContext(job), jobTask);
realJobExecutor.setClientId(ClientInfoUtils.clientId(newClient));
ActorRef actorRef = ActorGenerator.jobRealTaskExecutorActor();
actorRef.tell(realJobExecutor, actorRef);
// TODO 记录日志
}
return;
}
}
JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context); JobExecutorResultDTO jobExecutorResultDTO = JobTaskConverter.INSTANCE.toJobExecutorResultDTO(context);
jobExecutorResultDTO.setTaskId(context.getTaskId()); jobExecutorResultDTO.setTaskId(context.getTaskId());
@ -90,4 +56,15 @@ public class ShardingClientCallbackHandler extends AbstractClientCallbackHandler
actorRef.tell(jobExecutorResultDTO, actorRef); actorRef.tell(jobExecutorResultDTO, actorRef);
} }
@Override
protected String chooseNewClient(ClientCallbackContext context) {
Set<RegisterNodeInfo> nodes = CacheRegisterTable.getServerNodeSet(context.getGroupName(), context.getNamespaceId());
if (CollUtil.isEmpty(nodes)) {
log.error("无可执行的客户端信息. jobId:[{}]", context.getJobId());
return null;
}
RegisterNodeInfo serverNode = RandomUtil.randomEle(nodes.toArray(new RegisterNodeInfo[0]));
return ClientInfoUtils.generate(serverNode);
}
} }

View File

@ -88,10 +88,10 @@ public class JobExecutorResultActor extends AbstractActor {
} }
}); });
LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(result); // LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(result);
// 防止客户端日志还未上报完成导致日志时序错误 // // 防止客户端日志还未上报完成导致日志时序错误
logMetaDTO.setTimestamp(DateUtils.toEpochMilli(LocalDateTime.now().plusHours(1))); // logMetaDTO.setTimestamp(DateUtils.toEpochMilli(LocalDateTime.now().plusHours(1)));
EasyRetryLog.REMOTE.info("taskId:[{}] 任务执行成功. <|>{}<|>", logMetaDTO.getTaskId(), logMetaDTO); // EasyRetryLog.REMOTE.info("taskId:[{}] 任务执行成功. <|>{}<|>", logMetaDTO.getTaskId(), logMetaDTO);
} catch (Exception e) { } catch (Exception e) {
LogUtils.error(log, " job executor result exception. [{}]", result, e); LogUtils.error(log, " job executor result exception. [{}]", result, e);

View File

@ -14,6 +14,7 @@ import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.client.RequestBuilder; import com.aizuda.easy.retry.server.common.client.RequestBuilder;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.job.task.client.JobRpcClient; import com.aizuda.easy.retry.server.job.task.client.JobRpcClient;
import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO; import com.aizuda.easy.retry.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO; import com.aizuda.easy.retry.server.job.task.dto.JobLogDTO;
@ -69,10 +70,12 @@ public class RequestClientActor extends AbstractActor {
try { try {
// 构建请求客户端对象 // 构建请求客户端对象
Long timestamp = DateUtils.toNowMilli();
JobRpcClient rpcClient = buildRpcClient(registerNodeInfo, realJobExecutorDTO); JobRpcClient rpcClient = buildRpcClient(registerNodeInfo, realJobExecutorDTO);
Result<Boolean> dispatch = rpcClient.dispatch(dispatchJobRequest); Result<Boolean> dispatch = rpcClient.dispatch(dispatchJobRequest);
if (dispatch.getStatus() == StatusEnum.YES.getStatus() && Objects.equals(dispatch.getData(), Boolean.TRUE)) { if (dispatch.getStatus() == StatusEnum.YES.getStatus() && Objects.equals(dispatch.getData(), Boolean.TRUE)) {
LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(realJobExecutorDTO); LogMetaDTO logMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(realJobExecutorDTO);
logMetaDTO.setTimestamp(timestamp);
EasyRetryLog.REMOTE.info("taskId:[{}] 任务调度成功. <|>{}<|>", logMetaDTO.getTaskId(), logMetaDTO); EasyRetryLog.REMOTE.info("taskId:[{}] 任务调度成功. <|>{}<|>", logMetaDTO.getTaskId(), logMetaDTO);
} else { } else {
// 客户端返回失败则认为任务执行失败 // 客户端返回失败则认为任务执行失败

View File

@ -92,9 +92,7 @@ public class CallbackWorkflowExecutor extends AbstractWorkflowExecutor {
result = exchange.getBody(); result = exchange.getBody();
log.info("回调结果. webHook:[{}],参数: [{}]", decisionConfig.getWebhook(), result); log.info("回调结果. webHook:[{}],参数: [{}]", decisionConfig.getWebhook(), result);
} catch (Exception e) { } catch (Exception e) {
// log.error("回调异常. webHook:[{}],参数: [{}]", decisionConfig.getWebhook(), context.getTaskResult(), e); EasyRetryLog.REMOTE.error("回调异常. webHook:[{}],参数: [{}]", decisionConfig.getWebhook(), context.getTaskResult(), e);
EasyRetryLog.LOCAL.error("回调异常. webHook:[{}],参数: [{}]", decisionConfig.getWebhook(), context.getTaskResult(), e);
taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus(); taskBatchStatus = JobTaskBatchStatusEnum.FAIL.getStatus();
operationReason = JobOperationReasonEnum.WORKFLOW_CALLBACK_NODE_EXECUTOR_ERROR.getReason(); operationReason = JobOperationReasonEnum.WORKFLOW_CALLBACK_NODE_EXECUTOR_ERROR.getReason();
jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus(); jobTaskStatus = JobTaskStatusEnum.FAIL.getStatus();

View File

@ -27,6 +27,11 @@ public class JobBatchResponseVO {
*/ */
private String jobName; private String jobName;
/**
* 工作流节点名称
*/
private String nodeName;
/** /**
* 任务信息id * 任务信息id
*/ */

View File

@ -102,25 +102,26 @@ public class JobBatchServiceImpl implements JobBatchService {
return null; return null;
} }
if (jobTaskBatch.getTaskType().equals(TaskTypeEnum.JOB.getType())) { Job job = jobMapper.selectById(jobTaskBatch.getJobId());
Job job = jobMapper.selectById(jobTaskBatch.getJobId()); JobBatchResponseVO jobBatchResponseVO = JobBatchResponseVOConverter.INSTANCE.toJobBatchResponseVO(jobTaskBatch, job);
return JobBatchResponseVOConverter.INSTANCE.toJobBatchResponseVO(jobTaskBatch, job);
}
JobBatchResponseVO jobBatchResponseVO = JobBatchResponseVOConverter.INSTANCE.toJobBatchResponseVO(jobTaskBatch); if (jobTaskBatch.getTaskType().equals(TaskTypeEnum.WORKFLOW.getType())) {
// 回调节点
if (SystemConstants.CALLBACK_JOB_ID.equals(jobTaskBatch.getJobId())) {
WorkflowNode workflowNode = workflowNodeMapper.selectById(jobTaskBatch.getWorkflowNodeId()); WorkflowNode workflowNode = workflowNodeMapper.selectById(jobTaskBatch.getWorkflowNodeId());
jobBatchResponseVO.setJobName(workflowNode.getNodeName()); jobBatchResponseVO.setNodeName(workflowNode.getNodeName());
jobBatchResponseVO.setCallback(JsonUtil.parseObject(workflowNode.getNodeInfo(), CallbackConfig.class));
}
// 条件节点 // 回调节点
if (SystemConstants.DECISION_JOB_ID.equals(jobTaskBatch.getJobId())) { if (SystemConstants.CALLBACK_JOB_ID.equals(jobTaskBatch.getJobId())) {
WorkflowNode workflowNode = workflowNodeMapper.selectById(jobTaskBatch.getWorkflowNodeId()); jobBatchResponseVO.setCallback(JsonUtil.parseObject(workflowNode.getNodeInfo(), CallbackConfig.class));
jobBatchResponseVO.setJobName(workflowNode.getNodeName()); jobBatchResponseVO.setExecutionAt(jobTaskBatch.getCreateDt());
jobBatchResponseVO.setDecision(JsonUtil.parseObject(workflowNode.getNodeInfo(), DecisionConfig.class)); return jobBatchResponseVO;
}
// 条件节点
if (SystemConstants.DECISION_JOB_ID.equals(jobTaskBatch.getJobId())) {
jobBatchResponseVO.setDecision(JsonUtil.parseObject(workflowNode.getNodeInfo(), DecisionConfig.class));
jobBatchResponseVO.setExecutionAt(jobTaskBatch.getCreateDt());
return jobBatchResponseVO;
}
} }
return jobBatchResponseVO; return jobBatchResponseVO;

View File

@ -1,14 +1,18 @@
package com.aizuda.easy.retry.server.web.service.impl; package com.aizuda.easy.retry.server.web.service.impl;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.web.model.request.JobLogQueryVO; import com.aizuda.easy.retry.server.web.model.request.JobLogQueryVO;
import com.aizuda.easy.retry.server.web.model.response.JobLogResponseVO; import com.aizuda.easy.retry.server.web.model.response.JobLogResponseVO;
import com.aizuda.easy.retry.server.web.service.JobLogService; import com.aizuda.easy.retry.server.web.service.JobLogService;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage; import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO; import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@ -23,10 +27,10 @@ import java.util.stream.Collectors;
* @since 2.4.0 * @since 2.4.0
*/ */
@Service @Service
@RequiredArgsConstructor
public class JobLogServiceImpl implements JobLogService { public class JobLogServiceImpl implements JobLogService {
private final JobLogMessageMapper jobLogMessageMapper;
@Autowired private final JobTaskBatchMapper jobTaskBatchMapper;
private JobLogMessageMapper jobLogMessageMapper;
@Override @Override
public JobLogResponseVO getJobLogPage(final JobLogQueryVO queryVO) { public JobLogResponseVO getJobLogPage(final JobLogQueryVO queryVO) {
@ -35,16 +39,25 @@ public class JobLogServiceImpl implements JobLogService {
LambdaQueryWrapper<JobLogMessage> queryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<JobLogMessage> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper queryWrapper
.select(JobLogMessage::getId, JobLogMessage::getLogNum) .select(JobLogMessage::getId, JobLogMessage::getLogNum)
.ge(JobLogMessage::getId, queryVO.getStartId()) .ge(JobLogMessage::getId, queryVO.getStartId())
.eq(JobLogMessage::getTaskId, queryVO.getTaskId()); .eq(JobLogMessage::getTaskId, queryVO.getTaskId());
queryWrapper.orderByAsc(JobLogMessage::getRealTime).orderByDesc(JobLogMessage::getId); queryWrapper.orderByAsc(JobLogMessage::getRealTime).orderByDesc(JobLogMessage::getId);
PageDTO<JobLogMessage> selectPage = jobLogMessageMapper.selectPage(pageDTO, queryWrapper); PageDTO<JobLogMessage> selectPage = jobLogMessageMapper.selectPage(pageDTO, queryWrapper);
List<JobLogMessage> records = selectPage.getRecords(); List<JobLogMessage> records = selectPage.getRecords();
if (CollectionUtils.isEmpty(records)) { if (CollectionUtils.isEmpty(records)) {
Long count = jobTaskBatchMapper.selectCount(
new LambdaQueryWrapper<JobTaskBatch>()
.in(JobTaskBatch::getTaskBatchStatus, JobTaskBatchStatusEnum.COMPLETED)
);
JobLogResponseVO jobLogResponseVO = new JobLogResponseVO(); JobLogResponseVO jobLogResponseVO = new JobLogResponseVO();
jobLogResponseVO.setFinished(Boolean.TRUE); if (count > 0) {
jobLogResponseVO.setFinished(Boolean.TRUE);
}
return jobLogResponseVO; return jobLogResponseVO;
} }
@ -70,7 +83,7 @@ public class JobLogServiceImpl implements JobLogService {
List<String> originalList = JsonUtil.parseObject(jobLogMessage.getMessage(), List.class); List<String> originalList = JsonUtil.parseObject(jobLogMessage.getMessage(), List.class);
int size = originalList.size() - fromIndex; int size = originalList.size() - fromIndex;
List<String> pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize()) List<String> pageList = originalList.stream().skip(fromIndex).limit(queryVO.getSize())
.collect(Collectors.toList()); .collect(Collectors.toList());
if (messages.size() + size >= queryVO.getSize()) { if (messages.size() + size >= queryVO.getSize()) {
messages.addAll(pageList); messages.addAll(pageList);

View File

@ -31,6 +31,7 @@ import com.aizuda.easy.retry.server.web.model.request.SystemUserQueryVO;
import com.aizuda.easy.retry.server.web.model.request.SystemUserRequestVO; import com.aizuda.easy.retry.server.web.model.request.SystemUserRequestVO;
import com.aizuda.easy.retry.server.web.model.response.SystemUserResponseVO; import com.aizuda.easy.retry.server.web.model.response.SystemUserResponseVO;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@ -48,18 +49,14 @@ import java.util.stream.Collectors;
* @since 2022-03-05 * @since 2022-03-05
*/ */
@Service @Service
@RequiredArgsConstructor
public class SystemUserServiceImpl implements SystemUserService { public class SystemUserServiceImpl implements SystemUserService {
public static final long EXPIRE_TIME = 3600 * 24 * 1000;
public static final long EXPIRE_TIME = 3600 * 7 * 1000; private final SystemUserMapper systemUserMapper;
private final SystemUserPermissionMapper systemUserPermissionMapper;
@Autowired private final NamespaceMapper namespaceMapper;
private SystemUserMapper systemUserMapper; private final SystemProperties systemProperties;
@Autowired
private SystemUserPermissionMapper systemUserPermissionMapper;
@Autowired
private NamespaceMapper namespaceMapper;
@Autowired
private SystemProperties systemProperties;
@Override @Override
public SystemUserResponseVO login(SystemUserRequestVO requestVO) { public SystemUserResponseVO login(SystemUserRequestVO requestVO) {

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -5,8 +5,8 @@
<meta charset="UTF-8" /> <meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" /> <meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Easy Retry</title> <title>Easy Retry</title>
<script type="module" crossorigin src="./assets/MlqLg9-Y.js"></script> <script type="module" crossorigin src="./assets/YpO3912B.js"></script>
<link rel="stylesheet" crossorigin href="./assets/vhU7tY2l.css"> <link rel="stylesheet" crossorigin href="./assets/yQfA4Ykm.css">
</head> </head>
<body> <body>