feat(sj_1.0.0): 1、修复号段模式生成ID异常问题 2、 生成snail job启动logo

This commit is contained in:
opensnail 2024-05-06 23:39:21 +08:00
parent 0bc657988a
commit 0e42e379b0
3 changed files with 37 additions and 26 deletions

View File

@ -25,7 +25,6 @@ import java.util.function.Consumer;
* @date : 2023-05-11 21:45 * @date : 2023-05-11 21:45
* @since 1.3.0 * @since 1.3.0
*/ */
@Slf4j
public class RpcClientInvokeHandler<R> implements InvocationHandler { public class RpcClientInvokeHandler<R> implements InvocationHandler {
private final Consumer<R> consumer; private final Consumer<R> consumer;
@ -62,7 +61,7 @@ public class RpcClientInvokeHandler<R> implements InvocationHandler {
sw.stop(); sw.stop();
} }
SnailJobLog.LOCAL.info("request complete requestId:[{}] 耗时:[{}ms]", snailJobRequest.getReqId(), sw.getTotalTimeMillis()); SnailJobLog.LOCAL.debug("request complete requestId:[{}] 耗时:[{}ms]", snailJobRequest.getReqId(), sw.getTotalTimeMillis());
if (async) { if (async) {
return null; return null;
} else { } else {

View File

@ -76,12 +76,15 @@ public interface SystemConstants {
String REPORT_JOB_DISPATCH_RESULT = "/report/dispatch/result"; String REPORT_JOB_DISPATCH_RESULT = "/report/dispatch/result";
} }
String LOGO = " ___ ___ _ \n" + String LOGO = """
" | __|__ _ ____ _ | _ \\___| |_ _ _ _ _ \n" + ______ _ __ _____ __ \s
" | _|/ _` (_-< || | | / -_) _| '_| || |\n" + .' ____ \\ (_)[ | |_ _| [ | \s
" |___\\__,_/__/\\_, | |_|_\\___|\\__|_| \\_, |\n" + | (___ \\_| _ .--. ,--. __ | | | | .--. | |.--. \s
" |__/ |__/ \n" + _.____`. [ `.-. |`'_\\ :[ | | | _ | / .'`\\ \\| '/'`\\ \\\s
" :: Snail Job :: (v{}) \n"; | \\____) | | | | |// | |,| | | | | |__' | \\__. || \\__/ |\s
\\______.'[___||__]'-;__[___|___] `.____.''.__.'[__;.__.' \s
:: Snail Job :: (v{}) \s
""";
/** /**
* 调度时长 * 调度时长

View File

@ -29,7 +29,7 @@ import java.util.stream.Collectors;
* 特别声明: 此算法来自美团的leaf号段模式 * 特别声明: 此算法来自美团的leaf号段模式
* see https://github.com/Meituan-Dianping/Leaf/blob/master/leaf-server/src/main/java/com/sankuai/inf/leaf/server/service/SegmentService.java * see https://github.com/Meituan-Dianping/Leaf/blob/master/leaf-server/src/main/java/com/sankuai/inf/leaf/server/service/SegmentService.java
* *
* @author opensnail * @author opensnail
* @date 2023-05-04 * @date 2023-05-04
* @since 1.2.0 * @since 1.2.0
*/ */
@ -69,17 +69,17 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
@Override @Override
public void start() { public void start() {
SnailJobLog.LOCAL.info("SegmentIdGenerator start"); SnailJobLog.LOCAL.info("SegmentIdGenerator start");
// 确保加载到kv后才初始化成功 // 确保加载到kv后才初始化成功
updateCacheFromDb(); updateCacheFromDb();
initOK = true; initOK = true;
updateCacheFromDbAtEveryMinute(); updateCacheFromDbAtEveryMinute();
SnailJobLog.LOCAL.info("SegmentIdGenerator start end"); SnailJobLog.LOCAL.info("SegmentIdGenerator start end");
} }
@Override @Override
public void close() { public void close() {
SnailJobLog.LOCAL.info("SegmentIdGenerator close"); SnailJobLog.LOCAL.info("SegmentIdGenerator close");
} }
public static class UpdateThreadFactory implements ThreadFactory { public static class UpdateThreadFactory implements ThreadFactory {
@ -109,7 +109,8 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
private void updateCacheFromDb() { private void updateCacheFromDb() {
try { try {
List<SequenceAlloc> sequenceAllocs = sequenceAllocMapper List<SequenceAlloc> sequenceAllocs = sequenceAllocMapper
.selectList(new LambdaQueryWrapper<SequenceAlloc>().select(SequenceAlloc::getGroupName)); .selectList(new LambdaQueryWrapper<SequenceAlloc>()
.select(SequenceAlloc::getGroupName, SequenceAlloc::getNamespaceId));
if (CollectionUtils.isEmpty(sequenceAllocs)) { if (CollectionUtils.isEmpty(sequenceAllocs)) {
return; return;
} }
@ -136,7 +137,7 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
segment.setMax(0); segment.setMax(0);
segment.setStep(0); segment.setStep(0);
cache.put(tag, buffer); cache.put(tag, buffer);
SnailJobLog.LOCAL.debug("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer); SnailJobLog.LOCAL.debug("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
} }
//cache中已失效的tags从cache删除 //cache中已失效的tags从cache删除
for (int i = 0; i < dbTags.size(); i++) { for (int i = 0; i < dbTags.size(); i++) {
@ -147,10 +148,10 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
} }
for (Pair<String, String> tag : removeTagsSet) { for (Pair<String, String> tag : removeTagsSet) {
cache.remove(tag); cache.remove(tag);
SnailJobLog.LOCAL.debug("Remove tag {} from IdCache", tag); SnailJobLog.LOCAL.debug("Remove tag {} from IdCache", tag);
} }
} catch (Exception e) { } catch (Exception e) {
SnailJobLog.LOCAL.error("update cache from db exception", e); SnailJobLog.LOCAL.error("update cache from db exception", e);
} }
} }
@ -170,7 +171,7 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
SnailJobLog.LOCAL.debug("Init buffer. Update key {} {} from db", key, buffer.getCurrent()); SnailJobLog.LOCAL.debug("Init buffer. Update key {} {} from db", key, buffer.getCurrent());
buffer.setInitOk(true); buffer.setInitOk(true);
} catch (Exception e) { } catch (Exception e) {
SnailJobLog.LOCAL.error("Init buffer {} exception", buffer.getCurrent(), e); SnailJobLog.LOCAL.error("Init buffer {} exception", buffer.getCurrent(), e);
} }
} }
} }
@ -184,18 +185,24 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
SegmentBuffer buffer = segment.getBuffer(); SegmentBuffer buffer = segment.getBuffer();
SequenceAlloc sequenceAlloc; SequenceAlloc sequenceAlloc;
LambdaUpdateWrapper<SequenceAlloc> wrapper = new LambdaUpdateWrapper<SequenceAlloc>() LambdaUpdateWrapper<SequenceAlloc> wrapper = new LambdaUpdateWrapper<SequenceAlloc>()
.set(SequenceAlloc::getMaxId, "max_id + step") .setSql("max_id = max_id + step")
.set(SequenceAlloc::getUpdateDt, new Date()) .set(SequenceAlloc::getUpdateDt, new Date())
.eq(SequenceAlloc::getGroupName, key.getKey()) .eq(SequenceAlloc::getGroupName, key.getKey())
.eq(SequenceAlloc::getNamespaceId, key.getValue()); .eq(SequenceAlloc::getNamespaceId, key.getValue());
if (!buffer.isInitOk()) { if (!buffer.isInitOk()) {
sequenceAllocMapper.update(wrapper); sequenceAllocMapper.update(wrapper);
sequenceAlloc = sequenceAllocMapper.selectOne(new LambdaQueryWrapper<SequenceAlloc>().eq(SequenceAlloc::getGroupName, key)); sequenceAlloc = sequenceAllocMapper.selectOne(new LambdaQueryWrapper<SequenceAlloc>()
.eq(SequenceAlloc::getGroupName, key.getKey())
.eq(SequenceAlloc::getNamespaceId, key.getValue())
);
buffer.setStep(sequenceAlloc.getStep()); buffer.setStep(sequenceAlloc.getStep());
buffer.setMinStep(sequenceAlloc.getStep());//leafAlloc中的step为DB中的step buffer.setMinStep(sequenceAlloc.getStep());//leafAlloc中的step为DB中的step
} else if (buffer.getUpdateTimestamp() == 0) { } else if (buffer.getUpdateTimestamp() == 0) {
sequenceAllocMapper.update(wrapper); sequenceAllocMapper.update(wrapper);
sequenceAlloc = sequenceAllocMapper.selectOne(new LambdaQueryWrapper<SequenceAlloc>().eq(SequenceAlloc::getGroupName, key)); sequenceAlloc = sequenceAllocMapper.selectOne(new LambdaQueryWrapper<SequenceAlloc>()
.eq(SequenceAlloc::getGroupName, key.getKey())
.eq(SequenceAlloc::getNamespaceId, key.getValue())
);
buffer.setUpdateTimestamp(System.currentTimeMillis()); buffer.setUpdateTimestamp(System.currentTimeMillis());
buffer.setStep(sequenceAlloc.getStep()); buffer.setStep(sequenceAlloc.getStep());
buffer.setMinStep(sequenceAlloc.getStep());//leafAlloc中的step为DB中的step buffer.setMinStep(sequenceAlloc.getStep());//leafAlloc中的step为DB中的step
@ -215,13 +222,15 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
} }
SnailJobLog.LOCAL.debug("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f", ((double) duration / (1000 * 60))), nextStep); SnailJobLog.LOCAL.debug("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f", ((double) duration / (1000 * 60))), nextStep);
LambdaUpdateWrapper<SequenceAlloc> wrapper1 = new LambdaUpdateWrapper<SequenceAlloc>() LambdaUpdateWrapper<SequenceAlloc> wrapper1 = new LambdaUpdateWrapper<SequenceAlloc>()
.set(SequenceAlloc::getMaxId, "max_id + " + nextStep) .setSql("max_id = max_id + " + nextStep)
.set(SequenceAlloc::getUpdateDt, new Date()) .set(SequenceAlloc::getUpdateDt, new Date())
.eq(SequenceAlloc::getGroupName, key.getKey()) .eq(SequenceAlloc::getGroupName, key.getKey())
.eq(SequenceAlloc::getNamespaceId, key.getValue()); .eq(SequenceAlloc::getNamespaceId, key.getValue());
sequenceAllocMapper.update(wrapper1); sequenceAllocMapper.update(wrapper1);
sequenceAlloc = sequenceAllocMapper sequenceAlloc = sequenceAllocMapper.selectOne(new LambdaQueryWrapper<SequenceAlloc>()
.selectOne(new LambdaQueryWrapper<SequenceAlloc>().eq(SequenceAlloc::getGroupName, key)); .eq(SequenceAlloc::getGroupName, key.getKey())
.eq(SequenceAlloc::getNamespaceId, key.getValue())
);
buffer.setUpdateTimestamp(System.currentTimeMillis()); buffer.setUpdateTimestamp(System.currentTimeMillis());
buffer.setStep(nextStep); buffer.setStep(nextStep);
buffer.setMinStep(sequenceAlloc.getStep());//leafAlloc的step为DB中的step buffer.setMinStep(sequenceAlloc.getStep());//leafAlloc的step为DB中的step
@ -245,9 +254,9 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
try { try {
updateSegmentFromDb(buffer.getKey(), next); updateSegmentFromDb(buffer.getKey(), next);
updateOk = true; updateOk = true;
SnailJobLog.LOCAL.debug("update segment {} from db {}", buffer.getKey(), next); SnailJobLog.LOCAL.debug("update segment {} from db {}", buffer.getKey(), next);
} catch (Exception e) { } catch (Exception e) {
SnailJobLog.LOCAL.warn(buffer.getKey() + " updateSegmentFromDb exception", e); SnailJobLog.LOCAL.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
} finally { } finally {
if (updateOk) { if (updateOk) {
buffer.wLock().lock(); buffer.wLock().lock();
@ -297,7 +306,7 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
TimeUnit.MILLISECONDS.sleep(20); TimeUnit.MILLISECONDS.sleep(20);
break; break;
} catch (InterruptedException e) { } catch (InterruptedException e) {
SnailJobLog.LOCAL.warn("Thread {} Interrupted", Thread.currentThread().getName()); SnailJobLog.LOCAL.warn("Thread {} Interrupted", Thread.currentThread().getName());
break; break;
} }
} }