feat: 2.6.0

1. 分布式锁优化
This commit is contained in:
byteblogs168 2024-01-11 23:42:16 +08:00
parent d907dcda90
commit de21cdad3e
15 changed files with 432 additions and 252 deletions

View File

@ -1,47 +1,24 @@
package com.aizuda.easy.retry.server.common.dto; package com.aizuda.easy.retry.server.common.dto;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.server.common.enums.UnLockOperationEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import java.time.Duration; import java.time.Duration;
import java.time.LocalDateTime; import java.time.LocalDateTime;
/** /**
* 分布式锁配置
*
* @author: www.byteblogs.com * @author: www.byteblogs.com
* @date : 2023-07-21 08:43 * @date : 2023-07-21 08:43
* @since 2.1.0 * @since 2.1.0
*/ */
public class LockConfig { public class LockConfig {
private final LocalDateTime createDt; private LocalDateTime createDt;
private final String lockName; private String lockName;
private final Duration lockAtMost; private Duration lockAtMost;
private final Duration lockAtLeast; private Duration lockAtLeast;
private final UnLockOperationEnum unLockOperation;
public LockConfig(final LocalDateTime createDt,
final String lockName,
final Duration lockAtMost,
final Duration lockAtLeast,
final UnLockOperationEnum unLockOperation) {
this.lockName = lockName;
this.lockAtMost = lockAtMost;
this.lockAtLeast = lockAtLeast;
this.createDt = createDt;
this.unLockOperation = unLockOperation;
Assert.notNull(createDt, () -> new EasyRetryServerException("createDt can not be null."));
Assert.notBlank(lockName, () -> new EasyRetryServerException("lockName can not be null."));
Assert.notNull(lockAtMost, () -> new EasyRetryServerException("lockAtMost can not be null. lockName:[{}]", lockName));
Assert.isFalse(lockAtMost.isNegative(), () -> new EasyRetryServerException("lockAtMost is negative. lockName:[{}]", lockName));
Assert.notNull(lockAtLeast, () -> new EasyRetryServerException("lockAtLeast can not be null. lockName:[{}]", lockName));
Assert.isFalse(lockAtLeast.compareTo(lockAtMost) > 0, () -> new EasyRetryServerException("lockAtLeast is longer than lockAtMost for lock. lockName:[{}]", lockName));
Assert.notNull(unLockOperation, () -> new EasyRetryServerException("unLockOperation can not be null. lockName:[{}]", lockName));
}
public LocalDateTime getCreateDt() { public LocalDateTime getCreateDt() {
return createDt; return createDt;
@ -51,12 +28,24 @@ public class LockConfig {
return lockName; return lockName;
} }
public LocalDateTime getLockAtMost() { public void setCreateDt(LocalDateTime createDt) {
return createDt.plus(lockAtMost); this.createDt = createDt;
} }
public UnLockOperationEnum getUnLockOperation() { public void setLockName(String lockName) {
return unLockOperation; this.lockName = lockName;
}
public void setLockAtMost(Duration lockAtMost) {
this.lockAtMost = lockAtMost;
}
public void setLockAtLeast(Duration lockAtLeast) {
this.lockAtLeast = lockAtLeast;
}
public LocalDateTime getLockAtMost() {
return createDt.plus(lockAtMost);
} }
public LocalDateTime getLockAtLeast() { public LocalDateTime getLockAtLeast() {

View File

@ -1,13 +0,0 @@
package com.aizuda.easy.retry.server.common.enums;
/**
* @author: xiaowoniu
* @date : 2024-01-02
* @since : 2.6.0
*/
public enum UnLockOperationEnum {
DELETE,
UPDATE,
;
}

View File

@ -1,11 +1,11 @@
package com.aizuda.easy.retry.server.common.lock; package com.aizuda.easy.retry.server.common.lock;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.server.common.cache.CacheLockRecord; import com.aizuda.easy.retry.server.common.cache.CacheLockRecord;
import com.aizuda.easy.retry.server.common.dto.LockConfig; import com.aizuda.easy.retry.server.common.dto.LockConfig;
import com.aizuda.easy.retry.template.datasource.enums.DbTypeEnum; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import java.util.Arrays; import java.time.Duration;
import java.util.List;
/** /**
* @author www.byteblogs.com * @author www.byteblogs.com
@ -13,14 +13,25 @@ import java.util.List;
* @since 2.1.0 * @since 2.1.0
*/ */
public abstract class AbstractLockProvider implements LockProvider { public abstract class AbstractLockProvider implements LockProvider {
protected static final List<String> ALLOW_DB = Arrays.asList(DbTypeEnum.MYSQL.getDb(),
DbTypeEnum.MARIADB.getDb(),
DbTypeEnum.POSTGRES.getDb());
@Override
public boolean lock(final LockConfig lockConfig) {
@Override
public boolean lock(Duration lockAtMost) {
return lock(lockAtMost, lockAtMost);
}
@Override
public boolean lock(Duration lockAtLeast, Duration lockAtMost) {
LockConfig lockConfig = LockManager.getLockConfig();
String lockName = lockConfig.getLockName(); String lockName = lockConfig.getLockName();
Assert.notNull(lockAtMost, () -> new EasyRetryServerException("lockAtMost can not be null. lockName:[{}]", lockName));
Assert.isFalse(lockAtMost.isNegative(), () -> new EasyRetryServerException("lockAtMost is negative. lockName:[{}]", lockName));
Assert.notNull(lockAtLeast, () -> new EasyRetryServerException("lockAtLeast can not be null. lockName:[{}]", lockName));
Assert.isFalse(lockAtLeast.compareTo(lockAtMost) > 0, () -> new EasyRetryServerException("lockAtLeast is longer than lockAtMost for lock. lockName:[{}]", lockName));
LockManager.setLockAtLeast(lockAtLeast);
LockManager.setLockAtLeast(lockAtMost);
boolean tryToCreateLockRecord = !CacheLockRecord.lockRecordRecentlyCreated(lockName); boolean tryToCreateLockRecord = !CacheLockRecord.lockRecordRecentlyCreated(lockName);
if (tryToCreateLockRecord) { if (tryToCreateLockRecord) {
if (doLock(lockConfig)) { if (doLock(lockConfig)) {
@ -43,14 +54,27 @@ public abstract class AbstractLockProvider implements LockProvider {
} }
protected boolean doLockAfter(LockConfig lockConfig) { protected boolean doLockAfter(LockConfig lockConfig) {
return updateRecord(lockConfig); return renewal(lockConfig);
} }
protected boolean doLock(final LockConfig lockConfig) { protected boolean doLock(final LockConfig lockConfig) {
return insertRecord(lockConfig); return createLock(lockConfig);
} }
protected abstract boolean insertRecord(final LockConfig lockConfig); @Override
public boolean unlock() {
try {
LockConfig lockConfig = LockManager.getLockConfig();
return doUnlock(lockConfig);
} finally {
LockManager.clear();
}
protected abstract boolean updateRecord(final LockConfig lockConfig); }
protected abstract boolean doUnlock(LockConfig lockConfig);
protected abstract boolean createLock(final LockConfig lockConfig);
protected abstract boolean renewal(final LockConfig lockConfig);
} }

View File

@ -0,0 +1,35 @@
package com.aizuda.easy.retry.server.common.lock;
import com.aizuda.easy.retry.server.common.dto.LockConfig;
import com.aizuda.easy.retry.server.common.lock.persistence.LockStorage;
import com.aizuda.easy.retry.server.common.lock.persistence.LockStorageFactory;
/**
* @author xiaowoniu
* @date 2024-01-11 21:26:54
* @since 2.6.0
*/
public class DisposableLockProvider extends AbstractLockProvider {
@Override
protected boolean doUnlock(LockConfig lockConfig) {
return doUnlockWithDelete(lockConfig);
}
protected boolean doUnlockWithDelete(LockConfig lockConfig) {
LockStorage lockStorage = LockStorageFactory.getLockStorage();
return lockStorage.releaseLockWithDelete(lockConfig.getLockName());
}
@Override
protected boolean createLock(LockConfig lockConfig) {
LockStorage lockStorage = LockStorageFactory.getLockStorage();
return lockStorage.createLock(lockConfig);
}
@Override
protected boolean renewal(LockConfig lockConfig) {
LockStorage lockStorage = LockStorageFactory.getLockStorage();
return lockStorage.renewal(lockConfig);
}
}

View File

@ -0,0 +1,50 @@
package com.aizuda.easy.retry.server.common.lock;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import java.time.LocalDateTime;
/**
* @author xiaowoniu
* @date 2024-01-11 22:10:25
* @since 2.6.0
*/
public final class LockBuilder {
private String lockName;
private boolean resident;
public static LockBuilder newBuilder() {
return new LockBuilder();
}
public LockBuilder withResident(String lockName) {
this.lockName = lockName;
resident = Boolean.TRUE;
return this;
}
public LockBuilder withDisposable(String lockName) {
this.lockName = lockName;
resident = Boolean.FALSE;
return this;
}
public LockProvider build() {
Assert.notBlank(lockName, () -> new EasyRetryServerException("lockName can not be null."));
LockManager.initialize();
LockManager.setCreateDt(LocalDateTime.now());
LockManager.setLockName(lockName);
if (resident) {
return new ResidentLockProvider();
} else {
return new DisposableLockProvider();
}
}
}

View File

@ -0,0 +1,46 @@
package com.aizuda.easy.retry.server.common.lock;
import com.aizuda.easy.retry.server.common.dto.LockConfig;
import java.time.Duration;
import java.time.LocalDateTime;
/**
* @author xiaowoniu
* @date 2024-01-11 22:06:02
* @since 2.6.0
*/
public final class LockManager {
private static final ThreadLocal<LockConfig> LOCK_CONFIG = new ThreadLocal<>();
public static LockConfig getLockConfig() {
return LOCK_CONFIG.get();
}
public static void initialize() {
LOCK_CONFIG.set(new LockConfig());
}
public static void clear() {
LOCK_CONFIG.remove();
}
public static void setLockName(String lockName) {
getLockConfig().setLockName(lockName);
}
public static void setLockAtLeast(Duration lockAtLeast) {
getLockConfig().setLockAtLeast(lockAtLeast);
}
public static void setCreateDt(LocalDateTime createDt) {
getLockConfig().setCreateDt(createDt);
}
public static void setLockAtMost(Duration lockAtMost) {
getLockConfig().setLockAtMost(lockAtMost);
}
}

View File

@ -3,6 +3,8 @@ package com.aizuda.easy.retry.server.common.lock;
import com.aizuda.easy.retry.server.common.dto.LockConfig; import com.aizuda.easy.retry.server.common.dto.LockConfig;
import java.time.Duration;
/** /**
* @author www.byteblogs.com * @author www.byteblogs.com
* @date 2023-07-20 22:45:41 * @date 2023-07-20 22:45:41
@ -10,10 +12,10 @@ import com.aizuda.easy.retry.server.common.dto.LockConfig;
*/ */
public interface LockProvider { public interface LockProvider {
boolean supports(String storageMedium); boolean lock(Duration lockAtLeast, Duration lockAtMost);
boolean lock(LockConfig lockConfig); boolean lock(Duration lockAtMost);
boolean unlock(LockConfig lockConfig); boolean unlock();
} }

View File

@ -0,0 +1,37 @@
package com.aizuda.easy.retry.server.common.lock;
import com.aizuda.easy.retry.server.common.dto.LockConfig;
import com.aizuda.easy.retry.server.common.lock.persistence.LockStorage;
import com.aizuda.easy.retry.server.common.lock.persistence.LockStorageFactory;
/**
* @author xiaowoniu
* @date 2024-01-11 21:26:54
* @since 2.6.0
*/
public class ResidentLockProvider extends AbstractLockProvider {
@Override
protected boolean doUnlock(LockConfig lockConfig) {
return doUnlockWithUpdate(lockConfig);
}
protected boolean doUnlockWithUpdate(LockConfig lockConfig) {
LockStorage lockStorage = LockStorageFactory.getLockStorage();
return lockStorage.releaseLockWithUpdate(lockConfig.getLockName(), lockConfig.getLockAtLeast());
}
@Override
protected boolean createLock(LockConfig lockConfig) {
LockStorage lockStorage = LockStorageFactory.getLockStorage();
return lockStorage.createLock(lockConfig);
}
@Override
protected boolean renewal(LockConfig lockConfig) {
LockStorage lockStorage = LockStorageFactory.getLockStorage();
return lockStorage.renewal(lockConfig);
}
}

View File

@ -1,20 +1,17 @@
package com.aizuda.easy.retry.server.common.lock; package com.aizuda.easy.retry.server.common.lock.persistence;
import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.Lifecycle; import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.cache.CacheLockRecord; import com.aizuda.easy.retry.server.common.cache.CacheLockRecord;
import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.LockConfig; import com.aizuda.easy.retry.server.common.dto.LockConfig;
import com.aizuda.easy.retry.server.common.enums.UnLockOperationEnum;
import com.aizuda.easy.retry.server.common.register.ServerRegister; import com.aizuda.easy.retry.server.common.register.ServerRegister;
import com.aizuda.easy.retry.template.datasource.enums.DbTypeEnum;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.DistributedLockMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.DistributedLockMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.DistributedLock; import com.aizuda.easy.retry.template.datasource.persistence.po.DistributedLock;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.ConcurrencyFailureException; import org.springframework.dao.ConcurrencyFailureException;
import org.springframework.dao.DataIntegrityViolationException; import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.dao.DuplicateKeyException; import org.springframework.dao.DuplicateKeyException;
@ -24,6 +21,8 @@ import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionSystemException; import org.springframework.transaction.TransactionSystemException;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
/** /**
* 基于DB实现的分布式锁 * 基于DB实现的分布式锁
@ -35,12 +34,13 @@ import java.time.LocalDateTime;
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j @Slf4j
public class JdbcLockProvider extends AbstractLockProvider implements Lifecycle { public class JdbcLockProvider implements LockStorage, Lifecycle {
private final DistributedLockMapper distributedLockMapper; private final DistributedLockMapper distributedLockMapper;
private final SystemProperties systemProperties;
@Autowired protected static final List<String> ALLOW_DB = Arrays.asList(DbTypeEnum.MYSQL.getDb(),
private SystemProperties systemProperties; DbTypeEnum.MARIADB.getDb(),
DbTypeEnum.POSTGRES.getDb());
@Override @Override
public boolean supports(final String storageMedium) { public boolean supports(final String storageMedium) {
@ -48,34 +48,7 @@ public class JdbcLockProvider extends AbstractLockProvider implements Lifecycle
} }
@Override @Override
public boolean unlock(final LockConfig lockConfig) { public boolean createLock(LockConfig lockConfig) {
LocalDateTime now = LocalDateTime.now();
for (int i = 0; i < 10; i++) {
try {
if (lockConfig.getUnLockOperation() == UnLockOperationEnum.UPDATE) {
DistributedLock distributedLock = new DistributedLock();
distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
LocalDateTime lockAtLeast = lockConfig.getLockAtLeast();
distributedLock.setLockUntil(now.isBefore(lockAtLeast) ? lockAtLeast : now);
return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockConfig.getLockName())) > 0;
} else {
CacheLockRecord.remove(lockConfig.getLockName());
return distributedLockMapper.delete(new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockConfig.getLockName())) > 0;
}
} catch (Exception e) {
LogUtils.error(log, "unlock error. retrying attempt [{}] ", i, e);
}
}
return false;
}
@Override
protected boolean insertRecord(final LockConfig lockConfig) {
try { try {
LocalDateTime now = lockConfig.getCreateDt(); LocalDateTime now = lockConfig.getCreateDt();
DistributedLock distributedLock = new DistributedLock(); DistributedLock distributedLock = new DistributedLock();
@ -92,12 +65,10 @@ public class JdbcLockProvider extends AbstractLockProvider implements Lifecycle
LogUtils.error(log, "Unexpected exception. lockName:[{}]", lockConfig.getLockName(), e); LogUtils.error(log, "Unexpected exception. lockName:[{}]", lockConfig.getLockName(), e);
return false; return false;
} }
} }
@Override @Override
protected boolean updateRecord(final LockConfig lockConfig) { public boolean renewal(LockConfig lockConfig) {
LocalDateTime now = lockConfig.getCreateDt(); LocalDateTime now = lockConfig.getCreateDt();
DistributedLock distributedLock = new DistributedLock(); DistributedLock distributedLock = new DistributedLock();
distributedLock.setLockedBy(ServerRegister.CURRENT_CID); distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
@ -106,17 +77,53 @@ public class JdbcLockProvider extends AbstractLockProvider implements Lifecycle
distributedLock.setName(lockConfig.getLockName()); distributedLock.setName(lockConfig.getLockName());
try { try {
return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>() return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockConfig.getLockName()) .eq(DistributedLock::getName, lockConfig.getLockName())
.le(DistributedLock::getLockUntil, now)) > 0; .le(DistributedLock::getLockUntil, now)) > 0;
} catch (ConcurrencyFailureException | DataIntegrityViolationException | TransactionSystemException | } catch (ConcurrencyFailureException | DataIntegrityViolationException | TransactionSystemException |
UncategorizedSQLException e) { UncategorizedSQLException e) {
return false; return false;
} }
}
@Override
public boolean releaseLockWithDelete(String lockName) {
for (int i = 0; i < 10; i++) {
try {
CacheLockRecord.remove(lockName);
return distributedLockMapper.delete(new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockName)) > 0;
} catch (Exception e) {
LogUtils.error(log, "unlock error. retrying attempt [{}] ", i, e);
}
}
return false;
}
@Override
public boolean releaseLockWithUpdate(String lockName, LocalDateTime lockAtLeast) {
LocalDateTime now = LocalDateTime.now();
for (int i = 0; i < 10; i++) {
try {
DistributedLock distributedLock = new DistributedLock();
distributedLock.setLockedBy(ServerRegister.CURRENT_CID);
distributedLock.setLockUntil(now.isBefore(lockAtLeast) ? lockAtLeast : now);
return distributedLockMapper.update(distributedLock, new LambdaUpdateWrapper<DistributedLock>()
.eq(DistributedLock::getName, lockName)) > 0;
} catch (Exception e) {
LogUtils.error(log, "unlock error. retrying attempt [{}] ", i, e);
}
}
return false;
} }
@Override @Override
public void start() { public void start() {
LockStorageFactory.registerLockStorage(this);
} }
@Override @Override

View File

@ -0,0 +1,49 @@
package com.aizuda.easy.retry.server.common.lock.persistence;
import com.aizuda.easy.retry.server.common.dto.LockConfig;
import java.time.LocalDateTime;
/**
* @author xiaowoniu
* @date 2024-01-11 21:38:52
* @since 2.6.0
*/
public interface LockStorage {
boolean supports(String storageMedium);
/**
* 创建锁记录
*
* @param lockConfig 锁配置
* @return
*/
boolean createLock(LockConfig lockConfig);
/**
* 更新锁记录
*
* @param lockConfig 锁配置
* @return
*/
boolean renewal(LockConfig lockConfig);
/**
* 删除锁记录释放锁
*
* @param lockName 锁名称
* @return
*/
boolean releaseLockWithDelete(String lockName);
/**
* 更新锁定时长释放锁
*
* @param lockName 锁名称
* @param lockAtLeast 最少锁定时长
* @return
*/
boolean releaseLockWithUpdate(String lockName, LocalDateTime lockAtLeast);
}

View File

@ -0,0 +1,31 @@
package com.aizuda.easy.retry.server.common.lock.persistence;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.lock.LockProvider;
import com.google.common.collect.Lists;
import java.util.List;
/**
* @author xiaowoniu
* @date 2024-01-11 22:38:36
* @since 2.6.0
*/
public final class LockStorageFactory {
private static final List<LockStorage> LOCK_STORAGES = Lists.newArrayList();
public static void registerLockStorage(LockStorage lockStorage) {
LOCK_STORAGES.add(lockStorage);
}
public static LockStorage getLockStorage() {
SystemProperties systemProperties = SpringContext.getBeanByType(SystemProperties.class);
return LOCK_STORAGES.stream()
.filter(lockProvider -> lockProvider.supports(systemProperties.getDbType().getDb()))
.findFirst().orElseThrow(() -> new EasyRetryServerException("未找到合适锁处理器"));
}
}

View File

@ -4,9 +4,8 @@ import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Schedule; import com.aizuda.easy.retry.server.common.Schedule;
import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.LockConfig;
import com.aizuda.easy.retry.server.common.enums.UnLockOperationEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.lock.LockBuilder;
import com.aizuda.easy.retry.server.common.lock.LockProvider; import com.aizuda.easy.retry.server.common.lock.LockProvider;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -14,7 +13,6 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.TaskScheduler;
import java.time.Duration; import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List; import java.util.List;
/** /**
@ -43,14 +41,12 @@ public abstract class AbstractSchedule implements Schedule {
Assert.notBlank(lockAtLeast, () -> new EasyRetryServerException("lockAtLeast can not be null.")); Assert.notBlank(lockAtLeast, () -> new EasyRetryServerException("lockAtLeast can not be null."));
Assert.notBlank(lockName, () -> new EasyRetryServerException("lockName can not be null.")); Assert.notBlank(lockName, () -> new EasyRetryServerException("lockName can not be null."));
LockConfig lockConfig = new LockConfig(LocalDateTime.now(), lockName, Duration.parse(lockAtMost), LockProvider lockProvider = LockBuilder.newBuilder()
Duration.parse(lockAtLeast), .withResident(lockName)
UnLockOperationEnum.UPDATE); .build();
LockProvider lockProvider = getLockAccess();
boolean lock = false; boolean lock = false;
try { try {
lock = lockProvider.lock(lockConfig); lock = lockProvider.lock(Duration.parse(lockAtLeast), Duration.parse(lockAtMost));
if (lock) { if (lock) {
doExecute(); doExecute();
} }
@ -58,7 +54,7 @@ public abstract class AbstractSchedule implements Schedule {
LogUtils.error(log, this.getClass().getName() + " execute error. lockName:[{}]", lockName, e); LogUtils.error(log, this.getClass().getName() + " execute error. lockName:[{}]", lockName, e);
} finally { } finally {
if (lock) { if (lock) {
lockProvider.unlock(lockConfig); lockProvider.unlock();
} }
} }
@ -72,10 +68,5 @@ public abstract class AbstractSchedule implements Schedule {
protected abstract String lockAtLeast(); protected abstract String lockAtLeast();
private LockProvider getLockAccess() {
return lockProviders.stream()
.filter(lockProvider -> lockProvider.supports(systemProperties.getDbType().getDb()))
.findFirst().orElseThrow(() -> new EasyRetryServerException("未找到合适锁处理器"));
}
} }

View File

@ -35,8 +35,11 @@ import org.springframework.transaction.support.TransactionTemplate;
import java.io.IOException; import java.io.IOException;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.time.Duration;
import java.time.temporal.TemporalAmount;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit;
/** /**
* @author xiaowoniu * @author xiaowoniu
@ -63,34 +66,34 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
@Override @Override
@Transactional @Transactional
public void execute(WorkflowExecutorContext context) { public void execute(WorkflowExecutorContext context) {
distributedLockHandler.lockAndProcessAfterUnlockDel( distributedLockHandler.lockWithDisposableAndRetry(
MessageFormat.format(KEY, context.getWorkflowTaskBatchId(), context.getWorkflowNodeId()), "PT5S", () -> {
() -> {
Long total = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>() Long total = jobTaskBatchMapper.selectCount(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getWorkflowTaskBatchId, context.getWorkflowTaskBatchId()) .eq(JobTaskBatch::getWorkflowTaskBatchId, context.getWorkflowTaskBatchId())
.eq(JobTaskBatch::getWorkflowNodeId, context.getWorkflowNodeId()) .eq(JobTaskBatch::getWorkflowNodeId, context.getWorkflowNodeId())
); );
if (total > 0) { if (total > 0) {
log.warn("任务节点[{}]已被执行,请勿重复执行", context.getWorkflowNodeId()); log.warn("任务节点[{}]已被执行,请勿重复执行", context.getWorkflowNodeId());
return; return;
}
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
if (!preValidate(context)) {
return;
}
beforeExecute(context);
doExecute(context);
afterExecute(context);
} }
});
}); transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(final TransactionStatus status) {
if (!preValidate(context)) {
return;
}
beforeExecute(context);
doExecute(context);
afterExecute(context);
}
});
}, MessageFormat.format(KEY, context.getWorkflowTaskBatchId(), context.getWorkflowNodeId()),
Duration.ofSeconds(5), Duration.ofSeconds(1), 3);
} }

View File

@ -1,20 +1,18 @@
package com.aizuda.easy.retry.server.job.task.support.handler; package com.aizuda.easy.retry.server.job.task.support.handler;
import cn.hutool.core.lang.Assert; import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.common.core.log.LogUtils; import com.aizuda.easy.retry.server.common.lock.LockBuilder;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.LockConfig;
import com.aizuda.easy.retry.server.common.enums.UnLockOperationEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.lock.LockProvider; import com.aizuda.easy.retry.server.common.lock.LockProvider;
import com.aizuda.easy.retry.server.job.task.support.LockExecutor; import com.aizuda.easy.retry.server.job.task.support.LockExecutor;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.Duration; import java.time.Duration;
import java.time.LocalDateTime; import java.util.concurrent.TimeUnit;
import java.util.List;
/** /**
* @author: xiaowoniu * @author: xiaowoniu
@ -25,68 +23,33 @@ import java.util.List;
@Slf4j @Slf4j
public class DistributedLockHandler { public class DistributedLockHandler {
@Autowired public void lockWithDisposableAndRetry(LockExecutor lockExecutor,
private SystemProperties systemProperties; String lockName, Duration lockAtMost,
@Autowired Duration sleepTime, Integer maxRetryTimes) {
private List<LockProvider> lockProviders; LockProvider lockProvider = LockBuilder.newBuilder()
.withDisposable(lockName)
.build();
public boolean tryLock(String lockName, String lockAtMost) { Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
.retryIfResult(result -> false)
.withWaitStrategy(WaitStrategies.fixedWait(sleepTime.toMillis(), TimeUnit.MILLISECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
.build();
Assert.notBlank(lockAtMost, () -> new EasyRetryServerException("lockAtLeast can not be null."));
Assert.notBlank(lockName, () -> new EasyRetryServerException("lockName can not be null."));
LockConfig lockConfig = new LockConfig(LocalDateTime.now(), lockName, Duration.parse(lockAtMost),
Duration.ofMillis(1),
UnLockOperationEnum.UPDATE);
LockProvider lockProvider = getLockAccess();
boolean lock = false; boolean lock = false;
try { try {
lock = lockProvider.lock(lockConfig); lock = retryer.call(() -> lockProvider.lock(lockAtMost));
if (lock) {
lockExecutor.execute();
}
} catch (Exception e) { } catch (Exception e) {
LogUtils.error(log, this.getClass().getName() + " execute error. lockName:[{}]", lockName, e); EasyRetryLog.LOCAL.error("lock execute error. lockName:[{}]", lockName, e);
} finally {
if (lock) {
lockProvider.unlock();
}
} }
return lock;
}
public boolean unlockAndDel(String lockName) {
Assert.notBlank(lockName, () -> new EasyRetryServerException("lockName can not be null."));
LockConfig lockConfig = new LockConfig(LocalDateTime.now(),
lockName,
Duration.ofSeconds(1), Duration.ofSeconds(0),
UnLockOperationEnum.DELETE);
LockProvider lockProvider = getLockAccess();
boolean lock = false;
try {
lockProvider.unlock(lockConfig);
} catch (Exception e) {
LogUtils.error(log, this.getClass().getName() + " execute error. lockName:[{}]", lockName, e);
}
return lock;
}
public boolean unlockAndUpdate(String lockName, String lockAtLeast) {
Assert.notBlank(lockAtLeast, () -> new EasyRetryServerException("lockAtLeast can not be null."));
Assert.notBlank(lockName, () -> new EasyRetryServerException("lockName can not be null."));
LockConfig lockConfig = new LockConfig(LocalDateTime.now(), lockName, Duration.ofSeconds(0),
Duration.parse(lockAtLeast),
UnLockOperationEnum.UPDATE);
LockProvider lockProvider = getLockAccess();
boolean lock = false;
try {
lock = lockProvider.unlock(lockConfig);
} catch (Exception e) {
LogUtils.error(log, this.getClass().getName() + " execute error. lockName:[{}]", lockName, e);
}
return lock;
} }
/** /**
@ -96,54 +59,25 @@ public class DistributedLockHandler {
* @param lockAtMost * @param lockAtMost
* @param lockExecutor * @param lockExecutor
*/ */
public void lockAndProcessAfterUnlockDel(String lockName, String lockAtMost, LockExecutor lockExecutor) { public void lockWithDisposable(String lockName, Duration lockAtMost, LockExecutor lockExecutor) {
LockConfig lockConfig = new LockConfig(LocalDateTime.now(), lockName, Duration.parse(lockAtMost),
Duration.ofMillis(1), LockProvider lockProvider = LockBuilder.newBuilder()
UnLockOperationEnum.DELETE); .withDisposable(lockName)
.build();
LockProvider lockProvider = getLockAccess();
boolean lock = false; boolean lock = false;
try { try {
lock = lockProvider.lock(lockConfig); lock = lockProvider.lock(lockAtMost);
if (lock) { if (lock) {
lockExecutor.execute(); lockExecutor.execute();
} }
} catch (Exception e) { } catch (Exception e) {
LogUtils.error(log, this.getClass().getName() + " execute error. lockName:[{}]", lockName, e); EasyRetryLog.LOCAL.error("lock execute error. lockName:[{}]", lockName, e);
} finally { } finally {
if (lock) { if (lock) {
lockProvider.unlock(lockConfig); lockProvider.unlock();
} }
} }
}
public void lockAndProcessAfterUnlockUpdate(String lockName, String lockAtMost, String lockAtLeast,
LockExecutor lockExecutor) {
LockConfig lockConfig = new LockConfig(LocalDateTime.now(), lockName, Duration.parse(lockAtMost),
Duration.parse(lockAtLeast),
UnLockOperationEnum.UPDATE);
LockProvider lockProvider = getLockAccess();
boolean lock = false;
try {
lock = lockProvider.lock(lockConfig);
if (lock) {
lockExecutor.execute();
}
} catch (Exception e) {
LogUtils.error(log, this.getClass().getName() + " execute error. lockName:[{}]", lockName, e);
} finally {
if (lock) {
lockProvider.unlock(lockConfig);
}
}
}
private LockProvider getLockAccess() {
return lockProviders.stream()
.filter(lockProvider -> lockProvider.supports(systemProperties.getDbType().getDb()))
.findFirst().orElseThrow(() -> new EasyRetryServerException("未找到合适锁处理器"));
} }
} }

View File

@ -4,22 +4,20 @@ import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert; import cn.hutool.core.lang.Assert;
import cn.hutool.core.net.url.UrlQuery; import cn.hutool.core.net.url.UrlQuery;
import com.aizuda.easy.retry.common.core.enums.StatusEnum; import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.log.TaskLogFieldDTO; import com.aizuda.easy.retry.common.core.log.TaskLogFieldDTO;
import com.aizuda.easy.retry.common.core.model.EasyRetryRequest; import com.aizuda.easy.retry.common.core.model.EasyRetryRequest;
import com.aizuda.easy.retry.common.core.model.NettyResult; import com.aizuda.easy.retry.common.core.model.NettyResult;
import com.aizuda.easy.retry.common.core.util.JsonUtil; import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator; import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException; import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.handler.PostHttpRequestHandler; import com.aizuda.easy.retry.server.common.handler.PostHttpRequestHandler;
import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter; import com.aizuda.easy.retry.server.job.task.support.JobTaskConverter;
import com.aizuda.easy.retry.server.model.dto.LogTaskDTO; import com.aizuda.easy.retry.server.model.dto.LogTaskDTO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobLogMessageMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage; import com.aizuda.easy.retry.template.datasource.persistence.po.JobLogMessage;
import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpMethod;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@ -43,9 +41,6 @@ import static com.aizuda.easy.retry.common.core.constant.SystemConstants.HTTP_PA
@Slf4j @Slf4j
public class ReportLogHttpRequestHandler extends PostHttpRequestHandler { public class ReportLogHttpRequestHandler extends PostHttpRequestHandler {
@Autowired
private JobLogMessageMapper jobLogMessageMapper;
@Override @Override
public boolean supports(String path) { public boolean supports(String path) {
return BATCH_LOG_REPORT.equals(path); return BATCH_LOG_REPORT.equals(path);
@ -60,7 +55,7 @@ public class ReportLogHttpRequestHandler extends PostHttpRequestHandler {
@Transactional @Transactional
public String doHandler(String content, UrlQuery urlQuery, HttpHeaders headers) { public String doHandler(String content, UrlQuery urlQuery, HttpHeaders headers) {
LogUtils.info(log, "Begin Handler Log Report Data. <|>{}<|>", content); EasyRetryLog.LOCAL.info("Begin Handler Log Report Data. [{}]", content);
EasyRetryRequest retryRequest = JsonUtil.parseObject(content, EasyRetryRequest.class); EasyRetryRequest retryRequest = JsonUtil.parseObject(content, EasyRetryRequest.class);
Object[] args = retryRequest.getArgs(); Object[] args = retryRequest.getArgs();