feat:2.4.0

1. 修复并行重复产生新任务问题
This commit is contained in:
byteblogs168 2023-10-27 23:20:57 +08:00
parent b15f977cf4
commit 97f45ada1c
5 changed files with 20 additions and 9 deletions

View File

@ -7,6 +7,8 @@ import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO; import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
@ -32,10 +34,12 @@ import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate; import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.CollectionUtils;
import java.time.Duration; import java.time.Duration;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -81,11 +85,8 @@ public class JobExecutorActor extends AbstractActor {
); );
try { try {
// 更新批次的状态
updateBatchStatus(taskExecute, job);
// 如果任务已经关闭则不需要执行 if (!handlerTaskBatch(taskExecute, job)) {
if (Objects.isNull(job)) {
return; return;
} }
@ -101,13 +102,16 @@ public class JobExecutorActor extends AbstractActor {
} }
private void updateBatchStatus(final TaskExecuteDTO taskExecute, final Job job) { private boolean handlerTaskBatch(final TaskExecuteDTO taskExecute, final Job job) {
int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus(); int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus();
int operationReason = JobOperationReasonEnum.NONE.getReason(); int operationReason = JobOperationReasonEnum.NONE.getReason();
if (Objects.isNull(job)) { if (Objects.isNull(job)) {
log.warn("任务已经关闭不允许执行. jobId:[{}]", taskExecute.getJobId());
taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus(); taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
operationReason = JobOperationReasonEnum.JOB_CLOSED.getReason(); operationReason = JobOperationReasonEnum.JOB_CLOSED.getReason();
} else if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(taskExecute.getGroupName()))) {
taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
operationReason = JobOperationReasonEnum.NOT_CLIENT.getReason();
} }
JobTaskBatch jobTaskBatch = new JobTaskBatch(); JobTaskBatch jobTaskBatch = new JobTaskBatch();
@ -117,6 +121,8 @@ public class JobExecutorActor extends AbstractActor {
jobTaskBatch.setOperationReason(operationReason); jobTaskBatch.setOperationReason(operationReason);
Assert.isTrue(1 == jobTaskBatchMapper.updateById(jobTaskBatch), Assert.isTrue(1 == jobTaskBatchMapper.updateById(jobTaskBatch),
() -> new EasyRetryServerException("更新任务失败")); () -> new EasyRetryServerException("更新任务失败"));
return taskStatus == JobTaskBatchStatusEnum.RUNNING.getStatus();
} }
private void doHandlerResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) { private void doHandlerResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {

View File

@ -10,6 +10,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.util.List; import java.util.List;
@ -22,6 +23,7 @@ import java.util.List;
public abstract class AbstractJobExecutor implements JobExecutor, InitializingBean { public abstract class AbstractJobExecutor implements JobExecutor, InitializingBean {
@Override @Override
@Transactional
public void execute(JobExecutorContext context) { public void execute(JobExecutorContext context) {
// 生成任务 // 生成任务

View File

@ -5,6 +5,7 @@ import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
/** /**
* @author www.byteblogs.com * @author www.byteblogs.com
@ -13,7 +14,7 @@ import java.util.Set;
*/ */
public class TimerIdempotent implements IdempotentStrategy<String, String> { public class TimerIdempotent implements IdempotentStrategy<String, String> {
private static final Set<String> cache = new HashSet<>(); private static final CopyOnWriteArraySet<String> cache = new CopyOnWriteArraySet<>();
@Override @Override
public boolean set(String key, String value) { public boolean set(String key, String value) {

View File

@ -38,9 +38,9 @@ public class DispatchService implements Lifecycle {
public static final Long PERIOD = 10L; public static final Long PERIOD = 10L;
/** /**
* 延迟10s为了尽可能保障集群节点都启动完成在进行rebalance * 延迟30s为了尽可能保障集群节点都启动完成在进行rebalance
*/ */
public static final Long INITIAL_DELAY = 10L; public static final Long INITIAL_DELAY = 30L;
@Override @Override
public void start() { public void start() {

View File

@ -147,6 +147,7 @@ public class JobServiceImpl implements JobService {
@Override @Override
public Job updateJobResident(JobRequestVO jobRequestVO) { public Job updateJobResident(JobRequestVO jobRequestVO) {
Job job = JobConverter.INSTANCE.toJob(jobRequestVO); Job job = JobConverter.INSTANCE.toJob(jobRequestVO);
job.setResident(StatusEnum.NO.getStatus());
if (jobRequestVO.getTriggerType() == WaitStrategyEnum.FIXED.getTriggerType()) { if (jobRequestVO.getTriggerType() == WaitStrategyEnum.FIXED.getTriggerType()) {
if (Integer.parseInt(jobRequestVO.getTriggerInterval()) < 10) { if (Integer.parseInt(jobRequestVO.getTriggerInterval()) < 10) {
job.setResident(StatusEnum.YES.getStatus()); job.setResident(StatusEnum.YES.getStatus());
@ -163,6 +164,7 @@ public class JobServiceImpl implements JobService {
} else { } else {
throw new EasyRetryServerException("未知触发类型"); throw new EasyRetryServerException("未知触发类型");
} }
return job; return job;
} }