feat: 2.6.0

1. 重构job的阻塞策略
This commit is contained in:
byteblogs168 2024-01-18 18:04:57 +08:00
parent 74f73c2be4
commit de09e4f858
20 changed files with 302 additions and 192 deletions

View File

@ -0,0 +1,31 @@
package com.aizuda.easy.retry.server.job.task.enums;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author: xiaowoniu
* @date : 2024-01-18
* @since : 2.6.0
*/
@AllArgsConstructor
@Getter
public enum BlockStrategyEnum {
DISCARD(1),
OVERLAY(2),
CONCURRENCY(3);
private final int blockStrategy;
public static BlockStrategyEnum valueOf(int blockStrategy) {
for (final BlockStrategyEnum value : BlockStrategyEnum.values()) {
if (value.blockStrategy == blockStrategy) {
return value;
}
}
throw new EasyRetryServerException("不符合的阻塞策略. blockStrategy:[{}]", blockStrategy);
}
}

View File

@ -1,6 +1,6 @@
package com.aizuda.easy.retry.server.job.task.support; package com.aizuda.easy.retry.server.job.task.support;
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategies.BlockStrategyContext; import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategyContext;
/** /**
* @author: www.byteblogs.com * @author: www.byteblogs.com

View File

@ -3,13 +3,13 @@ package com.aizuda.easy.retry.server.job.task.support;
import com.aizuda.easy.retry.client.model.request.DispatchJobRequest; import com.aizuda.easy.retry.client.model.request.DispatchJobRequest;
import com.aizuda.easy.retry.client.model.request.DispatchJobResultRequest; import com.aizuda.easy.retry.client.model.request.DispatchJobResultRequest;
import com.aizuda.easy.retry.server.job.task.dto.*; import com.aizuda.easy.retry.server.job.task.dto.*;
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategyContext;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext; import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.generator.task.JobTaskGenerateContext; import com.aizuda.easy.retry.server.job.task.support.generator.task.JobTaskGenerateContext;
import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackContext; import com.aizuda.easy.retry.server.job.task.support.callback.ClientCallbackContext;
import com.aizuda.easy.retry.server.job.task.support.executor.job.JobExecutorContext; import com.aizuda.easy.retry.server.job.task.support.executor.job.JobExecutorContext;
import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext; import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.easy.retry.server.model.dto.LogTaskDTO; import com.aizuda.easy.retry.server.model.dto.LogTaskDTO;
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategies;
import com.aizuda.easy.retry.template.datasource.persistence.po.Job; import com.aizuda.easy.retry.template.datasource.persistence.po.Job;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage; import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTask;
@ -43,18 +43,21 @@ public interface JobTaskConverter {
JobTaskBatchGeneratorContext toJobTaskGeneratorContext(JobTaskPrepareDTO jobTaskPrepareDTO); JobTaskBatchGeneratorContext toJobTaskGeneratorContext(JobTaskPrepareDTO jobTaskPrepareDTO);
JobTaskBatchGeneratorContext toJobTaskGeneratorContext(BlockStrategies.BlockStrategyContext context); JobTaskBatchGeneratorContext toJobTaskGeneratorContext(BlockStrategyContext context);
JobTaskGenerateContext toJobTaskInstanceGenerateContext(JobExecutorContext context); JobTaskGenerateContext toJobTaskInstanceGenerateContext(JobExecutorContext context);
JobTask toJobTaskInstance(JobTaskGenerateContext context); JobTask toJobTaskInstance(JobTaskGenerateContext context);
BlockStrategies.BlockStrategyContext toBlockStrategyContext(JobTaskPrepareDTO prepareDTO); BlockStrategyContext toBlockStrategyContext(JobTaskPrepareDTO prepareDTO);
TaskStopJobContext toStopJobContext(BlockStrategies.BlockStrategyContext context); TaskStopJobContext toStopJobContext(BlockStrategyContext context);
TaskStopJobContext toStopJobContext(JobExecutorResultDTO context); TaskStopJobContext toStopJobContext(JobExecutorResultDTO context);
@Mappings(
@Mapping(source = "id", target = "jobId")
)
TaskStopJobContext toStopJobContext(Job job); TaskStopJobContext toStopJobContext(Job job);
TaskStopJobContext toStopJobContext(JobTaskPrepareDTO context); TaskStopJobContext toStopJobContext(JobTaskPrepareDTO context);

View File

@ -0,0 +1,28 @@
package com.aizuda.easy.retry.server.job.task.support.block.job;
import com.aizuda.easy.retry.server.job.task.support.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.enums.BlockStrategyEnum;
import org.springframework.beans.factory.InitializingBean;
/**
* @author: xiaowoniu
* @date : 2024-01-18
* @since : 2.6.0
*/
public abstract class AbstracJobBlockStrategy implements BlockStrategy , InitializingBean {
@Override
public void block(final BlockStrategyContext context) {
doBlock(context);
}
protected abstract void doBlock(final BlockStrategyContext context);
protected abstract BlockStrategyEnum blockStrategyEnum();
@Override
public void afterPropertiesSet() throws Exception {
JobBlockStrategyFactory.registerBlockStrategy(blockStrategyEnum(), this);
}
}

View File

@ -1,148 +0,0 @@
package com.aizuda.easy.retry.server.job.task.support.block.job;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.support.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.easy.retry.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.Objects;
/**
* @author: www.byteblogs.com
* @date : 2023-09-25 17:52
*/
@Slf4j
public class BlockStrategies {
@AllArgsConstructor
@Getter
public enum BlockStrategyEnum {
DISCARD(1, new DiscardBlockStrategy()),
OVERLAY(2, new OverlayBlockStrategy()),
CONCURRENCY(3, new ConcurrencyBlockStrategy());
private final int blockStrategy;
private final BlockStrategy strategy;
public static BlockStrategy getBlockStrategy(int blockStrategy) {
for (final BlockStrategyEnum value : BlockStrategyEnum.values()) {
if (value.blockStrategy == blockStrategy) {
return value.getStrategy();
}
}
throw new EasyRetryServerException("不符合的阻塞策略. blockStrategy:[{}]", blockStrategy);
}
public static BlockStrategyEnum valueOf(int blockStrategy) {
for (final BlockStrategyEnum value : BlockStrategyEnum.values()) {
if (value.blockStrategy == blockStrategy) {
return value;
}
}
throw new EasyRetryServerException("不符合的阻塞策略. blockStrategy:[{}]", blockStrategy);
}
}
@Data
public static class BlockStrategyContext {
private Long jobId;
private Long taskBatchId;
private String namespaceId;
private String groupName;
/**
* 任务类型
*/
private Integer taskType;
/**
* 下次触发时间
*/
private Long nextTriggerAt;
private Integer operationReason;
/**
* 执行策略 1auto 2manual 3workflow
*/
private Integer taskExecutorScene;
}
private static final class DiscardBlockStrategy implements BlockStrategy {
@Override
public void block(final BlockStrategyContext context) {
log.warn("阻塞策略为丢弃此次执行. taskBatchId:[{}]", context.getTaskBatchId());
// 重新生成任务
JobTaskBatchGenerator jobTaskBatchGenerator = SpringContext.getBeanByType(JobTaskBatchGenerator.class);
JobTaskBatchGeneratorContext jobTaskBatchGeneratorContext = JobTaskConverter.INSTANCE.toJobTaskGeneratorContext(context);
jobTaskBatchGeneratorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
jobTaskBatchGeneratorContext.setOperationReason(JobOperationReasonEnum.JOB_DISCARD.getReason());
jobTaskBatchGenerator.generateJobTaskBatch(jobTaskBatchGeneratorContext);
}
}
private static final class OverlayBlockStrategy implements BlockStrategy {
@Override
public void block(final BlockStrategyContext context) {
log.warn("阻塞策略为覆盖. taskBatchId:[{}]", context.getTaskBatchId());
// 重新生成任务
JobTaskBatchGenerator jobTaskBatchGenerator = SpringContext.getBeanByType(JobTaskBatchGenerator.class);
JobTaskBatchGeneratorContext jobTaskBatchGeneratorContext = JobTaskConverter.INSTANCE.toJobTaskGeneratorContext(context);
jobTaskBatchGenerator.generateJobTaskBatch(jobTaskBatchGeneratorContext);
// 停止任务
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(context.taskType);
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(context);
Integer operationReason = context.getOperationReason();
if (Objects.isNull(context.getOperationReason()) || context.getOperationReason() == JobOperationReasonEnum.NONE.getReason()) {
operationReason = JobOperationReasonEnum.JOB_OVERLAY.getReason();
}
stopJobContext.setJobOperationReason(operationReason);
stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
instanceInterrupt.stop(stopJobContext);
}
}
private static final class ConcurrencyBlockStrategy implements BlockStrategy {
@Override
public void block(final BlockStrategyContext context) {
log.warn("阻塞策略为并行执行. taskBatchId:[{}]", context.getTaskBatchId());
// 重新生成任务
JobTaskBatchGenerator jobTaskBatchGenerator = SpringContext.getBeanByType(JobTaskBatchGenerator.class);
JobTaskBatchGeneratorContext jobTaskBatchGeneratorContext = JobTaskConverter.INSTANCE.toJobTaskGeneratorContext(context);
jobTaskBatchGenerator.generateJobTaskBatch(jobTaskBatchGeneratorContext);
}
}
}

View File

@ -0,0 +1,38 @@
package com.aizuda.easy.retry.server.job.task.support.block.job;
import lombok.Data;
/**
* @author: xiaowoniu
* @date : 2024-01-18
* @since : 2.6.0
*/
@Data
public class BlockStrategyContext {
private Long jobId;
private Long taskBatchId;
private String namespaceId;
private String groupName;
/**
* 任务类型
*/
private Integer taskType;
/**
* 下次触发时间
*/
private Long nextTriggerAt;
private Integer operationReason;
/**
* 执行策略 1auto 2manual 3workflow
*/
private Integer taskExecutorScene;
}

View File

@ -0,0 +1,30 @@
package com.aizuda.easy.retry.server.job.task.support.block.job;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
/**
* @author: xiaowoniu
* @date : 2024-01-18
* @since : 2.6.0
*/
@Component
@RequiredArgsConstructor
public class ConcurrencyBlockStrategy extends AbstracJobBlockStrategy {
private final JobTaskBatchGenerator jobTaskBatchGenerator;
@Override
public void doBlock(final BlockStrategyContext context) {
// 重新生成任务
JobTaskBatchGeneratorContext jobTaskBatchGeneratorContext = JobTaskConverter.INSTANCE.toJobTaskGeneratorContext(context);
jobTaskBatchGenerator.generateJobTaskBatch(jobTaskBatchGeneratorContext);
}
@Override
protected BlockStrategyEnum blockStrategyEnum() {
return BlockStrategyEnum.CONCURRENCY;
}
}

View File

@ -0,0 +1,34 @@
package com.aizuda.easy.retry.server.job.task.support.block.job;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
/**
* @author: xiaowoniu
* @date : 2024-01-18
* @since : 2.6.0
*/
@Component
@RequiredArgsConstructor
public class DiscardBlockStrategy extends AbstracJobBlockStrategy {
private final JobTaskBatchGenerator jobTaskBatchGenerator;
@Override
public void doBlock(final BlockStrategyContext context) {
// 重新生成任务
JobTaskBatchGeneratorContext jobTaskBatchGeneratorContext = JobTaskConverter.INSTANCE.toJobTaskGeneratorContext(context);
jobTaskBatchGeneratorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.CANCEL.getStatus());
jobTaskBatchGeneratorContext.setOperationReason(JobOperationReasonEnum.JOB_DISCARD.getReason());
jobTaskBatchGenerator.generateJobTaskBatch(jobTaskBatchGeneratorContext);
}
@Override
protected BlockStrategyEnum blockStrategyEnum() {
return BlockStrategyEnum.DISCARD;
}
}

View File

@ -0,0 +1,27 @@
package com.aizuda.easy.retry.server.job.task.support.block.job;
import com.aizuda.easy.retry.server.job.task.support.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.enums.BlockStrategyEnum;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author: xiaowoniu
* @date : 2023-01-18
* @since : 2.6.0
*/
public final class JobBlockStrategyFactory {
private static final ConcurrentHashMap<BlockStrategyEnum, BlockStrategy> CACHE = new ConcurrentHashMap<>();
private JobBlockStrategyFactory() {
}
static void registerBlockStrategy(BlockStrategyEnum blockStrategyEnum, BlockStrategy blockStrategy) {
CACHE.put(blockStrategyEnum, blockStrategy);
}
public static BlockStrategy getBlockStrategy(Integer blockStrategy) {
return CACHE.get(BlockStrategyEnum.valueOf(blockStrategy));
}
}

View File

@ -0,0 +1,50 @@
package com.aizuda.easy.retry.server.job.task.support.block.job;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
import com.aizuda.easy.retry.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.JobTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* @author: xiaowoniu
* @date : 2024-01-18
* @since : 2.6.0
*/
@Component
@RequiredArgsConstructor
public class OverlayBlockStrategy extends AbstracJobBlockStrategy {
private final JobTaskBatchGenerator jobTaskBatchGenerator;
@Override
public void doBlock(final BlockStrategyContext context) {
// 重新生成任务
JobTaskBatchGeneratorContext jobTaskBatchGeneratorContext = JobTaskConverter.INSTANCE.toJobTaskGeneratorContext(context);
jobTaskBatchGenerator.generateJobTaskBatch(jobTaskBatchGeneratorContext);
// 停止任务
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(context.getTaskType());
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(context);
Integer operationReason = context.getOperationReason();
if (Objects.isNull(context.getOperationReason()) || context.getOperationReason() == JobOperationReasonEnum.NONE.getReason()) {
operationReason = JobOperationReasonEnum.JOB_OVERLAY.getReason();
}
stopJobContext.setJobOperationReason(operationReason);
stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
instanceInterrupt.stop(stopJobContext);
}
@Override
protected BlockStrategyEnum blockStrategyEnum() {
return BlockStrategyEnum.OVERLAY;
}
}

View File

@ -1,8 +1,8 @@
package com.aizuda.easy.retry.server.job.task.support.block.workflow; package com.aizuda.easy.retry.server.job.task.support.block.workflow;
import com.aizuda.easy.retry.server.job.task.support.BlockStrategy; import com.aizuda.easy.retry.server.job.task.support.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategies.BlockStrategyContext; import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategyContext;
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategies.BlockStrategyEnum; import com.aizuda.easy.retry.server.job.task.enums.BlockStrategyEnum;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@ -27,6 +27,6 @@ public abstract class AbstractWorkflowBlockStrategy implements BlockStrategy, In
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
WorkflowBlockStrategyFactory.registerTaskStop(blockStrategyEnum(), this); WorkflowBlockStrategyFactory.registerBlockStrategy(blockStrategyEnum(), this);
} }
} }

View File

@ -4,7 +4,7 @@ import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator; import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext; import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategies.BlockStrategyEnum; import com.aizuda.easy.retry.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;

View File

@ -4,7 +4,7 @@ import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum; import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter; import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategies.BlockStrategyEnum; import com.aizuda.easy.retry.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator; import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext; import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;

View File

@ -4,7 +4,7 @@ import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator; import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowBatchGenerator;
import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext; import com.aizuda.easy.retry.server.job.task.support.generator.batch.WorkflowTaskBatchGeneratorContext;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategies.BlockStrategyEnum; import com.aizuda.easy.retry.server.job.task.enums.BlockStrategyEnum;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;

View File

@ -1,6 +1,6 @@
package com.aizuda.easy.retry.server.job.task.support.block.workflow; package com.aizuda.easy.retry.server.job.task.support.block.workflow;
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategies.BlockStrategyContext; import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategyContext;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
@ -28,8 +28,4 @@ public class WorkflowBlockStrategyContext extends BlockStrategyContext {
*/ */
private String flowInfo; private String flowInfo;
/**
* 执行策略 1auto 2manual 3workflow
*/
private Integer taskExecutorScene;
} }

View File

@ -1,7 +1,7 @@
package com.aizuda.easy.retry.server.job.task.support.block.workflow; package com.aizuda.easy.retry.server.job.task.support.block.workflow;
import com.aizuda.easy.retry.server.job.task.support.BlockStrategy; import com.aizuda.easy.retry.server.job.task.support.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategies.BlockStrategyEnum; import com.aizuda.easy.retry.server.job.task.enums.BlockStrategyEnum;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -16,11 +16,11 @@ public final class WorkflowBlockStrategyFactory {
private WorkflowBlockStrategyFactory() { private WorkflowBlockStrategyFactory() {
} }
protected static void registerTaskStop(BlockStrategyEnum blockStrategyEnum, BlockStrategy blockStrategy) { static void registerBlockStrategy(BlockStrategyEnum blockStrategyEnum, BlockStrategy blockStrategy) {
CACHE.put(blockStrategyEnum, blockStrategy); CACHE.put(blockStrategyEnum, blockStrategy);
} }
public static BlockStrategy getJobTaskStop(Integer blockStrategy) { public static BlockStrategy getBlockStrategy(Integer blockStrategy) {
return CACHE.get(BlockStrategyEnum.valueOf(blockStrategy)); return CACHE.get(BlockStrategyEnum.valueOf(blockStrategy));
} }

View File

@ -4,7 +4,9 @@ import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.lock.LockBuilder; import com.aizuda.easy.retry.server.common.lock.LockBuilder;
import com.aizuda.easy.retry.server.common.lock.LockProvider; import com.aizuda.easy.retry.server.common.lock.LockProvider;
import com.aizuda.easy.retry.server.job.task.support.LockExecutor; import com.aizuda.easy.retry.server.job.task.support.LockExecutor;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException; import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer; import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder; import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies; import com.github.rholder.retry.StopStrategies;
@ -13,9 +15,13 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* 分布式锁工具类
*
* @author: xiaowoniu * @author: xiaowoniu
* @date : 2024-01-02 * @date : 2024-01-02
* @since : 2.6.0 * @since : 2.6.0
@ -24,18 +30,36 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
public class DistributedLockHandler { public class DistributedLockHandler {
/**
* 获取分布式锁并支持重试
*
* @param lockExecutor 执行器
* @param lockName 锁名称
* @param lockAtMost 锁超时时间
* @param sleepTime 重试间隔
* @param maxRetryTimes 重试次数
*/
public void lockWithDisposableAndRetry(LockExecutor lockExecutor, public void lockWithDisposableAndRetry(LockExecutor lockExecutor,
String lockName, Duration lockAtMost, String lockName, Duration lockAtMost,
Duration sleepTime, Integer maxRetryTimes) { Duration sleepTime, Integer maxRetryTimes) {
LockProvider lockProvider = LockBuilder.newBuilder() LockProvider lockProvider = LockBuilder.newBuilder()
.withDisposable(lockName) .withDisposable(lockName)
.build(); .build();
Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder() Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
.retryIfResult(result -> false) .retryIfResult(result -> result.equals(Boolean.FALSE))
.withWaitStrategy(WaitStrategies.fixedWait(sleepTime.toMillis(), TimeUnit.MILLISECONDS)) .retryIfException(ex -> true)
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes)) .withWaitStrategy(WaitStrategies.fixedWait(sleepTime.toMillis(), TimeUnit.MILLISECONDS))
.build(); .withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(final Attempt<V> attempt) {
if (!attempt.hasResult()) {
EasyRetryLog.LOCAL.warn("第【{}】次尝试获取锁. lockName:[{}]",
attempt.getAttemptNumber(), lockName);
}
}
}).build();
boolean lock = false; boolean lock = false;
try { try {
@ -60,17 +84,17 @@ public class DistributedLockHandler {
} }
/** /**
* TODO 超时处理自旋处理 * 获取分布式锁
* *
* @param lockName * @param lockExecutor 执行器
* @param lockAtMost * @param lockName 锁名称
* @param lockExecutor * @param lockAtMost 锁超时时间
*/ */
public void lockWithDisposable(String lockName, Duration lockAtMost, LockExecutor lockExecutor) { public void lockWithDisposable(String lockName, Duration lockAtMost, LockExecutor lockExecutor) {
LockProvider lockProvider = LockBuilder.newBuilder() LockProvider lockProvider = LockBuilder.newBuilder()
.withDisposable(lockName) .withDisposable(lockName)
.build(); .build();
boolean lock = false; boolean lock = false;
try { try {

View File

@ -9,10 +9,11 @@ import com.aizuda.easy.retry.server.job.task.support.BlockStrategy;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO; import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler; import com.aizuda.easy.retry.server.job.task.support.JobTaskStopHandler;
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategyContext;
import com.aizuda.easy.retry.server.job.task.support.block.job.JobBlockStrategyFactory;
import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory; import com.aizuda.easy.retry.server.job.task.support.stop.JobTaskStopFactory;
import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext; import com.aizuda.easy.retry.server.job.task.support.stop.TaskStopJobContext;
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategies; import com.aizuda.easy.retry.server.job.task.enums.BlockStrategyEnum;
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategies.BlockStrategyEnum;
import com.aizuda.easy.retry.server.job.task.support.handler.JobTaskBatchHandler; import com.aizuda.easy.retry.server.job.task.support.handler.JobTaskBatchHandler;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -69,9 +70,9 @@ public class RunningJobPrepareHandler extends AbstractJobPrePareHandler {
return; return;
} }
BlockStrategies.BlockStrategyContext blockStrategyContext = JobTaskConverter.INSTANCE.toBlockStrategyContext(prepare); BlockStrategyContext blockStrategyContext = JobTaskConverter.INSTANCE.toBlockStrategyContext(prepare);
blockStrategyContext.setOperationReason(jobOperationReasonEnum.getReason()); blockStrategyContext.setOperationReason(jobOperationReasonEnum.getReason());
BlockStrategy blockStrategyInterface = BlockStrategies.BlockStrategyEnum.getBlockStrategy(blockStrategy); BlockStrategy blockStrategyInterface = JobBlockStrategyFactory.getBlockStrategy(blockStrategy);
blockStrategyInterface.block(blockStrategyContext); blockStrategyInterface.block(blockStrategyContext);
} }

View File

@ -10,12 +10,11 @@ import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.block.workflow.WorkflowBlockStrategyContext; import com.aizuda.easy.retry.server.job.task.support.block.workflow.WorkflowBlockStrategyContext;
import com.aizuda.easy.retry.server.job.task.support.block.workflow.WorkflowBlockStrategyFactory; import com.aizuda.easy.retry.server.job.task.support.block.workflow.WorkflowBlockStrategyFactory;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler; import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.easy.retry.server.job.task.support.block.job.BlockStrategies.BlockStrategyEnum; import com.aizuda.easy.retry.server.job.task.enums.BlockStrategyEnum;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Objects; import java.util.Objects;
/** /**
@ -65,7 +64,7 @@ public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandle
} }
// 3. 支持阻塞策略同JOB逻辑一致 // 3. 支持阻塞策略同JOB逻辑一致
BlockStrategy blockStrategyInterface = WorkflowBlockStrategyFactory.getJobTaskStop(blockStrategy); BlockStrategy blockStrategyInterface = WorkflowBlockStrategyFactory.getBlockStrategy(blockStrategy);
WorkflowBlockStrategyContext workflowBlockStrategyContext = WorkflowTaskConverter.INSTANCE.toWorkflowBlockStrategyContext( WorkflowBlockStrategyContext workflowBlockStrategyContext = WorkflowTaskConverter.INSTANCE.toWorkflowBlockStrategyContext(
prepare); prepare);
blockStrategyInterface.block(workflowBlockStrategyContext); blockStrategyInterface.block(workflowBlockStrategyContext);

View File

@ -137,10 +137,7 @@ public class JobBatchServiceImpl implements JobBatchService {
JobTaskStopHandler jobTaskStop = JobTaskStopFactory.getJobTaskStop(job.getTaskType()); JobTaskStopHandler jobTaskStop = JobTaskStopFactory.getJobTaskStop(job.getTaskType());
TaskStopJobContext taskStopJobContext = new TaskStopJobContext(); TaskStopJobContext taskStopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(job);
taskStopJobContext.setJobId(job.getId());
taskStopJobContext.setTaskType(job.getTaskType());
taskStopJobContext.setGroupName(job.getGroupName());
taskStopJobContext.setJobOperationReason(JobOperationReasonEnum.MANNER_STOP.getReason()); taskStopJobContext.setJobOperationReason(JobOperationReasonEnum.MANNER_STOP.getReason());
taskStopJobContext.setTaskBatchId(jobTaskBatch.getId()); taskStopJobContext.setTaskBatchId(jobTaskBatch.getId());
taskStopJobContext.setForceStop(Boolean.TRUE); taskStopJobContext.setForceStop(Boolean.TRUE);