feat: 2.6.0

1. 优化分布式锁,支持删除锁记录
2. 工作流执行器添加分布式锁,防止并发时产生多个任务批次问题
This commit is contained in:
byteblogs168 2024-01-02 18:32:25 +08:00
parent 59f99320c6
commit 116e6af226
10 changed files with 247 additions and 24 deletions

View File

@ -1,6 +1,7 @@
package com.aizuda.easy.retry.server.common.dto;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.server.common.enums.UnLockOperationEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import java.time.Duration;
@ -21,17 +22,25 @@ public class LockConfig {
private final Duration lockAtLeast;
public LockConfig(final LocalDateTime createDt, final String lockName, final Duration lockAtMost, final Duration lockAtLeast) {
private final UnLockOperationEnum unLockOperation;
public LockConfig(final LocalDateTime createDt,
final String lockName,
final Duration lockAtMost,
final Duration lockAtLeast,
final UnLockOperationEnum unLockOperation) {
this.lockName = lockName;
this.lockAtMost = lockAtMost;
this.lockAtLeast = lockAtLeast;
this.createDt = createDt;
this.unLockOperation = unLockOperation;
Assert.notNull(createDt, () -> new EasyRetryServerException("createDt can not be null."));
Assert.notBlank(lockName, () -> new EasyRetryServerException("lockName can not be null."));
Assert.notNull(lockAtMost, () -> new EasyRetryServerException("lockAtMost can not be null. lockName:[{}]", lockName));
Assert.isFalse(lockAtMost.isNegative(), () -> new EasyRetryServerException("lockAtMost is negative. lockName:[{}]", lockName));
Assert.notNull(lockAtLeast, () -> new EasyRetryServerException("lockAtLeast can not be null. lockName:[{}]", lockName));
Assert.isFalse(lockAtLeast.compareTo(lockAtMost) > 0, () -> new EasyRetryServerException("lockAtLeast is longer than lockAtMost for lock. lockName:[{}]", lockName));
Assert.notNull(unLockOperation, () -> new EasyRetryServerException("unLockOperation can not be null. lockName:[{}]", lockName));
}
public LocalDateTime getCreateDt() {
@ -46,6 +55,10 @@ public class LockConfig {
return createDt.plus(lockAtMost);
}
public UnLockOperationEnum getUnLockOperation() {
return unLockOperation;
}
public LocalDateTime getLockAtLeast() {
return createDt.plus(lockAtLeast);
}

View File

@ -0,0 +1,13 @@
package com.aizuda.easy.retry.server.common.enums;
/**
* @author: xiaowoniu
* @date : 2024-01-02
* @since : 2.6.0
*/
public enum UnLockOperationEnum {
DELETE,
UPDATE,
;
}

View File

@ -13,7 +13,7 @@ import java.util.List;
* @since 2.1.0
*/
public abstract class AbstractLockProvider implements LockProvider {
protected final static List<String> ALLOW_DB = Arrays.asList(DbTypeEnum.MYSQL.getDb(),
protected static final List<String> ALLOW_DB = Arrays.asList(DbTypeEnum.MYSQL.getDb(),
DbTypeEnum.MARIADB.getDb(),
DbTypeEnum.POSTGRES.getDb());
@Override

View File

@ -2,8 +2,10 @@ package com.aizuda.easy.retry.server.common.lock;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.LockConfig;
import com.aizuda.easy.retry.server.common.enums.UnLockOperationEnum;
import com.aizuda.easy.retry.server.common.register.ServerRegister;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.DistributedLockMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.DistributedLock;
@ -32,7 +34,7 @@ import java.time.LocalDateTime;
@Component
@RequiredArgsConstructor
@Slf4j
public class JdbcLockProvider extends AbstractLockProvider {
public class JdbcLockProvider extends AbstractLockProvider implements Lifecycle {
private final DistributedLockMapper distributedLockMapper;
@ -47,15 +49,20 @@ public class JdbcLockProvider extends AbstractLockProvider {
@Override
public boolean unlock(final LockConfig lockConfig) {
LocalDateTime now = LocalDateTime.now();
DistributedLock distributedLock = new DistributedLock();
distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
LocalDateTime lockAtLeast = lockConfig.getLockAtLeast();
distributedLock.setLockUntil(now.isBefore(lockAtLeast) ? lockAtLeast : now);
for (int i = 0; i < 10; i++) {
try {
return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>()
if (lockConfig.getUnLockOperation() == UnLockOperationEnum.UPDATE) {
DistributedLock distributedLock = new DistributedLock();
distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
LocalDateTime lockAtLeast = lockConfig.getLockAtLeast();
distributedLock.setLockUntil(now.isBefore(lockAtLeast) ? lockAtLeast : now);
return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockConfig.getLockName())) > 0;
} else {
return distributedLockMapper.delete(new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockConfig.getLockName())) > 0;
}
} catch (Exception e) {
LogUtils.error(log, "unlock error. retrying attempt [{}] ", i, e);
}
@ -97,8 +104,8 @@ public class JdbcLockProvider extends AbstractLockProvider {
distributedLock.setName(lockConfig.getLockName());
try {
return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockConfig.getLockName())
.le(DistributedLock::getLockUntil, now)) > 0;
.eq(DistributedLock::getName, lockConfig.getLockName())
.le(DistributedLock::getLockUntil, now)) > 0;
} catch (ConcurrencyFailureException | DataIntegrityViolationException | TransactionSystemException |
UncategorizedSQLException e) {
return false;
@ -106,4 +113,17 @@ public class JdbcLockProvider extends AbstractLockProvider {
}
@Override
public void start() {
// 删除已经过期的锁记录
distributedLockMapper.delete(new LambdaQueryWrapper<DistributedLock>()
.le(DistributedLock::getLockUntil, LocalDateTime.now().minusSeconds(10)));
}
@Override
public void close() {
// 删除当前节点获取的锁记录
distributedLockMapper.delete(new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getLockedBy, ServerRegister.CURRENT_CID));
}
}

View File

@ -5,6 +5,7 @@ import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Schedule;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.LockConfig;
import com.aizuda.easy.retry.server.common.enums.UnLockOperationEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.lock.LockProvider;
import lombok.extern.slf4j.Slf4j;
@ -42,7 +43,9 @@ public abstract class AbstractSchedule implements Schedule {
Assert.notBlank(lockAtLeast, () -> new EasyRetryServerException("lockAtLeast can not be null."));
Assert.notBlank(lockName, () -> new EasyRetryServerException("lockName can not be null."));
LockConfig lockConfig = new LockConfig(LocalDateTime.now(), lockName, Duration.parse(lockAtMost), Duration.parse(lockAtLeast));
LockConfig lockConfig = new LockConfig(LocalDateTime.now(), lockName, Duration.parse(lockAtMost),
Duration.parse(lockAtLeast),
UnLockOperationEnum.UPDATE);
LockProvider lockProvider = getLockAccess();
boolean lock = false;

View File

@ -0,0 +1,12 @@
package com.aizuda.easy.retry.server.job.task.support;
/**
* @author: xiaowoniu
* @date : 2024-01-02
* @since : 2.6.0
*/
@FunctionalInterface
public interface LockExecutor {
void execute();
}

View File

@ -1,24 +1,16 @@
package com.aizuda.easy.retry.server.job.task.support.dispatch;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.FailStrategyEnum;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
import com.aizuda.easy.retry.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTriggerTypeEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.util.GraphUtils;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.job.task.support.cache.MutableGraphCache;
@ -34,10 +26,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowNode;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

View File

@ -1,7 +1,14 @@
package com.aizuda.easy.retry.server.job.task.support.executor.workflow;
import com.aizuda.easy.retry.server.job.task.support.LockExecutor;
import com.aizuda.easy.retry.server.job.task.support.WorkflowExecutor;
import com.aizuda.easy.retry.server.job.task.support.handler.DistributedLockHandler;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
/**
@ -9,13 +16,32 @@ import org.springframework.transaction.annotation.Transactional;
* @date 2023-12-24 08:15:19
* @since 2.6.0
*/
@Slf4j
public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, InitializingBean {
@Autowired
private DistributedLockHandler distributedLockHandler;
@Autowired
private JobTaskBatchMapper jobTaskBatchMapper;
@Override
@Transactional
public void execute(WorkflowExecutorContext context) {
distributedLockHandler.lockAndProcessAfterUnlockDel("workflow_execute_" + context.getWorkflowNodeId(), "PT5S",
() -> {
Long total = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getWorkflowTaskBatchId, context.getWorkflowTaskBatchId())
.eq(JobTaskBatch::getWorkflowNodeId, context.getWorkflowNodeId())
);
if (total > 0) {
log.warn("任务节点[{}]已被执行,请勿重复执行", context.getWorkflowNodeId());
return;
}
doExecute(context);
});
doExecute(context);
}
protected abstract void doExecute(WorkflowExecutorContext context);

View File

@ -0,0 +1,144 @@
package com.aizuda.easy.retry.server.job.task.support.handler;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.LockConfig;
import com.aizuda.easy.retry.server.common.enums.UnLockOperationEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.lock.LockProvider;
import com.aizuda.easy.retry.server.job.task.support.LockExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
/**
* @author: xiaowoniu
* @date : 2024-01-02
* @since : 2.6.0
*/
@Component
@Slf4j
public class DistributedLockHandler {
@Autowired
private SystemProperties systemProperties;
@Autowired
private List<LockProvider> lockProviders;
public boolean tryLock(String lockName, String lockAtMost) {
Assert.notBlank(lockAtMost, () -> new EasyRetryServerException("lockAtLeast can not be null."));
Assert.notBlank(lockName, () -> new EasyRetryServerException("lockName can not be null."));
LockConfig lockConfig = new LockConfig(LocalDateTime.now(), lockName, Duration.parse(lockAtMost),
Duration.ofMillis(1),
UnLockOperationEnum.UPDATE);
LockProvider lockProvider = getLockAccess();
boolean lock = false;
try {
lock = lockProvider.lock(lockConfig);
} catch (Exception e) {
LogUtils.error(log, this.getClass().getName() + " execute error. lockName:[{}]", lockName, e);
}
return lock;
}
public boolean unlockAndDel(String lockName) {
Assert.notBlank(lockName, () -> new EasyRetryServerException("lockName can not be null."));
LockConfig lockConfig = new LockConfig(LocalDateTime.now(),
lockName,
Duration.ofSeconds(1), Duration.ofSeconds(0),
UnLockOperationEnum.DELETE);
LockProvider lockProvider = getLockAccess();
boolean lock = false;
try {
lockProvider.unlock(lockConfig);
} catch (Exception e) {
LogUtils.error(log, this.getClass().getName() + " execute error. lockName:[{}]", lockName, e);
}
return lock;
}
public boolean unlockAndUpdate(String lockName, String lockAtLeast) {
Assert.notBlank(lockAtLeast, () -> new EasyRetryServerException("lockAtLeast can not be null."));
Assert.notBlank(lockName, () -> new EasyRetryServerException("lockName can not be null."));
LockConfig lockConfig = new LockConfig(LocalDateTime.now(), lockName, Duration.ofSeconds(0),
Duration.parse(lockAtLeast),
UnLockOperationEnum.UPDATE);
LockProvider lockProvider = getLockAccess();
boolean lock = false;
try {
lock = lockProvider.unlock(lockConfig);
} catch (Exception e) {
LogUtils.error(log, this.getClass().getName() + " execute error. lockName:[{}]", lockName, e);
}
return lock;
}
public void lockAndProcessAfterUnlockDel(String lockName, String lockAtMost, LockExecutor lockExecutor) {
LockConfig lockConfig = new LockConfig(LocalDateTime.now(), lockName, Duration.parse(lockAtMost),
Duration.ofMillis(1),
UnLockOperationEnum.DELETE);
LockProvider lockProvider = getLockAccess();
boolean lock = false;
try {
lock = lockProvider.lock(lockConfig);
if (lock) {
lockExecutor.execute();
}
} catch (Exception e) {
LogUtils.error(log, this.getClass().getName() + " execute error. lockName:[{}]", lockName, e);
} finally {
if (lock) {
lockProvider.unlock(lockConfig);
}
}
}
public void lockAndProcessAfterUnlockUpdate(String lockName, String lockAtMost, String lockAtLeast,
LockExecutor lockExecutor) {
LockConfig lockConfig = new LockConfig(LocalDateTime.now(), lockName, Duration.parse(lockAtMost),
Duration.parse(lockAtLeast),
UnLockOperationEnum.UPDATE);
LockProvider lockProvider = getLockAccess();
boolean lock = false;
try {
lock = lockProvider.lock(lockConfig);
if (lock) {
lockExecutor.execute();
}
} catch (Exception e) {
LogUtils.error(log, this.getClass().getName() + " execute error. lockName:[{}]", lockName, e);
} finally {
if (lock) {
lockProvider.unlock(lockConfig);
}
}
}
private LockProvider getLockAccess() {
return lockProviders.stream()
.filter(lockProvider -> lockProvider.supports(systemProperties.getDbType().getDb()))
.findFirst().orElseThrow(() -> new EasyRetryServerException("未找到合适锁处理器"));
}
}

View File

@ -177,7 +177,10 @@ public class WorkflowServiceImpl implements WorkflowService {
workflow.setId(workflowRequestVO.getId());
workflow.setVersion(workflow.getVersion() + 1);
workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph)));
Assert.isTrue(workflowMapper.updateById(workflow) > 0, () -> new EasyRetryServerException("更新失败"));
Assert.isTrue(workflowMapper.update(workflow, new LambdaQueryWrapper<Workflow>()
.eq(Workflow::getId, workflow.getId())
.eq(Workflow::getVersion, workflow.getVersion())
) > 0, () -> new EasyRetryServerException("更新失败"));
return Boolean.TRUE;
}