feat(sj_1.1.0): optimize client-side passing of the workflow context
1. The client now reports only the context values it added during execution, not the full context.
2. The sj_distributed_lock distributed-lock table drops its auto-increment primary key (name becomes the primary key).
parent aeaa73776f
commit 983f6006ec
@@ -235,15 +235,13 @@ CREATE TABLE `sj_server_node`
 CREATE TABLE `sj_distributed_lock`
 (
-    `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Primary key',
     `name` varchar(64) NOT NULL COMMENT 'Lock name',
     `lock_until` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT 'Lock hold duration',
     `locked_at` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'Time the lock was taken',
     `locked_by` varchar(255) NOT NULL COMMENT 'Lock holder',
     `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
     `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Update time',
-    PRIMARY KEY (`id`),
-    UNIQUE KEY `uk_name` (`name`)
+    PRIMARY KEY (`name`)
 ) ENGINE = InnoDB
-    AUTO_INCREMENT = 0
     DEFAULT CHARSET = utf8mb4 COMMENT ='Lock table'
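With the surrogate `id` gone, `name` itself is the primary key, so concurrent acquirers of the same lock collide on the key directly instead of relying on a separate unique index. A minimal sketch of how such a ShedLock-style table is typically acquired, assuming plain JDBC against MySQL; `tryLock` and `holdSeconds` are illustrative, and SnailJob's actual DistributedLockHandler may well differ:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    public final class LockSketch {

        /** Try to take the named lock for holdSeconds; returns true if this node now owns it. */
        static boolean tryLock(Connection conn, String name, String owner, int holdSeconds) throws SQLException {
            // 1) Take over an expired lock: succeeds only once lock_until has passed.
            String update = "UPDATE sj_distributed_lock "
                    + "SET lock_until = TIMESTAMPADD(SECOND, ?, NOW(3)), locked_at = NOW(3), locked_by = ? "
                    + "WHERE name = ? AND lock_until <= NOW(3)";
            try (PreparedStatement ps = conn.prepareStatement(update)) {
                ps.setInt(1, holdSeconds);
                ps.setString(2, owner);
                ps.setString(3, name);
                if (ps.executeUpdate() == 1) {
                    return true;
                }
            }
            // 2) No row yet (or the lock is still held): try to create it. The
            //    PRIMARY KEY (`name`) makes concurrent inserts for the same lock
            //    collide deterministically.
            String insert = "INSERT IGNORE INTO sj_distributed_lock (name, lock_until, locked_at, locked_by) "
                    + "VALUES (?, TIMESTAMPADD(SECOND, ?, NOW(3)), NOW(3), ?)";
            try (PreparedStatement ps = conn.prepareStatement(insert)) {
                ps.setString(1, name);
                ps.setInt(2, holdSeconds);
                ps.setString(3, owner);
                return ps.executeUpdate() == 1;
            }
        }
    }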
@@ -2,6 +2,7 @@ package com.aizuda.snailjob.client.job.core.dto;

+import cn.hutool.core.util.StrUtil;
 import lombok.Data;
 import lombok.Getter;

 import java.util.Map;
 import java.util.Objects;
@@ -22,12 +23,22 @@ public class JobArgs {

     private Map<String, Object> wfContext;

+    private Map<String, Object> changeWfContext;
+
+    public void appendContext(String key, Object value) {
+        if (Objects.isNull(wfContext) || StrUtil.isBlank(key) || Objects.isNull(value)) {
+            return;
+        }
+
+        wfContext.put(key, value);
+        changeWfContext.put(key, value);
+    }
+
     public Object getWfContext(String key) {
         if (Objects.isNull(wfContext) || StrUtil.isBlank(key)) {
             return null;
         }

         return wfContext.get(key);
     }

 }
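A usage sketch of the new API, assuming a hypothetical executor and the usual `ExecuteResult.success()` factory; `doJobExecute` and the context keys are illustrative. `appendContext` writes through to both maps, so local reads still see the full context while only the delta is queued for upload:

    import com.aizuda.snailjob.client.job.core.dto.JobArgs;
    import com.aizuda.snailjob.client.model.ExecuteResult;

    public class OrderStatusJobExecutor {

        public ExecuteResult doJobExecute(JobArgs jobArgs) {
            // Read whatever upstream workflow nodes published; getWfContext
            // still serves the full context.
            Object orderId = jobArgs.getWfContext("orderId");

            // Publish a value for downstream nodes: appendContext writes it into
            // wfContext (the local full view) and changeWfContext (the delta
            // that will be reported back to the server).
            jobArgs.appendContext("orderStatus", "PAID");

            return ExecuteResult.success();
        }
    }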
@@ -13,15 +13,16 @@ import com.aizuda.snailjob.client.model.ExecuteResult;
 import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
 import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
 import com.aizuda.snailjob.common.core.model.JobContext;
 import com.aizuda.snailjob.common.core.util.JsonUtil;
 import com.aizuda.snailjob.common.log.enums.LogTypeEnum;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import lombok.extern.slf4j.Slf4j;

 import java.util.Map;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -45,6 +46,8 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
         // Add the task to the time wheel; stop it when the timeout expires
         TimerManager.add(new StopTaskTimerTask(jobContext.getTaskBatchId()), jobContext.getExecutorTimeout(), TimeUnit.SECONDS);

+        Map<String, Object> changeWfContext = Maps.newConcurrentMap();
+
         // Execute the task
         ListenableFuture<ExecuteResult> submit = decorator.submit(() -> {
             JobArgs jobArgs;
@@ -63,6 +66,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
             }

             jobArgs.setWfContext(jobContext.getWfContext());
+            jobArgs.setChangeWfContext(changeWfContext);

             try {
                 // Initialize dispatch info (log reporting via LogUtil)
@@ -76,7 +80,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor {
         });

         FutureCache.addFuture(jobContext.getTaskBatchId(), submit);
-        Futures.addCallback(submit, new JobExecutorFutureCallback(jobContext), decorator);
+        Futures.addCallback(submit, new JobExecutorFutureCallback(jobContext, changeWfContext), decorator);
     }

     private void initLogContext(JobContext jobContext) {
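The change map is created as a concurrent map because it is written on the executor thread (possibly by several parallel map-reduce subtasks) and read later on the callback thread. A self-contained sketch of that pattern with Guava; all names here are illustrative:

    import com.google.common.collect.Maps;
    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;

    import java.util.Map;
    import java.util.concurrent.Executors;

    public final class ChangeMapPattern {

        public static void main(String[] args) {
            ListeningExecutorService pool =
                    MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
            Map<String, Object> changes = Maps.newConcurrentMap();

            // The task thread records its context delta while running.
            ListenableFuture<String> future = pool.submit(() -> {
                changes.put("step1", "done");
                return "ok";
            });

            // The callback thread reads the delta once the task completes;
            // addCallback provides the happens-before ordering between the two.
            Futures.addCallback(future, new FutureCallback<String>() {
                @Override
                public void onSuccess(String result) {
                    System.out.println("report delta only: " + changes);
                }

                @Override
                public void onFailure(Throwable t) {
                    t.printStackTrace();
                }
            }, pool);

            pool.shutdown();
        }
    }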
@@ -66,9 +66,11 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
     }).build();

     private final JobContext jobContext;
+    private final Map<String, Object> changeWfContext;

-    public JobExecutorFutureCallback(final JobContext jobContext) {
+    public JobExecutorFutureCallback(final JobContext jobContext, Map<String, Object> changeWfContext) {
         this.jobContext = jobContext;
+        this.changeWfContext = changeWfContext;
     }

     @Override
@@ -163,9 +165,8 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
         dispatchJobRequest.setRetry(jobContext.isRetry());
         dispatchJobRequest.setRetryScene(jobContext.getRetryScene());
         // Pass the context along
-        Map<String, Object> wfContext = jobContext.getWfContext();
-        if (CollUtil.isNotEmpty(wfContext)) {
-            dispatchJobRequest.setWfContext(JsonUtil.toJsonString(wfContext));
+        if (CollUtil.isNotEmpty(changeWfContext)) {
+            dispatchJobRequest.setWfContext(JsonUtil.toJsonString(changeWfContext));
         }

         return dispatchJobRequest;
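Shipping only `changeWfContext` works because the server merges each batch's reported keys into the stored global context (see `mergeAllWorkflowContext` below), so sibling nodes writing disjoint keys no longer risk overwriting one another the way full-context uploads could. An illustrative standalone merge, not the server's actual code:

    import java.util.HashMap;
    import java.util.Map;

    final class ContextMergeSketch {

        // Per-key merge: a later delta overlays earlier state without discarding it.
        static Map<String, Object> merge(Map<String, Object> global, Map<String, Object> delta) {
            Map<String, Object> merged = new HashMap<>(global);
            merged.putAll(delta);
            return merged;
        }

        public static void main(String[] args) {
            Map<String, Object> global = Map.<String, Object>of("a", 1);
            Map<String, Object> afterB = merge(global, Map.<String, Object>of("b", 2)); // sibling 1 reports {b=2}
            Map<String, Object> afterC = merge(afterB, Map.<String, Object>of("c", 3)); // sibling 2 reports {c=3}
            System.out.println(afterC); // {a=1, b=2, c=3} in some order; no key is lost
        }
    }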
@@ -17,15 +17,10 @@ import java.time.LocalDateTime;
 @TableName("sj_distributed_lock")
 public class DistributedLock extends CreateUpdateDt {

-    /**
-     * Primary key
-     */
-    @TableId(value = "id", type = IdType.AUTO)
-    private Long id;
-
     /**
      * Lock name
      */
+    @TableId(value = "name")
     private String name;

     /**
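With `@TableId` now on the `String` column and no `IdType.AUTO`, MyBatis-Plus generates nothing on insert; the caller supplies the key. A sketch under that assumption; `DistributedLockMapper`, the Lombok-style setters, and the field types are assumptions based on the entity above:

    import java.time.LocalDateTime;

    public class LockRowWriter {

        // Assumed MyBatis-Plus mapper for the entity above; illustrative only.
        interface DistributedLockMapper {
            int insert(DistributedLock lock);
        }

        private final DistributedLockMapper distributedLockMapper;

        public LockRowWriter(DistributedLockMapper distributedLockMapper) {
            this.distributedLockMapper = distributedLockMapper;
        }

        public void createLockRow(String name, String owner, int holdSeconds) {
            DistributedLock lock = new DistributedLock();
            lock.setName(name); // the primary key, supplied by the application
            lock.setLockUntil(LocalDateTime.now().plusSeconds(holdSeconds));
            lock.setLockedAt(LocalDateTime.now());
            lock.setLockedBy(owner);
            // A second insert with the same name now fails on the primary key,
            // which is exactly the collision a lock table relies on.
            distributedLockMapper.insert(lock);
        }
    }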
@@ -168,6 +168,7 @@ public class WorkflowExecutorActor extends AbstractActor {
         Map<Long, Job> jobMap = StreamUtils.toIdentityMap(jobs, Job::getId);

+        // TODO merge the job task results into the global context
         // The concurrency here equals the number of sibling nodes the parent node had at the time
         workflowBatchHandler.mergeWorkflowContextAndRetry(workflowTaskBatch,
             StreamUtils.toSet(allJobTaskBatchList, JobTaskBatch::getId));

@@ -324,7 +324,7 @@ public class WorkflowBatchHandler {
      * Merge the workflow context; if the merge fails, first spin-retry 3 times within 1.5s, and if that still fails escalate to a pessimistic lock
      *
      * @param workflowTaskBatch the workflow batch
-     * @param taskBatchIds the batch id list
+     * @param taskBatchIds      the batch id list
      */
     public void mergeWorkflowContextAndRetry(WorkflowTaskBatch workflowTaskBatch, Set<Long> taskBatchIds) {
         if (CollUtil.isEmpty(taskBatchIds)) {
@@ -358,7 +358,7 @@
             retryer.call(() -> mergeAllWorkflowContext(workflowTaskBatch, taskBatchIds));
         } catch (Exception e) {
             SnailJobLog.LOCAL.warn("update workflow global context error. workflowTaskBatchId:[{}] taskBatchIds:[{}]",
-                    workflowTaskBatch.getId(), taskBatchIds, e);
+                workflowTaskBatch.getId(), taskBatchIds, e);
             if (e.getClass().isAssignableFrom(RetryException.class)) {
                 // If the spin retry fails, use the pessimistic lock
                 distributedLockHandler.lockWithDisposableAndRetry(() -> {
@@ -408,8 +408,8 @@
         waitUpdateWorkflowTaskBatch.setWfContext(JsonUtil.toJsonString(mergeMap));
         waitUpdateWorkflowTaskBatch.setVersion(1);
         return 1 == workflowTaskBatchMapper.update(waitUpdateWorkflowTaskBatch, new LambdaQueryWrapper<WorkflowTaskBatch>()
-                .eq(WorkflowTaskBatch::getId, workflowTaskBatch.getId())
-                .eq(WorkflowTaskBatch::getVersion, workflowTaskBatch.getVersion())
+            .eq(WorkflowTaskBatch::getId, workflowTaskBatch.getId())
+            .eq(WorkflowTaskBatch::getVersion, workflowTaskBatch.getVersion())
         );
     }

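A minimal sketch of the merge strategy documented above: an optimistic compare-and-set on the version column, spun a few times, then escalated to the pessimistic lock. The `Merge` callable and `pessimistic` runnable are illustrative stand-ins for `mergeAllWorkflowContext` and `distributedLockHandler.lockWithDisposableAndRetry`:

    final class MergeRetrySketch {

        interface Merge {
            /** One optimistic attempt; true if the versioned update hit. */
            boolean run() throws Exception;
        }

        static void mergeWithEscalation(Merge optimistic, Runnable pessimistic) throws Exception {
            for (int attempt = 0; attempt < 3; attempt++) { // spin up to 3 times, ~1.5s total
                if (optimistic.run()) {
                    return; // the compare-and-set on the version column succeeded
                }
                Thread.sleep(500); // back off, then re-read and retry
            }
            pessimistic.run(); // escalate: take the distributed lock and serialize writers
        }
    }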