feat: 2.6.0

1. 修复分布式锁异常问题
This commit is contained in:
byteblogs168 2024-01-22 00:10:40 +08:00
parent 05d024bf11
commit e630d36a44
13 changed files with 165 additions and 115 deletions

View File

@ -45,6 +45,6 @@ public enum JobOperationReasonEnum {
*/
public static final List<Integer> WORKFLOW_SUCCESSOR_SKIP_EXECUTION = Arrays.asList(
WORKFLOW_NODE_NO_REQUIRED.getReason(), WORKFLOW_DECISION_FAILED.getReason(),
WORKFLOW_CONDITION_NODE_EXECUTION_ERROR.reason);
WORKFLOW_CONDITION_NODE_EXECUTION_ERROR.getReason());
}

View File

@ -1,9 +1,11 @@
package com.aizuda.easy.retry.server.common.lock;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.server.common.cache.CacheLockRecord;
import com.aizuda.easy.retry.server.common.dto.LockConfig;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.Duration;
@ -31,7 +33,6 @@ public abstract class AbstractLockProvider implements LockProvider {
LockManager.setLockAtLeast(lockAtLeast);
LockManager.setLockAtMost(lockAtMost);
boolean tryToCreateLockRecord = !CacheLockRecord.lockRecordRecentlyCreated(lockName);
if (tryToCreateLockRecord) {
if (doLock(lockConfig)) {
@ -42,7 +43,7 @@ public abstract class AbstractLockProvider implements LockProvider {
CacheLockRecord.addLockRecord(lockName);
}
boolean lock = false;
boolean lock;
try {
lock = doLockAfter(lockConfig);
} catch (Exception e) {
@ -51,10 +52,6 @@ public abstract class AbstractLockProvider implements LockProvider {
}
throw e;
} finally {
if (!lock) {
LockManager.clear();
}
}
return lock;

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.common.lock.persistence;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.cache.CacheLockRecord;
@ -18,7 +19,12 @@ import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.BadSqlGrammarException;
import org.springframework.jdbc.UncategorizedSQLException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.LocalDateTime;
import java.util.Arrays;
@ -48,76 +54,90 @@ public class JdbcLockProvider implements LockStorage, Lifecycle {
@Override
public boolean createLock(LockConfig lockConfig) {
try {
LocalDateTime now = lockConfig.getCreateDt();
DistributedLock distributedLock = new DistributedLock();
distributedLock.setName(lockConfig.getLockName());
distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
distributedLock.setLockedAt(now);
distributedLock.setLockUntil(lockConfig.getLockAtMost());
distributedLock.setCreateDt(now);
distributedLock.setUpdateDt(now);
return distributedLockMapper.insert(distributedLock) > 0;
} catch (DuplicateKeyException | ConcurrencyFailureException | TransactionSystemException e) {
return false;
} catch (DataIntegrityViolationException | BadSqlGrammarException | UncategorizedSQLException e) {
EasyRetryLog.LOCAL.error("Unexpected exception. lockName:[{}]", lockConfig.getLockName(), e);
return false;
}
TransactionTemplate transactionTemplate = SpringContext.getBean(TransactionTemplate.class);
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
return Boolean.TRUE.equals(transactionTemplate.execute(status -> {
try {
LocalDateTime now = lockConfig.getCreateDt();
DistributedLock distributedLock = new DistributedLock();
distributedLock.setName(lockConfig.getLockName());
distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
distributedLock.setLockedAt(now);
distributedLock.setLockUntil(lockConfig.getLockAtMost());
distributedLock.setCreateDt(now);
distributedLock.setUpdateDt(now);
return distributedLockMapper.insert(distributedLock) > 0;
} catch (DuplicateKeyException | ConcurrencyFailureException | TransactionSystemException e) {
return false;
} catch (DataIntegrityViolationException | BadSqlGrammarException | UncategorizedSQLException e) {
EasyRetryLog.LOCAL.error("Unexpected exception. lockName:[{}]", lockConfig.getLockName(), e);
return false;
}
}));
}
@Override
public boolean renewal(LockConfig lockConfig) {
LocalDateTime now = lockConfig.getCreateDt();
DistributedLock distributedLock = new DistributedLock();
distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
distributedLock.setLockedAt(now);
distributedLock.setLockUntil(lockConfig.getLockAtMost());
distributedLock.setName(lockConfig.getLockName());
try {
return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockConfig.getLockName())
.le(DistributedLock::getLockUntil, now)) > 0;
} catch (ConcurrencyFailureException | DataIntegrityViolationException | TransactionSystemException |
UncategorizedSQLException e) {
return false;
}
TransactionTemplate transactionTemplate = SpringContext.getBean(TransactionTemplate.class);
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
return Boolean.TRUE.equals(transactionTemplate.execute(status -> {
LocalDateTime now = lockConfig.getCreateDt();
DistributedLock distributedLock = new DistributedLock();
distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
distributedLock.setLockedAt(now);
distributedLock.setLockUntil(lockConfig.getLockAtMost());
distributedLock.setName(lockConfig.getLockName());
try {
return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockConfig.getLockName())
.le(DistributedLock::getLockUntil, now)) > 0;
} catch (ConcurrencyFailureException | DataIntegrityViolationException | TransactionSystemException |
UncategorizedSQLException e) {
return false;
}
}));
}
@Override
public boolean releaseLockWithDelete(String lockName) {
for (int i = 0; i < 10; i++) {
try {
CacheLockRecord.remove(lockName);
return distributedLockMapper.delete(new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockName)) > 0;
} catch (Exception e) {
EasyRetryLog.LOCAL.error("unlock error. retrying attempt [{}] ", i, e);
TransactionTemplate transactionTemplate = SpringContext.getBean(TransactionTemplate.class);
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
return Boolean.TRUE.equals(transactionTemplate.execute(status -> {
for (int i = 0; i < 10; i++) {
try {
return distributedLockMapper.delete(new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockName)) > 0;
} catch (Exception e) {
EasyRetryLog.LOCAL.error("unlock error. retrying attempt [{}] ", i, e);
} finally {
CacheLockRecord.remove(lockName);
}
}
}
return false;
return false;
}));
}
@Override
public boolean releaseLockWithUpdate(String lockName, LocalDateTime lockAtLeast) {
TransactionTemplate transactionTemplate = SpringContext.getBean(TransactionTemplate.class);
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
LocalDateTime now = LocalDateTime.now();
for (int i = 0; i < 10; i++) {
try {
DistributedLock distributedLock = new DistributedLock();
distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
distributedLock.setLockUntil(now.isBefore(lockAtLeast) ? lockAtLeast : now);
return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockName)) > 0;
} catch (Exception e) {
EasyRetryLog.LOCAL.error("unlock error. retrying attempt [{}] ", i, e);
return Boolean.TRUE.equals(transactionTemplate.execute(status -> {
for (int i = 0; i < 10; i++) {
try {
DistributedLock distributedLock = new DistributedLock();
distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
distributedLock.setLockUntil(now.isBefore(lockAtLeast) ? lockAtLeast : now);
return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockName)) > 0;
} catch (Exception e) {
EasyRetryLog.LOCAL.error("unlock error. retrying attempt [{}] ", i, e);
}
}
}
return false;
return false;
}));
}
@Override
@ -129,6 +149,6 @@ public class JdbcLockProvider implements LockStorage, Lifecycle {
public void close() {
// 删除当前节点获取的锁记录
distributedLockMapper.delete(new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getLockedBy, ServerRegister.CURRENT_CID));
.eq(DistributedLock::getLockedBy, ServerRegister.CURRENT_CID));
}
}

View File

@ -7,6 +7,7 @@ import com.aizuda.easy.retry.server.common.Schedule;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.lock.LockBuilder;
import com.aizuda.easy.retry.server.common.lock.LockManager;
import com.aizuda.easy.retry.server.common.lock.LockProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -52,6 +53,8 @@ public abstract class AbstractSchedule implements Schedule {
} finally {
if (lock) {
lockProvider.unlock();
} else {
LockManager.clear();
}
}

View File

@ -73,26 +73,16 @@ public class JobExecutorResultActor extends AbstractActor {
Assert.isTrue(1 == jobTaskMapper.update(jobTask,
new LambdaUpdateWrapper<JobTask>().eq(JobTask::getId, result.getTaskId())),
()-> new EasyRetryServerException("更新任务实例失败"));
// 存在并发问题
distributedLockHandler.lockWithDisposableAndRetry(() -> {
// 更新批次上的状态
CompleteJobBatchDTO completeJobBatchDTO = JobTaskConverter.INSTANCE.toCompleteJobBatchDTO(result);
boolean complete = jobTaskBatchHandler.complete(completeJobBatchDTO);
if (complete) {
// 尝试停止任务
// 若是集群任务则客户端会主动关闭
if (result.getTaskType() != JobTaskTypeEnum.CLUSTER.getType()) {
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(result.getTaskType());
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(result);
stopJobContext.setNeedUpdateTaskStatus(Boolean.FALSE);
stopJobContext.setForceStop(Boolean.TRUE);
instanceInterrupt.stop(stopJobContext);
}
}
}, MessageFormat.format(KEY, result.getTaskBatchId(), result.getJobId()), Duration.ofSeconds(3), Duration.ofSeconds(1), 3);
() -> new EasyRetryServerException("更新任务实例失败"));
// 先尝试完成若已完成则不需要通过获取分布式锁来完成
boolean tryCompleteAndStop = tryCompleteAndStop(result);
if (!tryCompleteAndStop) {
// 存在并发问题
distributedLockHandler.lockWithDisposableAndRetry(() -> {
tryCompleteAndStop(result);
}, MessageFormat.format(KEY, result.getTaskBatchId(),
result.getJobId()), Duration.ofSeconds(3), Duration.ofSeconds(1), 6);
}
}
});
} catch (Exception e) {
@ -102,5 +92,24 @@ public class JobExecutorResultActor extends AbstractActor {
}
}).build();
}
/**
 * Asks {@code jobTaskBatchHandler} to complete the batch described by {@code result} and,
 * when completion is reported, issues a forced stop for the task unless it is a cluster task.
 *
 * @param result execution result reported for a job task
 * @return the value returned by {@code jobTaskBatchHandler.complete(...)}
 */
private boolean tryCompleteAndStop(JobExecutorResultDTO result) {
    boolean completed = jobTaskBatchHandler.complete(JobTaskConverter.INSTANCE.toCompleteJobBatchDTO(result));
    if (!completed) {
        return false;
    }

    // For cluster tasks the client closes the task on its own, so no stop is issued here.
    // NOTE(review): this is an ==/!= comparison on the task type; if both getters return
    // boxed Integer it compares references - confirm one side is a primitive int.
    if (result.getTaskType() == JobTaskTypeEnum.CLUSTER.getType()) {
        return true;
    }

    // Try to stop the task: forced stop, with the "need update task status" flag switched off.
    JobTaskStopHandler stopHandler = JobTaskStopFactory.getJobTaskStop(result.getTaskType());
    TaskStopJobContext stopContext = JobTaskConverter.INSTANCE.toStopJobContext(result);
    stopContext.setNeedUpdateTaskStatus(Boolean.FALSE);
    stopContext.setForceStop(Boolean.TRUE);
    stopHandler.stop(stopContext);
    return true;
}
}

View File

@ -61,6 +61,7 @@ public class JobTaskPrepareActor extends AbstractActor {
prepare.getTaskExecutorScene());
if (TaskTypeEnum.WORKFLOW.getType().equals(jobTaskExecutorSceneEnum.getTaskType().getType())) {
queryWrapper.eq(JobTaskBatch::getWorkflowNodeId, prepare.getWorkflowNodeId());
queryWrapper.eq(JobTaskBatch::getWorkflowTaskBatchId, prepare.getWorkflowTaskBatchId());
queryWrapper.eq(JobTaskBatch::getTaskType, TaskTypeEnum.WORKFLOW.getType());
} else {
queryWrapper.eq(JobTaskBatch::getTaskType, TaskTypeEnum.JOB.getType());

View File

@ -89,7 +89,8 @@ public class WorkflowExecutorActor extends AbstractActor {
// 添加父节点为了判断父节点的处理状态
List<JobTaskBatch> allJobTaskBatchList = jobTaskBatchMapper.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId, JobTaskBatch::getTaskBatchStatus)
.select(JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId,
JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason)
.eq(JobTaskBatch::getWorkflowTaskBatchId, workflowTaskBatch.getId())
.in(JobTaskBatch::getWorkflowNodeId, Sets.union(successors, Sets.newHashSet(taskExecute.getParentId())))
);

View File

@ -60,7 +60,6 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
private TransactionTemplate transactionTemplate;
@Override
@Transactional
public void execute(WorkflowExecutorContext context) {
distributedLockHandler.lockWithDisposableAndRetry(
() -> {
@ -111,7 +110,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
}
});
}, MessageFormat.format(KEY, context.getWorkflowTaskBatchId(), context.getWorkflowNodeId()),
Duration.ofSeconds(5), Duration.ofSeconds(1), 3);
Duration.ofSeconds(6), Duration.ofSeconds(2), 12);
}

View File

@ -2,6 +2,7 @@ package com.aizuda.easy.retry.server.job.task.support.handler;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.lock.LockBuilder;
import com.aizuda.easy.retry.server.common.lock.LockManager;
import com.aizuda.easy.retry.server.common.lock.LockProvider;
import com.aizuda.easy.retry.server.job.task.support.LockExecutor;
import com.github.rholder.retry.Attempt;
@ -13,9 +14,10 @@ import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -54,10 +56,16 @@ public class DistributedLockHandler {
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(final Attempt<V> attempt) {
if (!attempt.hasResult()) {
EasyRetryLog.LOCAL.warn("第【{}】次尝试获取锁. lockName:[{}]",
attempt.getAttemptNumber(), lockName);
Object result = null;
if (attempt.hasResult()) {
try {
result = attempt.get();
} catch (ExecutionException ignored) {
}
}
EasyRetryLog.LOCAL.info("第【{}】次尝试获取锁. lockName:[{}] result:[{}] treadName:[{}]",
attempt.getAttemptNumber(), lockName, result, Thread.currentThread().getName());
}
}).build();
@ -71,13 +79,20 @@ public class DistributedLockHandler {
Throwable throwable = e;
if (e.getClass().isAssignableFrom(RetryException.class)) {
RetryException re = (RetryException) e;
throwable = re.getLastFailedAttempt().getExceptionCause();
Attempt<?> lastFailedAttempt = re.getLastFailedAttempt();
if (lastFailedAttempt.hasException()) {
throwable = lastFailedAttempt.getExceptionCause();
}
}
EasyRetryLog.LOCAL.error("lock execute error. lockName:[{}]", lockName, throwable);
} finally {
if (lock) {
EasyRetryLog.LOCAL.info("[{}] 锁已释放", lockName);
lockProvider.unlock();
} else {
// 未获取到锁直接清除线程中存储的锁信息
LockManager.clear();
}
}
@ -90,6 +105,7 @@ public class DistributedLockHandler {
* @param lockName 锁名称
* @param lockAtMost 锁超时时间
*/
@Transactional(propagation = Propagation.NOT_SUPPORTED)
public void lockWithDisposable(String lockName, Duration lockAtMost, LockExecutor lockExecutor) {
LockProvider lockProvider = LockBuilder.newBuilder()
@ -107,6 +123,9 @@ public class DistributedLockHandler {
} finally {
if (lock) {
lockProvider.unlock();
} else {
// 未获取到锁直接清除线程中存储的锁信息
LockManager.clear();
}
}
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -5,8 +5,8 @@
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Easy Retry</title>
<script type="module" crossorigin src="./assets/1RK3CeRh.js"></script>
<link rel="stylesheet" crossorigin href="./assets/Z7fZzXo1.css">
<script type="module" crossorigin src="./assets/06nP_ETa.js"></script>
<link rel="stylesheet" crossorigin href="./assets/dSfAux_s.css">
</head>
<body>

View File

@ -9,28 +9,28 @@
<div class="log">
<table class="scroller">
<tbody>
<tr v-for="(log, index) in logList" :key="index">
<td class="index">
{{ index + 1 }}
</td>
<td>
<div class="content">
<div class="line">
<div class="flex">
<div class="text" style="color: #2db7f5">{{ timestampToDate(log.time_stamp) }}</div>
<div class="text" :style="{ color: LevelEnum[log.level].color }">
{{ log.level.length === 4 ? log.level + ' ' : log.level }}
<tr v-for="(log, index) in logList" :key="index">
<td class="index">
{{ index + 1 }}
</td>
<td>
<div class="content">
<div class="line">
<div class="flex">
<div class="text" style="color: #2db7f5">{{ timestampToDate(log.time_stamp) }}</div>
<div class="text" :style="{ color: LevelEnum[log.level].color }">
{{ log.level.length === 4 ? log.level + ' ' : log.level }}
</div>
<div class="text" style="color: #00a3a3">[{{ log.thread }}]</div>
<div class="text" style="color: #a771bf; font-weight: 500">{{ log.location }}</div>
<div class="text">:</div>
</div>
<div class="text" style="color: #00a3a3">[{{ log.thread }}]</div>
<div class="text" style="color: #a771bf; font-weight: 500">{{ log.location }}</div>
<div class="text">:</div>
<div class="text" style="font-size: 16px">{{ log.message }}</div>
<div class="text" style="font-size: 16px">{{ log.throwable }}</div>
</div>
<div class="text" style="font-size: 16px">{{ log.message }}</div>
<div class="text" style="font-size: 16px">{{ log.throwable }}</div>
</div>
</div>
</td>
</tr>
</td>
</tr>
</tbody>
</table>
</div>
@ -135,6 +135,7 @@ export default {
this.fromIndex = res.data.fromIndex
if (res.data.message) {
this.logList.push(...res.data.message)
this.logList.sort((a, b) => a.time_stamp - b.time_stamp)
}
})
.catch(() => {
@ -153,7 +154,7 @@ export default {
const hours = date.getHours()
const minutes = date.getMinutes().toString().length === 1 ? '0' + date.getMinutes() : date.getMinutes().toString()
const seconds = date.getSeconds().toString().length === 1 ? '0' + date.getSeconds() : date.getSeconds().toString()
return `${year}-${month}-${day} ${hours}:${minutes}:${seconds}`
return `${year}-${month}-${day} ${hours}:${minutes}:${seconds}.${date.getMilliseconds()}`
}
}
}