pom:(1.3.0-beta1): isRetry改为retryStatus

This commit is contained in:
opensnail 2024-11-25 21:13:25 +08:00
parent 7afe46e851
commit 69091e4f36
15 changed files with 46 additions and 28 deletions

View File

@ -126,7 +126,7 @@ public class JobEndPoint {
jobContext.setExecutorTimeout(dispatchJob.getExecutorTimeout());
jobContext.setWorkflowNodeId(dispatchJob.getWorkflowNodeId());
jobContext.setWorkflowTaskBatchId(dispatchJob.getWorkflowTaskBatchId());
jobContext.setRetry(dispatchJob.isRetry());
jobContext.setRetryStatus(dispatchJob.getRetryStatus());
jobContext.setRetryScene(dispatchJob.getRetryScene());
jobContext.setTaskName(dispatchJob.getTaskName());
jobContext.setMrStage(dispatchJob.getMrStage());

View File

@ -158,7 +158,7 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
dispatchJobRequest.setTaskType(jobContext.getTaskType());
dispatchJobRequest.setExecuteResult(executeResult);
dispatchJobRequest.setTaskStatus(status);
dispatchJobRequest.setRetry(jobContext.isRetry());
dispatchJobRequest.setRetryStatus(jobContext.getRetryStatus());
dispatchJobRequest.setRetryScene(jobContext.getRetryScene());
// 传递变更后的上下文
if (CollUtil.isNotEmpty(jobContext.getChangeWfContext())) {

View File

@ -68,8 +68,14 @@ public class DispatchJobRequest {
/**
* 是否是重试流量
*/
@Deprecated
private boolean isRetry;
/**
* 是否是重试流量
*/
private Boolean retryStatus = Boolean.FALSE;
/**
* 工作流上下文
*/

View File

@ -39,8 +39,14 @@ public class DispatchJobResultRequest {
/**
* 是否是重试流量
*/
@Deprecated
private boolean isRetry;
/**
* 是否是重试流量
*/
private Boolean retryStatus = Boolean.FALSE;
/**
* 工作流上下文
*/

View File

@ -48,7 +48,7 @@ public class JobContext {
/**
* 是否是重试流量
*/
private boolean isRetry;
private Boolean retryStatus = Boolean.FALSE;
/**
* Map集合列表

View File

@ -20,6 +20,6 @@ public class CompleteJobBatchDTO extends BaseDTO {
private Integer jobOperationReason;
private Object result;
private Integer taskType;
private boolean isRetry;
private Boolean retryStatus;
}

View File

@ -44,6 +44,6 @@ public class JobExecutorResultDTO {
private String wfContext;
private boolean isRetry;
private Boolean retryStatus = Boolean.FALSE;
}

View File

@ -97,7 +97,7 @@ public class RealJobExecutorDTO extends BaseDTO {
/**
* 是否是重试流量
*/
private boolean isRetry;
private Boolean retryStatus = Boolean.FALSE;
/**
* 工作流上下文

View File

@ -57,7 +57,7 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan
realJobExecutor.setWorkflowNodeId(context.getWorkflowNodeId());
realJobExecutor.setWorkflowTaskBatchId(context.getWorkflowTaskBatchId());
realJobExecutor.setRetryCount(jobTask.getRetryCount() + 1);
realJobExecutor.setRetry(Boolean.TRUE);
realJobExecutor.setRetryStatus(Boolean.TRUE);
realJobExecutor.setRetryScene(context.getRetryScene());
realJobExecutor.setTaskName(jobTask.getTaskName());
// 这里统一收口传递上下文
@ -141,7 +141,7 @@ public abstract class AbstractClientCallbackHandler implements ClientCallbackHan
// 手动重试策略
if (Objects.nonNull(context.getRetryScene())
&& Objects.equals(JobRetrySceneEnum.MANUAL.getRetryScene(), context.getRetryScene())
&& !context.isRetry()) {
&& !context.getRetryStatus()) {
return Boolean.TRUE;
}

View File

@ -45,8 +45,14 @@ public class ClientCallbackContext {
private Integer retryScene;
@Deprecated
private boolean isRetry;
/**
* 是否是重试流量
*/
private Boolean retryStatus = Boolean.FALSE;
/**
* 工作流上下文
*/

View File

@ -10,7 +10,6 @@ import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.snailjob.server.job.task.dto.JobExecutorResultDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler;
import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
@ -20,8 +19,6 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.Objects;
/**
@ -33,10 +30,8 @@ import java.util.Objects;
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@RequiredArgsConstructor
public class JobExecutorResultActor extends AbstractActor {
private static final String KEY = "job_complete_{0}_{1}";
private final JobTaskMapper jobTaskMapper;
private final JobTaskBatchHandler jobTaskBatchHandler;
private final DistributedLockHandler distributedLockHandler;
@Override
public Receive createReceive() {
@ -68,14 +63,7 @@ public class JobExecutorResultActor extends AbstractActor {
return;
}
boolean tryCompleteAndStop = tryCompleteAndStop(result);
// if (!tryCompleteAndStop) {
// // 存在并发问题
// distributedLockHandler.lockWithDisposableAndRetry(() -> {
// tryCompleteAndStop(result);
// }, MessageFormat.format(KEY, result.getTaskBatchId(),
// result.getJobId()), Duration.ofSeconds(1), Duration.ofSeconds(1), 3);
// }
tryCompleteAndStop(result);
} catch (Exception e) {
SnailJobLog.LOCAL.error(" job executor result exception. [{}]", result, e);
} finally {
@ -86,8 +74,8 @@ public class JobExecutorResultActor extends AbstractActor {
}
private boolean tryCompleteAndStop(JobExecutorResultDTO result) {
private void tryCompleteAndStop(JobExecutorResultDTO result) {
CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.toCompleteJobBatchDTO(result);
return jobTaskBatchHandler.handleResult(completeJobBatchDTO);
jobTaskBatchHandler.handleResult(completeJobBatchDTO);
}
}

View File

@ -66,7 +66,7 @@ public class RequestClientActor extends AbstractActor {
taskExecuteFailure(realJobExecutorDTO, "客户端不存在");
JobLogMetaDTO jobLogMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(realJobExecutorDTO);
jobLogMetaDTO.setTimestamp(nowMilli);
if (realJobExecutorDTO.isRetry()) {
if (realJobExecutorDTO.getRetryStatus()) {
SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败执行重试. 失败原因: 无可执行的客户端. 重试次数:[{}]. <|>{}<|>",
realJobExecutorDTO.getTaskId(), realJobExecutorDTO.getRetryCount(), jobLogMetaDTO);
} else {
@ -78,6 +78,9 @@ public class RequestClientActor extends AbstractActor {
DispatchJobRequest dispatchJobRequest = JobTaskConverter.INSTANCE.toDispatchJobRequest(realJobExecutorDTO);
// 兼容历史客户端版本正式版本即可删除
dispatchJobRequest.setRetry(realJobExecutorDTO.getRetryStatus());
try {
// 构建请求客户端对象
JobRpcClient rpcClient = buildRpcClient(registerNodeInfo, realJobExecutorDTO);
@ -107,7 +110,7 @@ public class RequestClientActor extends AbstractActor {
JobLogMetaDTO jobLogMetaDTO = JobTaskConverter.INSTANCE.toJobLogDTO(realJobExecutorDTO);
jobLogMetaDTO.setTimestamp(nowMilli);
if (realJobExecutorDTO.isRetry()) {
if (realJobExecutorDTO.getRetryStatus()) {
SnailJobLog.REMOTE.error("taskId:[{}] 任务调度失败执行重试 重试次数:[{}]. <|>{}<|>", jobLogMetaDTO.getTaskId(),
realJobExecutorDTO.getRetryCount(), jobLogMetaDTO, throwable);
} else {
@ -147,7 +150,7 @@ public class RequestClientActor extends AbstractActor {
private JobRpcClient buildRpcClient(RegisterNodeInfo registerNodeInfo, RealJobExecutorDTO realJobExecutorDTO) {
int maxRetryTimes = realJobExecutorDTO.getMaxRetryTimes();
boolean retry = realJobExecutorDTO.isRetry();
boolean retry = realJobExecutorDTO.getRetryStatus();
return RequestBuilder.<JobRpcClient, Result>newBuilder()
.nodeInfo(registerNodeInfo)
.failRetry(maxRetryTimes > 0 && !retry)

View File

@ -58,7 +58,7 @@ public class JobTaskBatchHandler {
Assert.notNull(completeJobBatchDTO.getTaskType(), ()-> new SnailJobServerException("taskType can not be null"));
// 非重试流量幂等处理
if(!completeJobBatchDTO.isRetry()) {
if(!completeJobBatchDTO.getRetryStatus()) {
// 幂等处理
Long countJobTaskBatch = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getId, completeJobBatchDTO.getTaskBatchId())

View File

@ -1,6 +1,7 @@
package com.aizuda.snailjob.server.job.task.support.request;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.snailjob.client.model.request.MapTaskRequest;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
@ -10,6 +11,7 @@ import com.aizuda.snailjob.common.core.model.SnailJobRpcResult;
import com.aizuda.snailjob.common.core.model.SnailJobRequest;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.handler.PostHttpRequestHandler;
import com.aizuda.snailjob.server.common.util.HttpHeaderUtil;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
@ -95,12 +97,13 @@ public class MapTaskPostHttpRequestHandler extends PostHttpRequestHandler {
}
String newWfContext = null;
if (Objects.nonNull(mapTaskRequest.getWorkflowTaskBatchId())) {
if (Objects.nonNull(mapTaskRequest.getWorkflowTaskBatchId()) && mapTaskRequest.getWorkflowTaskBatchId() > 0) {
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectOne(
new LambdaQueryWrapper<WorkflowTaskBatch>()
.select(WorkflowTaskBatch::getWfContext, WorkflowTaskBatch::getId)
.eq(WorkflowTaskBatch::getId, mapTaskRequest.getWorkflowTaskBatchId())
);
Assert.notNull(workflowTaskBatch, ()-> new SnailJobServerException("workflowTaskBatch is null. id:[{}]", mapTaskRequest.getWorkflowTaskBatchId()));
newWfContext = workflowTaskBatch.getWfContext();
}

View File

@ -17,6 +17,8 @@ import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import org.springframework.stereotype.Component;
import java.util.Objects;
import static com.aizuda.snailjob.common.core.constant.SystemConstants.HTTP_PATH.REPORT_JOB_DISPATCH_RESULT;
/**
@ -49,6 +51,10 @@ public class ReportDispatchResultPostHttpRequestHandler extends PostHttpRequestH
ClientCallbackHandler clientCallback = ClientCallbackFactory.getClientCallback(dispatchJobResultRequest.getTaskType());
ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(dispatchJobResultRequest);
// 兼容过度版本
if (Objects.isNull(context.getRetryStatus())) {
context.setRetryStatus(context.isRetry());
}
context.setNamespaceId(headers.getAsString(HeadersEnum.NAMESPACE.getKey()));
clientCallback.callback(context);