diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java index 01f936f0..f846e623 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/dispatch/JobExecutorActor.java @@ -7,6 +7,8 @@ import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.akka.ActorGenerator; +import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable; +import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO; @@ -32,10 +34,12 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; +import org.springframework.util.CollectionUtils; import java.time.Duration; import java.time.LocalDateTime; import java.util.Objects; +import java.util.Set; import java.util.concurrent.TimeUnit; /** @@ -81,11 +85,8 @@ public class JobExecutorActor extends AbstractActor { ); try { - // 更新批次的状态 - updateBatchStatus(taskExecute, job); - // 如果任务已经关闭则不需要执行 - if (Objects.isNull(job)) { + if (!handlerTaskBatch(taskExecute, job)) { return; } @@ -101,13 +102,16 @@ public class JobExecutorActor extends AbstractActor { } - private void updateBatchStatus(final TaskExecuteDTO taskExecute, final Job job) { + private boolean handlerTaskBatch(final TaskExecuteDTO taskExecute, final Job job) { + int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus(); int operationReason = JobOperationReasonEnum.NONE.getReason(); if (Objects.isNull(job)) { - log.warn("任务已经关闭不允许执行. jobId:[{}]", taskExecute.getJobId()); taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus(); operationReason = JobOperationReasonEnum.JOB_CLOSED.getReason(); + } else if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(taskExecute.getGroupName()))) { + taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus(); + operationReason = JobOperationReasonEnum.NOT_CLIENT.getReason(); } JobTaskBatch jobTaskBatch = new JobTaskBatch(); @@ -117,6 +121,8 @@ public class JobExecutorActor extends AbstractActor { jobTaskBatch.setOperationReason(operationReason); Assert.isTrue(1 == jobTaskBatchMapper.updateById(jobTaskBatch), () -> new EasyRetryServerException("更新任务失败")); + + return taskStatus == JobTaskBatchStatusEnum.RUNNING.getStatus(); } private void doHandlerResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) { diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/AbstractJobExecutor.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/AbstractJobExecutor.java index e7b84b50..d9eddffa 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/AbstractJobExecutor.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/executor/AbstractJobExecutor.java @@ -10,6 +10,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; import java.util.List; @@ -22,6 +23,7 @@ import java.util.List; public abstract class AbstractJobExecutor implements JobExecutor, InitializingBean { @Override + @Transactional public void execute(JobExecutorContext context) { // 生成任务 diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java index a105bfef..381f68ce 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/idempotent/TimerIdempotent.java @@ -5,6 +5,7 @@ import com.aizuda.easy.retry.server.common.IdempotentStrategy; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; /** * @author www.byteblogs.com @@ -13,7 +14,7 @@ import java.util.Set; */ public class TimerIdempotent implements IdempotentStrategy { - private static final Set cache = new HashSet<>(); + private static final CopyOnWriteArraySet cache = new CopyOnWriteArraySet<>(); @Override public boolean set(String key, String value) { diff --git a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/DispatchService.java b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/DispatchService.java index d1fc0da0..3ca8cbca 100644 --- a/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/DispatchService.java +++ b/easy-retry-server/easy-retry-server-starter/src/main/java/com/aizuda/easy/retry/server/starter/dispatch/DispatchService.java @@ -38,9 +38,9 @@ public class DispatchService implements Lifecycle { public static final Long PERIOD = 10L; /** - * 延迟10s为了尽可能保障集群节点都启动完成在进行rebalance + * 延迟30s为了尽可能保障集群节点都启动完成在进行rebalance */ - public static final Long INITIAL_DELAY = 10L; + public static final Long INITIAL_DELAY = 30L; @Override public void start() { diff --git a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java index 325066b7..777c3ec8 100644 --- a/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java +++ b/easy-retry-server/easy-retry-server-web/src/main/java/com/aizuda/easy/retry/server/web/service/impl/JobServiceImpl.java @@ -147,6 +147,7 @@ public class JobServiceImpl implements JobService { @Override public Job updateJobResident(JobRequestVO jobRequestVO) { Job job = JobConverter.INSTANCE.toJob(jobRequestVO); + job.setResident(StatusEnum.NO.getStatus()); if (jobRequestVO.getTriggerType() == WaitStrategyEnum.FIXED.getTriggerType()) { if (Integer.parseInt(jobRequestVO.getTriggerInterval()) < 10) { job.setResident(StatusEnum.YES.getStatus()); @@ -163,6 +164,7 @@ public class JobServiceImpl implements JobService { } else { throw new EasyRetryServerException("未知触发类型"); } + return job; }