feat: 2.6.0

1. 修复分布式锁异常问题
This commit is contained in:
byteblogs168 2024-01-22 00:10:40 +08:00
parent 145d397b10
commit fa4eee8842
13 changed files with 165 additions and 115 deletions

View File

@ -45,6 +45,6 @@ public enum JobOperationReasonEnum {
*/ */
public static final List<Integer> WORKFLOW_SUCCESSOR_SKIP_EXECUTION = Arrays.asList( public static final List<Integer> WORKFLOW_SUCCESSOR_SKIP_EXECUTION = Arrays.asList(
WORKFLOW_NODE_NO_REQUIRED.getReason(), WORKFLOW_DECISION_FAILED.getReason(), WORKFLOW_NODE_NO_REQUIRED.getReason(), WORKFLOW_DECISION_FAILED.getReason(),
WORKFLOW_CONDITION_NODE_EXECUTION_ERROR.reason); WORKFLOW_CONDITION_NODE_EXECUTION_ERROR.getReason());
} }

View File

@ -1,9 +1,11 @@
package com.aizuda.easy.retry.server.common.lock; package com.aizuda.easy.retry.server.common.lock;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.server.common.cache.CacheLockRecord; import com.aizuda.easy.retry.server.common.cache.CacheLockRecord;
import com.aizuda.easy.retry.server.common.dto.LockConfig; import com.aizuda.easy.retry.server.common.dto.LockConfig;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.Duration; import java.time.Duration;
@ -31,7 +33,6 @@ public abstract class AbstractLockProvider implements LockProvider {
LockManager.setLockAtLeast(lockAtLeast); LockManager.setLockAtLeast(lockAtLeast);
LockManager.setLockAtMost(lockAtMost); LockManager.setLockAtMost(lockAtMost);
boolean tryToCreateLockRecord = !CacheLockRecord.lockRecordRecentlyCreated(lockName); boolean tryToCreateLockRecord = !CacheLockRecord.lockRecordRecentlyCreated(lockName);
if (tryToCreateLockRecord) { if (tryToCreateLockRecord) {
if (doLock(lockConfig)) { if (doLock(lockConfig)) {
@ -42,7 +43,7 @@ public abstract class AbstractLockProvider implements LockProvider {
CacheLockRecord.addLockRecord(lockName); CacheLockRecord.addLockRecord(lockName);
} }
boolean lock = false; boolean lock;
try { try {
lock = doLockAfter(lockConfig); lock = doLockAfter(lockConfig);
} catch (Exception e) { } catch (Exception e) {
@ -51,10 +52,6 @@ public abstract class AbstractLockProvider implements LockProvider {
} }
throw e; throw e;
} finally {
if (!lock) {
LockManager.clear();
}
} }
return lock; return lock;

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.common.lock.persistence; package com.aizuda.easy.retry.server.common.lock.persistence;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.log.EasyRetryLog; import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.Lifecycle; import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.cache.CacheLockRecord; import com.aizuda.easy.retry.server.common.cache.CacheLockRecord;
@ -18,7 +19,12 @@ import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.BadSqlGrammarException; import org.springframework.jdbc.BadSqlGrammarException;
import org.springframework.jdbc.UncategorizedSQLException; import org.springframework.jdbc.UncategorizedSQLException;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.TransactionSystemException; import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Arrays; import java.util.Arrays;
@ -48,6 +54,9 @@ public class JdbcLockProvider implements LockStorage, Lifecycle {
@Override @Override
public boolean createLock(LockConfig lockConfig) { public boolean createLock(LockConfig lockConfig) {
TransactionTemplate transactionTemplate = SpringContext.getBean(TransactionTemplate.class);
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
return Boolean.TRUE.equals(transactionTemplate.execute(status -> {
try { try {
LocalDateTime now = lockConfig.getCreateDt(); LocalDateTime now = lockConfig.getCreateDt();
DistributedLock distributedLock = new DistributedLock(); DistributedLock distributedLock = new DistributedLock();
@ -64,10 +73,15 @@ public class JdbcLockProvider implements LockStorage, Lifecycle {
EasyRetryLog.LOCAL.error("Unexpected exception. lockName:[{}]", lockConfig.getLockName(), e); EasyRetryLog.LOCAL.error("Unexpected exception. lockName:[{}]", lockConfig.getLockName(), e);
return false; return false;
} }
}));
} }
@Override @Override
public boolean renewal(LockConfig lockConfig) { public boolean renewal(LockConfig lockConfig) {
TransactionTemplate transactionTemplate = SpringContext.getBean(TransactionTemplate.class);
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
return Boolean.TRUE.equals(transactionTemplate.execute(status -> {
LocalDateTime now = lockConfig.getCreateDt(); LocalDateTime now = lockConfig.getCreateDt();
DistributedLock distributedLock = new DistributedLock(); DistributedLock distributedLock = new DistributedLock();
distributedLock.setLockedBy(ServerRegister.CURRENT_CID); distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
@ -82,29 +96,34 @@ public class JdbcLockProvider implements LockStorage, Lifecycle {
UncategorizedSQLException e) { UncategorizedSQLException e) {
return false; return false;
} }
}));
} }
@Override @Override
public boolean releaseLockWithDelete(String lockName) { public boolean releaseLockWithDelete(String lockName) {
TransactionTemplate transactionTemplate = SpringContext.getBean(TransactionTemplate.class);
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
return Boolean.TRUE.equals(transactionTemplate.execute(status -> {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
try { try {
CacheLockRecord.remove(lockName);
return distributedLockMapper.delete(new LambdaUpdateWrapper<DistributedLock>() return distributedLockMapper.delete(new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockName)) > 0; .eq(DistributedLock::getName, lockName)) > 0;
} catch (Exception e) { } catch (Exception e) {
EasyRetryLog.LOCAL.error("unlock error. retrying attempt [{}] ", i, e); EasyRetryLog.LOCAL.error("unlock error. retrying attempt [{}] ", i, e);
} finally {
CacheLockRecord.remove(lockName);
} }
} }
return false; return false;
}));
} }
@Override @Override
public boolean releaseLockWithUpdate(String lockName, LocalDateTime lockAtLeast) { public boolean releaseLockWithUpdate(String lockName, LocalDateTime lockAtLeast) {
TransactionTemplate transactionTemplate = SpringContext.getBean(TransactionTemplate.class);
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
LocalDateTime now = LocalDateTime.now(); LocalDateTime now = LocalDateTime.now();
return Boolean.TRUE.equals(transactionTemplate.execute(status -> {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
try { try {
DistributedLock distributedLock = new DistributedLock(); DistributedLock distributedLock = new DistributedLock();
@ -118,6 +137,7 @@ public class JdbcLockProvider implements LockStorage, Lifecycle {
} }
return false; return false;
}));
} }
@Override @Override

View File

@ -7,6 +7,7 @@ import com.aizuda.easy.retry.server.common.Schedule;
import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.lock.LockBuilder; import com.aizuda.easy.retry.server.common.lock.LockBuilder;
import com.aizuda.easy.retry.server.common.lock.LockManager;
import com.aizuda.easy.retry.server.common.lock.LockProvider; import com.aizuda.easy.retry.server.common.lock.LockProvider;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -52,6 +53,8 @@ public abstract class AbstractSchedule implements Schedule {
} finally { } finally {
if (lock) { if (lock) {
lockProvider.unlock(); lockProvider.unlock();
} else {
LockManager.clear();
} }
} }

View File

@ -73,11 +73,29 @@ public class JobExecutorResultActor extends AbstractActor {
Assert.isTrue(1 == jobTaskMapper.update(jobTask, Assert.isTrue(1 == jobTaskMapper.update(jobTask,
new LambdaUpdateWrapper<JobTask>().eq(JobTask::getId, result.getTaskId())), new LambdaUpdateWrapper<JobTask>().eq(JobTask::getId, result.getTaskId())),
()-> new EasyRetryServerException("更新任务实例失败")); () -> new EasyRetryServerException("更新任务实例失败"));
// 先尝试完成若已完成则不需要通过获取分布式锁来完成
boolean tryCompleteAndStop = tryCompleteAndStop(result);
if (!tryCompleteAndStop) {
// 存在并发问题 // 存在并发问题
distributedLockHandler.lockWithDisposableAndRetry(() -> { distributedLockHandler.lockWithDisposableAndRetry(() -> {
// 更新批次上的状态 tryCompleteAndStop(result);
}, MessageFormat.format(KEY, result.getTaskBatchId(),
result.getJobId()), Duration.ofSeconds(3), Duration.ofSeconds(1), 6);
}
}
});
} catch (Exception e) {
EasyRetryLog.LOCAL.error(" job executor result exception. [{}]", result, e);
} finally {
getContext().stop(getSelf());
}
}).build();
}
private boolean tryCompleteAndStop(JobExecutorResultDTO result) {
CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.toCompleteJobBatchDTO(result); CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.toCompleteJobBatchDTO(result);
boolean complete = jobTaskBatchHandler.complete(completeJobBatchDTO); boolean complete = jobTaskBatchHandler.complete(completeJobBatchDTO);
if (complete) { if (complete) {
@ -91,16 +109,7 @@ public class JobExecutorResultActor extends AbstractActor {
instanceInterrupt.stop(stopJobContext); instanceInterrupt.stop(stopJobContext);
} }
} }
}, MessageFormat.format(KEY, result.getTaskBatchId(), result.getJobId()), Duration.ofSeconds(3), Duration.ofSeconds(1), 3);
} return complete;
});
} catch (Exception e) {
EasyRetryLog.LOCAL.error(" job executor result exception. [{}]", result, e);
} finally {
getContext().stop(getSelf());
}
}).build();
} }
} }

View File

@ -61,6 +61,7 @@ public class JobTaskPrepareActor extends AbstractActor {
prepare.getTaskExecutorScene()); prepare.getTaskExecutorScene());
if (TaskTypeEnum.WORKFLOW.getType().equals(jobTaskExecutorSceneEnum.getTaskType().getType())) { if (TaskTypeEnum.WORKFLOW.getType().equals(jobTaskExecutorSceneEnum.getTaskType().getType())) {
queryWrapper.eq(JobTaskBatch::getWorkflowNodeId, prepare.getWorkflowNodeId()); queryWrapper.eq(JobTaskBatch::getWorkflowNodeId, prepare.getWorkflowNodeId());
queryWrapper.eq(JobTaskBatch::getWorkflowTaskBatchId, prepare.getWorkflowTaskBatchId());
queryWrapper.eq(JobTaskBatch::getTaskType, TaskTypeEnum.WORKFLOW.getType()); queryWrapper.eq(JobTaskBatch::getTaskType, TaskTypeEnum.WORKFLOW.getType());
} else { } else {
queryWrapper.eq(JobTaskBatch::getTaskType, TaskTypeEnum.JOB.getType()); queryWrapper.eq(JobTaskBatch::getTaskType, TaskTypeEnum.JOB.getType());

View File

@ -89,7 +89,8 @@ public class WorkflowExecutorActor extends AbstractActor {
// 添加父节点为了判断父节点的处理状态 // 添加父节点为了判断父节点的处理状态
List<JobTaskBatch> allJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>() List<JobTaskBatch> allJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId, JobTaskBatch::getTaskBatchStatus) .select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId,
JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason)
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId()) .eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
.in(JobTaskBatch::getWorkflowNodeId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId()))) .in(JobTaskBatch::getWorkflowNodeId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId())))
); );

View File

@ -60,7 +60,6 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
private TransactionTemplate transactionTemplate; private TransactionTemplate transactionTemplate;
@Override @Override
@Transactional
public void execute(WorkflowExecutorContext context) { public void execute(WorkflowExecutorContext context) {
distributedLockHandler.lockWithDisposableAndRetry( distributedLockHandler.lockWithDisposableAndRetry(
() -> { () -> {
@ -111,7 +110,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
} }
}); });
}, MessageFormat.format(KEY, context.getWorkflowTaskBatchId(), context.getWorkflowNodeId()), }, MessageFormat.format(KEY, context.getWorkflowTaskBatchId(), context.getWorkflowNodeId()),
Duration.ofSeconds(5), Duration.ofSeconds(1), 3); Duration.ofSeconds(6), Duration.ofSeconds(2), 12);
} }

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.handler;
import com.aizuda.easy.retry.common.log.EasyRetryLog; import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.lock.LockBuilder; import com.aizuda.easy.retry.server.common.lock.LockBuilder;
import com.aizuda.easy.retry.server.common.lock.LockManager;
import com.aizuda.easy.retry.server.common.lock.LockProvider; import com.aizuda.easy.retry.server.common.lock.LockProvider;
import com.aizuda.easy.retry.server.job.task.support.LockExecutor; import com.aizuda.easy.retry.server.job.task.support.LockExecutor;
import com.github.rholder.retry.Attempt; import com.github.rholder.retry.Attempt;
@ -13,9 +14,10 @@ import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies; import com.github.rholder.retry.WaitStrategies;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -54,11 +56,17 @@ public class DistributedLockHandler {
.withRetryListener(new RetryListener() { .withRetryListener(new RetryListener() {
@Override @Override
public <V> void onRetry(final Attempt<V> attempt) { public <V> void onRetry(final Attempt<V> attempt) {
if (!attempt.hasResult()) { Object result = null;
EasyRetryLog.LOCAL.warn("第【{}】次尝试获取锁. lockName:[{}]", if (attempt.hasResult()) {
attempt.getAttemptNumber(), lockName); try {
result = attempt.get();
} catch (ExecutionException ignored) {
} }
} }
EasyRetryLog.LOCAL.info("第【{}】次尝试获取锁. lockName:[{}] result:[{}] treadName:[{}]",
attempt.getAttemptNumber(), lockName, result, Thread.currentThread().getName());
}
}).build(); }).build();
boolean lock = false; boolean lock = false;
@ -71,13 +79,20 @@ public class DistributedLockHandler {
Throwable throwable = e; Throwable throwable = e;
if (e.getClass().isAssignableFrom(RetryException.class)) { if (e.getClass().isAssignableFrom(RetryException.class)) {
RetryException re = (RetryException) e; RetryException re = (RetryException) e;
throwable = re.getLastFailedAttempt().getExceptionCause(); Attempt<?> lastFailedAttempt = re.getLastFailedAttempt();
if (lastFailedAttempt.hasException()) {
throwable = lastFailedAttempt.getExceptionCause();
}
} }
EasyRetryLog.LOCAL.error("lock execute error. lockName:[{}]", lockName, throwable); EasyRetryLog.LOCAL.error("lock execute error. lockName:[{}]", lockName, throwable);
} finally { } finally {
if (lock) { if (lock) {
EasyRetryLog.LOCAL.info("[{}] 锁已释放", lockName);
lockProvider.unlock(); lockProvider.unlock();
} else {
// 未获取到锁直接清除线程中存储的锁信息
LockManager.clear();
} }
} }
@ -90,6 +105,7 @@ public class DistributedLockHandler {
* @param lockName 锁名称 * @param lockName 锁名称
* @param lockAtMost 锁超时时间 * @param lockAtMost 锁超时时间
*/ */
@Transactional(propagation = Propagation.NOT_SUPPORTED)
public void lockWithDisposable(String lockName, Duration lockAtMost, LockExecutor lockExecutor) { public void lockWithDisposable(String lockName, Duration lockAtMost, LockExecutor lockExecutor) {
LockProvider lockProvider = LockBuilder.newBuilder() LockProvider lockProvider = LockBuilder.newBuilder()
@ -107,6 +123,9 @@ public class DistributedLockHandler {
} finally { } finally {
if (lock) { if (lock) {
lockProvider.unlock(); lockProvider.unlock();
} else {
// 未获取到锁直接清除线程中存储的锁信息
LockManager.clear();
} }
} }
} }

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -5,8 +5,8 @@
<meta charset="UTF-8" /> <meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" /> <meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Easy Retry</title> <title>Easy Retry</title>
<script type="module" crossorigin src="./assets/1RK3CeRh.js"></script> <script type="module" crossorigin src="./assets/06nP_ETa.js"></script>
<link rel="stylesheet" crossorigin href="./assets/Z7fZzXo1.css"> <link rel="stylesheet" crossorigin href="./assets/dSfAux_s.css">
</head> </head>
<body> <body>

View File

@ -135,6 +135,7 @@ export default {
this.fromIndex = res.data.fromIndex this.fromIndex = res.data.fromIndex
if (res.data.message) { if (res.data.message) {
this.logList.push(...res.data.message) this.logList.push(...res.data.message)
this.logList.sort((a, b) => a.time_stamp - b.time_stamp)
} }
}) })
.catch(() => { .catch(() => {
@ -153,7 +154,7 @@ export default {
const hours = date.getHours() const hours = date.getHours()
const minutes = date.getMinutes().toString().length === 1 ? '0' + date.getMinutes() : date.getMinutes().toString() const minutes = date.getMinutes().toString().length === 1 ? '0' + date.getMinutes() : date.getMinutes().toString()
const seconds = date.getSeconds().toString().length === 1 ? '0' + date.getSeconds() : date.getSeconds().toString() const seconds = date.getSeconds().toString().length === 1 ? '0' + date.getSeconds() : date.getSeconds().toString()
return `${year}-${month}-${day} ${hours}:${minutes}:${seconds}` return `${year}-${month}-${day} ${hours}:${minutes}:${seconds}.${date.getMilliseconds()}`
} }
} }
} }