diff --git a/doc/sql/snail_job_mysql.sql b/doc/sql/snail_job_mysql.sql index 940156313..9b8a28f80 100644 --- a/doc/sql/snail_job_mysql.sql +++ b/doc/sql/snail_job_mysql.sql @@ -235,15 +235,13 @@ CREATE TABLE `sj_server_node` CREATE TABLE `sj_distributed_lock` ( - `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `name` varchar(64) NOT NULL COMMENT '锁名称', `lock_until` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '锁定时长', `locked_at` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '锁定时间', `locked_by` varchar(255) NOT NULL COMMENT '锁定者', `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间', - PRIMARY KEY (`id`), - UNIQUE KEY `uk_name` (`name`) + PRIMARY KEY (`name`) ) ENGINE = InnoDB AUTO_INCREMENT = 0 DEFAULT CHARSET = utf8mb4 COMMENT ='锁定表' diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java index e7523b19a..c7160575f 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/dto/JobArgs.java @@ -2,6 +2,7 @@ package com.aizuda.snailjob.client.job.core.dto; import cn.hutool.core.util.StrUtil; import lombok.Data; +import lombok.Getter; import java.util.Map; import java.util.Objects; @@ -22,12 +23,22 @@ public class JobArgs { private Map wfContext; + private Map changeWfContext; + public void appendContext(String key, Object value) { if (Objects.isNull(wfContext) || StrUtil.isBlank(key) || Objects.isNull(value)) { return; } - wfContext.put(key, value); + changeWfContext.put(key, value); + } + + public Object getWfContext(String key) { + if (Objects.isNull(wfContext) || StrUtil.isBlank(key)) { + return null; + } + + return wfContext.get(key); } } diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java index d1e281613..8056b1bea 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/AbstractJobExecutor.java @@ -13,15 +13,16 @@ import com.aizuda.snailjob.client.model.ExecuteResult; import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum; import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum; import com.aizuda.snailjob.common.core.model.JobContext; -import com.aizuda.snailjob.common.core.util.JsonUtil; import com.aizuda.snailjob.common.log.enums.LogTypeEnum; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; +import java.util.Map; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -45,6 +46,8 @@ public abstract class AbstractJobExecutor implements IJobExecutor { // 将任务添加到时间轮中,到期停止任务 TimerManager.add(new StopTaskTimerTask(jobContext.getTaskBatchId()), jobContext.getExecutorTimeout(), TimeUnit.SECONDS); + Map changeWfContext = Maps.newConcurrentMap(); + // 执行任务 ListenableFuture submit = decorator.submit(() -> { JobArgs jobArgs; @@ -63,6 +66,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor { } jobArgs.setWfContext(jobContext.getWfContext()); + jobArgs.setChangeWfContext(changeWfContext); try { // 初始化调度信息(日志上报LogUtil) @@ -76,7 +80,7 @@ public abstract class AbstractJobExecutor implements IJobExecutor { }); FutureCache.addFuture(jobContext.getTaskBatchId(), submit); - Futures.addCallback(submit, new JobExecutorFutureCallback(jobContext), decorator); + Futures.addCallback(submit, new JobExecutorFutureCallback(jobContext, changeWfContext), decorator); } private void initLogContext(JobContext jobContext) { diff --git a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java index 1d0641df5..9ce04801a 100644 --- a/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java +++ b/snail-job-client/snail-job-client-job-core/src/main/java/com/aizuda/snailjob/client/job/core/executor/JobExecutorFutureCallback.java @@ -66,9 +66,11 @@ public class JobExecutorFutureCallback implements FutureCallback }).build(); private final JobContext jobContext; + private final Map changeWfContext; - public JobExecutorFutureCallback(final JobContext jobContext) { + public JobExecutorFutureCallback(final JobContext jobContext, Map changeWfContext) { this.jobContext = jobContext; + this.changeWfContext = changeWfContext; } @Override @@ -163,9 +165,8 @@ public class JobExecutorFutureCallback implements FutureCallback dispatchJobRequest.setRetry(jobContext.isRetry()); dispatchJobRequest.setRetryScene(jobContext.getRetryScene()); // 传递上下文 - Map wfContext = jobContext.getWfContext(); - if (CollUtil.isNotEmpty(wfContext)) { - dispatchJobRequest.setWfContext(JsonUtil.toJsonString(wfContext)); + if (CollUtil.isNotEmpty(changeWfContext)) { + dispatchJobRequest.setWfContext(JsonUtil.toJsonString(changeWfContext)); } return dispatchJobRequest; diff --git a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/DistributedLock.java b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/DistributedLock.java index da02f398a..b34aaa1ad 100644 --- a/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/DistributedLock.java +++ b/snail-job-datasource/snail-job-datasource-template/src/main/java/com/aizuda/snailjob/template/datasource/persistence/po/DistributedLock.java @@ -17,15 +17,10 @@ import java.time.LocalDateTime; @TableName("sj_distributed_lock") public class DistributedLock extends CreateUpdateDt { - /** - * 主键 - */ - @TableId(value = "id", type = IdType.AUTO) - private Long id; - /** * 锁名称 */ + @TableId(value = "name") private String name; /** diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java index fdb2dbb7a..ab27869de 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/dispatch/WorkflowExecutorActor.java @@ -168,6 +168,7 @@ public class WorkflowExecutorActor extends AbstractActor { Map jobMap = StreamUtils.toIdentityMap(jobs, Job::getId); // TODO 合并job task的结果到全局上下文中 + // 此次的并发数与当时父节点的兄弟节点的数量一致 workflowBatchHandler.mergeWorkflowContextAndRetry(workflowTaskBatch, StreamUtils.toSet(allJobTaskBatchList, JobTaskBatch::getId)); diff --git a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java index 3d2de4765..546304513 100644 --- a/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java +++ b/snail-job-server/snail-job-server-job-task/src/main/java/com/aizuda/snailjob/server/job/task/support/handler/WorkflowBatchHandler.java @@ -324,7 +324,7 @@ public class WorkflowBatchHandler { * 合并工作流上下文若合并失败先自旋3次1.5s, 若失败了升级到悲观锁 * * @param workflowTaskBatch 工作流批次 - * @param taskBatchIds 批次列表 + * @param taskBatchIds 批次列表 */ public void mergeWorkflowContextAndRetry(WorkflowTaskBatch workflowTaskBatch, Set taskBatchIds) { if (CollUtil.isEmpty(taskBatchIds)) { @@ -358,7 +358,7 @@ public class WorkflowBatchHandler { retryer.call(() -> mergeAllWorkflowContext(workflowTaskBatch, taskBatchIds)); } catch (Exception e) { SnailJobLog.LOCAL.warn("update workflow global context error. workflowTaskBatchId:[{}] taskBatchIds:[{}]", - workflowTaskBatch.getId(), taskBatchIds, e); + workflowTaskBatch.getId(), taskBatchIds, e); if (e.getClass().isAssignableFrom(RetryException.class)) { // 如果自旋失败,就使用悲观锁 distributedLockHandler.lockWithDisposableAndRetry(() -> { @@ -408,8 +408,8 @@ public class WorkflowBatchHandler { waitUpdateWorkflowTaskBatch.setWfContext(JsonUtil.toJsonString(mergeMap)); waitUpdateWorkflowTaskBatch.setVersion(1); return 1 == workflowTaskBatchMapper.update(waitUpdateWorkflowTaskBatch, new LambdaQueryWrapper() - .eq(WorkflowTaskBatch::getId, workflowTaskBatch.getId()) - .eq(WorkflowTaskBatch::getVersion, workflowTaskBatch.getVersion()) + .eq(WorkflowTaskBatch::getId, workflowTaskBatch.getId()) + .eq(WorkflowTaskBatch::getVersion, workflowTaskBatch.getVersion()) ); }