diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java index 629d3bfc9..1fab5ffa3 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/client/JobEndPoint.java @@ -126,7 +126,7 @@ public class JobEndPoint { jobContext.setExecutorTimeout(dispatchJob.getExecutorTimeout()); jobContext.setWorkflowNodeId(dispatchJob.getWorkflowNodeId()); jobContext.setWorkflowTaskBatchId(dispatchJob.getWorkflowTaskBatchId()); - jobContext.setRetry(dispatchJob.isRetry()); + jobContext.setRetryStatus(dispatchJob.getRetryStatus()); jobContext.setRetryScene(dispatchJob.getRetryScene()); jobContext.setTaskName(dispatchJob.getTaskName()); jobContext.setMrStage(dispatchJob.getMrStage()); diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java index 3fe62d68e..922c5443c 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java @@ -158,7 +158,7 @@ public class JobExecutorFutureCallback implements FutureCallback dispatchJobRequest.setTaskType(jobContext.getTaskType()); dispatchJobRequest.setExecuteResult(executeResult); dispatchJobRequest.setTaskStatus(status); - dispatchJobRequest.setRetry(jobContext.isRetry()); + dispatchJobRequest.setRetryStatus(jobContext.getRetryStatus()); dispatchJobRequest.setRetryScene(jobContext.getRetryScene()); // 传递变更后的上下文 if (CollUtil.isNotEmpty(jobContext.getChangeWfContext())) { diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java index d3b5cb56b..bfe4ed803 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobRequest.java @@ -68,8 +68,14 @@ public class DispatchJobRequest { /** * 是否是重试流量 */ + @Deprecated private boolean isRetry; + /** + * 是否是重试流量 + */ + private Boolean retryStatus = Boolean.FALSE; + /** * 工作流上下文 */ diff --git a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobResultRequest.java b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobResultRequest.java index 701d4fa2f..b1766512e 100644 --- a/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobResultRequest.java +++ b/snail-job-common/snail-job-common-client-api/src/main/java/com/aizuda/snailjob/client/model/request/DispatchJobResultRequest.java @@ -39,8 +39,14 @@ public class DispatchJobResultRequest { /** * 是否是重试流量 */ + @Deprecated private boolean isRetry; + /** + * 是否是重试流量 + */ + private Boolean retryStatus = Boolean.FALSE; + /** * 工作流上下文 */ diff --git a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java index 432804dba..56d49ddea 100644 --- a/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java +++ b/snail-job-common/snail-job-common-core/src/main/java/com/aizuda/snailjob/common/core/model/JobContext.java @@ -48,7 +48,7 @@ public class JobContext { /** * 是否是重试流量 */ - private boolean isRetry; + private Boolean retryStatus = Boolean.FALSE; /** * Map集合列表 diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java index 92e0eefe4..d0ea841c9 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/CompleteJobBatchDTO.java @@ -20,6 +20,6 @@ public class CompleteJobBatchDTO extends BaseDTO { private Integer jobOperationReason; private Object result; private Integer taskType; - private boolean isRetry; + private Boolean retryStatus; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobExecutorResultDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobExecutorResultDTO.java index 6667e5432..b2dea893e 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobExecutorResultDTO.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/JobExecutorResultDTO.java @@ -44,6 +44,6 @@ public class JobExecutorResultDTO { private String wfContext; - private boolean isRetry; + private Boolean retryStatus = Boolean.FALSE; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java index 620e144c4..a443498cf 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/dto/RealJobExecutorDTO.java @@ -97,7 +97,7 @@ public class RealJobExecutorDTO extends BaseDTO { /** * 是否是重试流量 */ - private boolean isRetry; + private Boolean retryStatus = Boolean.FALSE; /** * 工作流上下文 diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java index 06c70172d..f39708702 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/AbstractClientCallbackHandler.java @@ -57,7 +57,7 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan realJobExecutor.setWorkflowNodeId(context.getWorkflowNodeId()); realJobExecutor.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId()); realJobExecutor.setRetryCount(jobTask.getRetryCount() + 1); - realJobExecutor.setRetry(Boolean.TRUE); + realJobExecutor.setRetryStatus(Boolean.TRUE); realJobExecutor.setRetryScene(context.getRetryScene()); realJobExecutor.setTaskName(jobTask.getTaskName()); // 这里统一收口传递上下文 @@ -141,7 +141,7 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan // 手动重试策略 if (Objects.nonNull(context.getRetryScene()) && Objects.equals(JobRetrySceneEnum.MANUAL.getRetryScene(), context.getRetryScene()) - && !context.isRetry()) { + && !context.getRetryStatus()) { return Boolean.TRUE; } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClientCallbackContext.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClientCallbackContext.java index 178df6eed..7ddee1062 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClientCallbackContext.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/callback/ClientCallbackContext.java @@ -45,8 +45,14 @@ public class ClientCallbackContext { private Integer retryScene; + @Deprecated private boolean isRetry; + /** + * 是否是重试流量 + */ + private Boolean retryStatus = Boolean.FALSE; + /** * 工作流上下文 */ diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java index 8918fac10..393e9c77c 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/JobExecutorResultActor.java @@ -10,7 +10,6 @@ import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO; import com.aizuda.snailjob.server.job.task.dto.JobExecutorResultDTO; import com.aizuda.snailjob.server.job.task.support.JobTaskConverter; -import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler; import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler; import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper; import com.aizuda.snailjob.template.datasource.persistence.po.JobTask; @@ -20,8 +19,6 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; -import java.text.MessageFormat; -import java.time.Duration; import java.util.Objects; /** @@ -33,10 +30,8 @@ import java.util.Objects; @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @RequiredArgsConstructor public class JobExecutorResultActor extends AbstractActor { - private static final String KEY = "job_complete_{0}_{1}"; private final JobTaskMapper jobTaskMapper; private final JobTaskBatchHandler jobTaskBatchHandler; - private final DistributedLockHandler distributedLockHandler; @Override public Receive createReceive() { @@ -68,14 +63,7 @@ public class JobExecutorResultActor extends AbstractActor { return; } - boolean tryCompleteAndStop = tryCompleteAndStop(result); -// if (!tryCompleteAndStop) { -// // 存在并发问题 -// distributedLockHandler.lockWithDisposableAndRetry(() -> { -// tryCompleteAndStop(result); -// }, MessageFormat.format(KEY, result.getTaskBatchId(), -// result.getJobId()), Duration.ofSeconds(1), Duration.ofSeconds(1), 3); -// } + tryCompleteAndStop(result); } catch (Exception e) { SnailJobLog.LOCAL.error(" job executor result exception. [{}]", result, e); } finally { @@ -86,8 +74,8 @@ public class JobExecutorResultActor extends AbstractActor { } - private boolean tryCompleteAndStop(JobExecutorResultDTO result) { + private void tryCompleteAndStop(JobExecutorResultDTO result) { CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.toCompleteJobBatchDTO(result); - return jobTaskBatchHandler.handleResult(completeJobBatchDTO); + jobTaskBatchHandler.handleResult(completeJobBatchDTO); } } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/RequestClientActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/RequestClientActor.java index 8ad5859c3..7b2a220a2 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/RequestClientActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/executor/job/RequestClientActor.java @@ -66,7 +66,7 @@ public class RequestClientActor extends AbstractActor { taskExecuteFailure(realJobExecutorDTO, "客户端不存在"); JobLogMetaDTO jobLogMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(realJobExecutorDTO); jobLogMetaDTO.setTimestamp(nowMilli); - if (realJobExecutorDTO.isRetry()) { + if (realJobExecutorDTO.getRetryStatus()) { SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败执行重试. 失败原因: 无可执行的客户端. 重试次数:[{}]. <|>{}<|>", realJobExecutorDTO.getTaskId(), realJobExecutorDTO.getRetryCount(), jobLogMetaDTO); } else { @@ -78,6 +78,9 @@ public class RequestClientActor extends AbstractActor { DispatchJobRequest dispatchJobRequest = JobTaskConverter.INSTANCE.toDispatchJobRequest(realJobExecutorDTO); + // 兼容历史客户端版本正式版本即可删除 + dispatchJobRequest.setRetry(realJobExecutorDTO.getRetryStatus()); + try { // 构建请求客户端对象 JobRpcClient rpcClient = buildRpcClient(registerNodeInfo, realJobExecutorDTO); @@ -107,7 +110,7 @@ public class RequestClientActor extends AbstractActor { JobLogMetaDTO jobLogMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(realJobExecutorDTO); jobLogMetaDTO.setTimestamp(nowMilli); - if (realJobExecutorDTO.isRetry()) { + if (realJobExecutorDTO.getRetryStatus()) { SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败执行重试 重试次数:[{}]. <|>{}<|>", jobLogMetaDTO.getTaskId(), realJobExecutorDTO.getRetryCount(), jobLogMetaDTO, throwable); } else { @@ -147,7 +150,7 @@ public class RequestClientActor extends AbstractActor { private JobRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo, RealJobExecutorDTO realJobExecutorDTO) { int maxRetryTimes = realJobExecutorDTO.getMaxRetryTimes(); - boolean retry = realJobExecutorDTO.isRetry(); + boolean retry = realJobExecutorDTO.getRetryStatus(); return RequestBuilder.newBuilder() .nodeInfo(registerNodeInfo) .failRetry(maxRetryTimes > 0 && !retry) diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java index e27a5d294..b4d7f4d75 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/JobTaskBatchHandler.java @@ -58,7 +58,7 @@ public class JobTaskBatchHandler { Assert.notNull(completeJobBatchDTO.getTaskType(), ()-> new SnailJobServerException("taskType can not be null")); // 非重试流量幂等处理 - if(!completeJobBatchDTO.isRetry()) { + if(!completeJobBatchDTO.getRetryStatus()) { // 幂等处理 Long countJobTaskBatch = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper() .eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId()) diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java index 147b362f8..14e028489 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/MapTaskPostHttpRequestHandler.java @@ -1,6 +1,7 @@ package com.aizuda.snailjob.server.job.task.support.request; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.lang.Assert; import cn.hutool.core.net.url.UrlQuery; import com.aizuda.snailjob.client.model.request.MapTaskRequest; import com.aizuda.snailjob.common.core.constant.SystemConstants; @@ -10,6 +11,7 @@ import com.aizuda.snailjob.common.core.model.SnailJobRpcResult; import com.aizuda.snailjob.common.core.model.SnailJobRequest; import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.SnailJobLog; +import com.aizuda.snailjob.server.common.exception.SnailJobServerException; import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler; import com.aizuda.snailjob.server.common.util.HttpHeaderUtil; import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; @@ -95,12 +97,13 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler { } String newWfContext = null; - if (Objects.nonNull(mapTaskRequest.getWorkflowTaskBatchId())) { + if (Objects.nonNull(mapTaskRequest.getWorkflowTaskBatchId()) && mapTaskRequest.getWorkflowTaskBatchId() > 0) { WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne( new LambdaQueryWrapper() .select(WorkflowTaskBatch::getWfContext, WorkflowTaskBatch::getId) .eq(WorkflowTaskBatch::getId, mapTaskRequest.getWorkflowTaskBatchId()) ); + Assert.notNull(workflowTaskBatch, ()-> new SnailJobServerException("workflowTaskBatch is null. id:[{}]", mapTaskRequest.getWorkflowTaskBatchId())); newWfContext = workflowTaskBatch.getWfContext(); } diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/ReportDispatchResultPostHttpRequestHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/ReportDispatchResultPostHttpRequestHandler.java index 7829f15ce..6797e12ee 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/ReportDispatchResultPostHttpRequestHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/request/ReportDispatchResultPostHttpRequestHandler.java @@ -17,6 +17,8 @@ import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; import org.springframework.stereotype.Component; +import java.util.Objects; + import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.REPORT_JOB_DISPATCH_RESULT; /** @@ -49,6 +51,10 @@ public class ReportDispatchResultPostHttpRequestHandler extends PostHttpRequestH ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(dispatchJobResultRequest.getTaskType()); ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(dispatchJobResultRequest); + // 兼容过度版本 + if (Objects.isNull(context.getRetryStatus())) { + context.setRetryStatus(context.isRetry()); + } context.setNamespaceId(headers.getAsString(HeadersEnum.NAMESPACE.getKey())); clientCallback.callback(context);