feat(sj_1.1.0): 优化客户端传递上下文

1. 客户端上报的上下文只上报add的值,不上报全量的值
2. sj_distributed_lock 分布式锁表去掉自增主键
This commit is contained in:
opensnail 2024-06-18 23:22:01 +08:00
parent fda554fc46
commit e544f31c4a
7 changed files with 30 additions and 20 deletions

View File

@ -235,15 +235,13 @@ CREATE TABLE `sj_server_node`
CREATE TABLE `sj_distributed_lock`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`name` varchar(64) NOT NULL COMMENT '锁名称',
`lock_until` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '锁定时长',
`locked_at` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '锁定时间',
`locked_by` varchar(255) NOT NULL COMMENT '锁定者',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_name` (`name`)
PRIMARY KEY (`name`)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT ='锁定表'

View File

@ -2,6 +2,7 @@ package com.aizuda.snailjob.client.job.core.dto;
import cn.hutool.core.util.StrUtil;
import lombok.Data;
import lombok.Getter;
import java.util.Map;
import java.util.Objects;
@ -22,12 +23,22 @@ public class JobArgs {
private Map<String, Object> wfContext;
private Map<String, Object> changeWfContext;
public void appendContext(String key, Object value) {
if (Objects.isNull(wfContext) || StrUtil.isBlank(key) || Objects.isNull(value)) {
return;
}
wfContext.put(key, value);
changeWfContext.put(key, value);
}
public Object getWfContext(String key) {
if (Objects.isNull(wfContext) || StrUtil.isBlank(key)) {
return null;
}
return wfContext.get(key);
}
}

View File

@ -13,15 +13,16 @@ import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.model.JobContext;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.enums.LogTypeEnum;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -45,6 +46,8 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
// 将任务添加到时间轮中到期停止任务
TimerManager.add(new StopTaskTimerTask(jobContext.getTaskBatchId()), jobContext.getExecutorTimeout(), TimeUnit.SECONDS);
Map<String, Object> changeWfContext = Maps.newConcurrentMap();
// 执行任务
ListenableFuture<ExecuteResult> submit = decorator.submit(() -> {
JobArgs jobArgs;
@ -63,6 +66,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
}
jobArgs.setWfContext(jobContext.getWfContext());
jobArgs.setChangeWfContext(changeWfContext);
try {
// 初始化调度信息日志上报LogUtil
@ -76,7 +80,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
});
FutureCache.addFuture(jobContext.getTaskBatchId(), submit);
Futures.addCallback(submit, new JobExecutorFutureCallback(jobContext), decorator);
Futures.addCallback(submit, new JobExecutorFutureCallback(jobContext, changeWfContext), decorator);
}
private void initLogContext(JobContext jobContext) {

View File

@ -66,9 +66,11 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
}).build();
private final JobContext jobContext;
private final Map<String, Object> changeWfContext;
public JobExecutorFutureCallback(final JobContext jobContext) {
public JobExecutorFutureCallback(final JobContext jobContext, Map<String, Object> changeWfContext) {
this.jobContext = jobContext;
this.changeWfContext = changeWfContext;
}
@Override
@ -163,9 +165,8 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
dispatchJobRequest.setRetry(jobContext.isRetry());
dispatchJobRequest.setRetryScene(jobContext.getRetryScene());
// 传递上下文
Map<String, Object> wfContext = jobContext.getWfContext();
if (CollUtil.isNotEmpty(wfContext)) {
dispatchJobRequest.setWfContext(JsonUtil.toJsonString(wfContext));
if (CollUtil.isNotEmpty(changeWfContext)) {
dispatchJobRequest.setWfContext(JsonUtil.toJsonString(changeWfContext));
}
return dispatchJobRequest;

View File

@ -20,15 +20,10 @@ public class DistributedLock implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 锁名称
*/
@TableId(value = "name")
private String name;
/**

View File

@ -168,6 +168,7 @@ public class WorkflowExecutorActor extends AbstractActor {
Map<Long, Job> jobMap = StreamUtils.toIdentityMap(jobs, Job::getId);
// TODO 合并job task的结果到全局上下文中
// 此次的并发数与当时父节点的兄弟节点的数量一致
workflowBatchHandler.mergeWorkflowContextAndRetry(workflowTaskBatch,
StreamUtils.toSet(allJobTaskBatchList, JobTaskBatch::getId));

View File

@ -324,7 +324,7 @@ public class WorkflowBatchHandler {
* 合并工作流上下文若合并失败先自旋3次1.5s, 若失败了升级到悲观锁
*
* @param workflowTaskBatch 工作流批次
* @param taskBatchIds 批次列表
* @param taskBatchIds 批次列表
*/
public void mergeWorkflowContextAndRetry(WorkflowTaskBatch workflowTaskBatch, Set<Long> taskBatchIds) {
if (CollUtil.isEmpty(taskBatchIds)) {
@ -358,7 +358,7 @@ public class WorkflowBatchHandler {
retryer.call(() -> mergeAllWorkflowContext(workflowTaskBatch, taskBatchIds));
} catch (Exception e) {
SnailJobLog.LOCAL.warn("update workflow global context error. workflowTaskBatchId:[{}] taskBatchIds:[{}]",
workflowTaskBatch.getId(), taskBatchIds, e);
workflowTaskBatch.getId(), taskBatchIds, e);
if (e.getClass().isAssignableFrom(RetryException.class)) {
// 如果自旋失败就使用悲观锁
distributedLockHandler.lockWithDisposableAndRetry(() -> {
@ -408,8 +408,8 @@ public class WorkflowBatchHandler {
waitUpdateWorkflowTaskBatch.setWfContext(JsonUtil.toJsonString(mergeMap));
waitUpdateWorkflowTaskBatch.setVersion(1);
return 1 == workflowTaskBatchMapper.update(waitUpdateWorkflowTaskBatch, new LambdaQueryWrapper<WorkflowTaskBatch>()
.eq(WorkflowTaskBatch::getId, workflowTaskBatch.getId())
.eq(WorkflowTaskBatch::getVersion, workflowTaskBatch.getVersion())
.eq(WorkflowTaskBatch::getId, workflowTaskBatch.getId())
.eq(WorkflowTaskBatch::getVersion, workflowTaskBatch.getVersion())
);
}