feat: 3.2.0
1. 优化分布式锁TransactionTemplate线程安全问题
This commit is contained in:
parent
a711c2c9ca
commit
73a51130bf
@ -1,6 +1,5 @@
|
|||||||
package com.aizuda.easy.retry.server.common.lock.persistence;
|
package com.aizuda.easy.retry.server.common.lock.persistence;
|
||||||
|
|
||||||
import com.aizuda.easy.retry.common.core.context.SpringContext;
|
|
||||||
import com.aizuda.easy.retry.common.log.EasyRetryLog;
|
import com.aizuda.easy.retry.common.log.EasyRetryLog;
|
||||||
import com.aizuda.easy.retry.server.common.Lifecycle;
|
import com.aizuda.easy.retry.server.common.Lifecycle;
|
||||||
import com.aizuda.easy.retry.server.common.cache.CacheLockRecord;
|
import com.aizuda.easy.retry.server.common.cache.CacheLockRecord;
|
||||||
@ -19,8 +18,10 @@ import org.springframework.dao.DuplicateKeyException;
|
|||||||
import org.springframework.jdbc.BadSqlGrammarException;
|
import org.springframework.jdbc.BadSqlGrammarException;
|
||||||
import org.springframework.jdbc.UncategorizedSQLException;
|
import org.springframework.jdbc.UncategorizedSQLException;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.transaction.PlatformTransactionManager;
|
||||||
import org.springframework.transaction.TransactionDefinition;
|
import org.springframework.transaction.TransactionDefinition;
|
||||||
import org.springframework.transaction.TransactionSystemException;
|
import org.springframework.transaction.TransactionSystemException;
|
||||||
|
import org.springframework.transaction.support.TransactionCallback;
|
||||||
import org.springframework.transaction.support.TransactionTemplate;
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
@ -40,13 +41,14 @@ import java.util.List;
|
|||||||
public class JdbcLockProvider implements LockStorage, Lifecycle {
|
public class JdbcLockProvider implements LockStorage, Lifecycle {
|
||||||
|
|
||||||
protected static final List<String> ALLOW_DB = Arrays.asList(
|
protected static final List<String> ALLOW_DB = Arrays.asList(
|
||||||
DbTypeEnum.MYSQL.getDb(),
|
DbTypeEnum.MYSQL.getDb(),
|
||||||
DbTypeEnum.MARIADB.getDb(),
|
DbTypeEnum.MARIADB.getDb(),
|
||||||
DbTypeEnum.POSTGRES.getDb(),
|
DbTypeEnum.POSTGRES.getDb(),
|
||||||
DbTypeEnum.ORACLE.getDb(),
|
DbTypeEnum.ORACLE.getDb(),
|
||||||
DbTypeEnum.SQLSERVER.getDb());
|
DbTypeEnum.SQLSERVER.getDb());
|
||||||
|
|
||||||
private final DistributedLockMapper distributedLockMapper;
|
private final DistributedLockMapper distributedLockMapper;
|
||||||
|
private final PlatformTransactionManager platformTransactionManager;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean supports(final String storageMedium) {
|
public boolean supports(final String storageMedium) {
|
||||||
@ -55,9 +57,7 @@ public class JdbcLockProvider implements LockStorage, Lifecycle {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean createLock(LockConfig lockConfig) {
|
public boolean createLock(LockConfig lockConfig) {
|
||||||
TransactionTemplate transactionTemplate = SpringContext.getBean(TransactionTemplate.class);
|
return Boolean.TRUE.equals(notSupportedTransaction(status -> {
|
||||||
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
|
|
||||||
return Boolean.TRUE.equals(transactionTemplate.execute(status -> {
|
|
||||||
try {
|
try {
|
||||||
LocalDateTime now = lockConfig.getCreateDt();
|
LocalDateTime now = lockConfig.getCreateDt();
|
||||||
DistributedLock distributedLock = new DistributedLock();
|
DistributedLock distributedLock = new DistributedLock();
|
||||||
@ -80,9 +80,7 @@ public class JdbcLockProvider implements LockStorage, Lifecycle {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean renewal(LockConfig lockConfig) {
|
public boolean renewal(LockConfig lockConfig) {
|
||||||
TransactionTemplate transactionTemplate = SpringContext.getBean(TransactionTemplate.class);
|
return Boolean.TRUE.equals(notSupportedTransaction(status -> {
|
||||||
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
|
|
||||||
return Boolean.TRUE.equals(transactionTemplate.execute(status -> {
|
|
||||||
LocalDateTime now = lockConfig.getCreateDt();
|
LocalDateTime now = lockConfig.getCreateDt();
|
||||||
DistributedLock distributedLock = new DistributedLock();
|
DistributedLock distributedLock = new DistributedLock();
|
||||||
distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
|
distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
|
||||||
@ -91,8 +89,8 @@ public class JdbcLockProvider implements LockStorage, Lifecycle {
|
|||||||
distributedLock.setName(lockConfig.getLockName());
|
distributedLock.setName(lockConfig.getLockName());
|
||||||
try {
|
try {
|
||||||
return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>()
|
return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>()
|
||||||
.eq(DistributedLock::getName, lockConfig.getLockName())
|
.eq(DistributedLock::getName, lockConfig.getLockName())
|
||||||
.le(DistributedLock::getLockUntil, now)) > 0;
|
.le(DistributedLock::getLockUntil, now)) > 0;
|
||||||
} catch (ConcurrencyFailureException | DataIntegrityViolationException | TransactionSystemException |
|
} catch (ConcurrencyFailureException | DataIntegrityViolationException | TransactionSystemException |
|
||||||
UncategorizedSQLException e) {
|
UncategorizedSQLException e) {
|
||||||
return false;
|
return false;
|
||||||
@ -102,13 +100,11 @@ public class JdbcLockProvider implements LockStorage, Lifecycle {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean releaseLockWithDelete(String lockName) {
|
public boolean releaseLockWithDelete(String lockName) {
|
||||||
TransactionTemplate transactionTemplate = SpringContext.getBean(TransactionTemplate.class);
|
return Boolean.TRUE.equals(notSupportedTransaction(status -> {
|
||||||
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
|
|
||||||
return Boolean.TRUE.equals(transactionTemplate.execute(status -> {
|
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
try {
|
try {
|
||||||
return distributedLockMapper.delete(new LambdaUpdateWrapper<DistributedLock>()
|
return distributedLockMapper.delete(new LambdaUpdateWrapper<DistributedLock>()
|
||||||
.eq(DistributedLock::getName, lockName)) > 0;
|
.eq(DistributedLock::getName, lockName)) > 0;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
EasyRetryLog.LOCAL.error("unlock error. retrying attempt [{}] ", i, e);
|
EasyRetryLog.LOCAL.error("unlock error. retrying attempt [{}] ", i, e);
|
||||||
} finally {
|
} finally {
|
||||||
@ -117,21 +113,20 @@ public class JdbcLockProvider implements LockStorage, Lifecycle {
|
|||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}));
|
}));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean releaseLockWithUpdate(String lockName, LocalDateTime lockAtLeast) {
|
public boolean releaseLockWithUpdate(String lockName, LocalDateTime lockAtLeast) {
|
||||||
TransactionTemplate transactionTemplate = SpringContext.getBean(TransactionTemplate.class);
|
|
||||||
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
|
|
||||||
LocalDateTime now = LocalDateTime.now();
|
LocalDateTime now = LocalDateTime.now();
|
||||||
return Boolean.TRUE.equals(transactionTemplate.execute(status -> {
|
return Boolean.TRUE.equals(notSupportedTransaction(status -> {
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
try {
|
try {
|
||||||
DistributedLock distributedLock = new DistributedLock();
|
DistributedLock distributedLock = new DistributedLock();
|
||||||
distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
|
distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
|
||||||
distributedLock.setLockUntil(now.isBefore(lockAtLeast) ? lockAtLeast : now);
|
distributedLock.setLockUntil(now.isBefore(lockAtLeast) ? lockAtLeast : now);
|
||||||
return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>()
|
return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>()
|
||||||
.eq(DistributedLock::getName, lockName)) > 0;
|
.eq(DistributedLock::getName, lockName)) > 0;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
EasyRetryLog.LOCAL.error("unlock error. retrying attempt [{}] ", i, e);
|
EasyRetryLog.LOCAL.error("unlock error. retrying attempt [{}] ", i, e);
|
||||||
}
|
}
|
||||||
@ -150,6 +145,12 @@ public class JdbcLockProvider implements LockStorage, Lifecycle {
|
|||||||
public void close() {
|
public void close() {
|
||||||
// 删除当前节点获取的锁记录
|
// 删除当前节点获取的锁记录
|
||||||
distributedLockMapper.delete(new LambdaUpdateWrapper<DistributedLock>()
|
distributedLockMapper.delete(new LambdaUpdateWrapper<DistributedLock>()
|
||||||
.eq(DistributedLock::getLockedBy, ServerRegister.CURRENT_CID));
|
.eq(DistributedLock::getLockedBy, ServerRegister.CURRENT_CID));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Boolean notSupportedTransaction(TransactionCallback<Boolean> action) {
|
||||||
|
TransactionTemplate template = new TransactionTemplate(platformTransactionManager);
|
||||||
|
template.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
|
||||||
|
return template.execute(action);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -170,7 +170,6 @@ public class JobLogMergeSchedule extends AbstractSchedule implements Lifecycle {
|
|||||||
jobLogMessageInsertBatchIds.add(jobLogMessage);
|
jobLogMessageInsertBatchIds.add(jobLogMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
|
|
||||||
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||||
@Override
|
@Override
|
||||||
protected void doInTransactionWithoutResult(final TransactionStatus status) {
|
protected void doInTransactionWithoutResult(final TransactionStatus status) {
|
||||||
|
@ -182,7 +182,6 @@ public class RetryLogMergeSchedule extends AbstractSchedule implements Lifecycle
|
|||||||
jobLogMessageUpdateList.add(jobLogMessage);
|
jobLogMessageUpdateList.add(jobLogMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
|
|
||||||
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||||
@Override
|
@Override
|
||||||
protected void doInTransactionWithoutResult(final TransactionStatus status) {
|
protected void doInTransactionWithoutResult(final TransactionStatus status) {
|
||||||
@ -196,8 +195,6 @@ public class RetryLogMergeSchedule extends AbstractSchedule implements Lifecycle
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
Reference in New Issue
Block a user