From 9cf223278a8e3eacbd10dd7075cc49a889694704 Mon Sep 17 00:00:00 2001 From: byteblogs168 <598092184@qq.com> Date: Sat, 30 Mar 2024 18:36:06 +0800 Subject: [PATCH] =?UTF-8?q?feat(3.2.0):=20=E5=AE=8C=E6=88=90inserOrUpdate?= =?UTF-8?q?=E7=9A=84=E6=94=B9=E9=80=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. job的任务统计 2. workflow的任务统计 3. 重试任务统计 4. 客户端注册和续期 --- doc/sql/easy_retry_mysql.sql | 4 +- .../persistence/mapper/JobSummaryMapper.java | 5 +- .../mapper/RetrySummaryMapper.java | 4 +- .../persistence/mapper/ServerNodeMapper.java | 6 +- .../mysql/mapper/JobSummaryMapper.xml | 36 +++++-- .../mysql/mapper/RetrySummaryMapper.xml | 32 ++++-- .../mysql/mapper/ServerNodeMapper.xml | 97 +++++++++++-------- .../common/register/AbstractRegister.java | 35 ++++--- .../support/schedule/JobSummarySchedule.java | 73 +++++++++++--- .../schedule/WorkflowJobSummarySchedule.java | 76 ++++++++++++--- .../schedule/RetrySummarySchedule.java | 59 ++++++++++- 11 files changed, 319 insertions(+), 108 deletions(-) diff --git a/doc/sql/easy_retry_mysql.sql b/doc/sql/easy_retry_mysql.sql index 7f29e1ce8..ec59e8c37 100644 --- a/doc/sql/easy_retry_mysql.sql +++ b/doc/sql/easy_retry_mysql.sql @@ -29,7 +29,7 @@ CREATE TABLE `group_config` `namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id', `group_name` varchar(64) NOT NULL DEFAULT '' COMMENT '组名称', `description` varchar(256) NOT NULL DEFAULT '' COMMENT '组描述', - `token` varchar(256) NOT NULL DEFAULT 'ER_cKqBTPzCsWA3VyuCfFoccmuIEGXjr5KT' COMMENT 'token', + `token` varchar(64) NOT NULL DEFAULT 'ER_cKqBTPzCsWA3VyuCfFoccmuIEGXjr5KT' COMMENT 'token', `group_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '组状态 0、未启用 1、启用', `version` int(11) NOT NULL COMMENT '版本号', `group_partition` int(11) NOT NULL COMMENT '分区', @@ -426,6 +426,7 @@ CREATE TABLE `job_summary` `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`), + KEY `idx_trigger_at` (`trigger_at`), KEY `idx_namespace_id_group_name_business_id` (`namespace_id`, `group_name`, business_id), UNIQUE KEY `uk_business_id_trigger_at` (`business_id`, `trigger_at`) USING BTREE ) ENGINE = InnoDB @@ -446,6 +447,7 @@ CREATE TABLE `retry_summary` `create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`), + KEY `idx_trigger_at` (`trigger_at`), UNIQUE KEY `uk_scene_name_trigger_at` (`namespace_id`, `group_name`, `scene_name`, `trigger_at`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobSummaryMapper.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobSummaryMapper.java index bfdb0019a..99744e900 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobSummaryMapper.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/JobSummaryMapper.java @@ -22,7 +22,10 @@ import java.util.List; @Mapper public interface JobSummaryMapper extends BaseMapper { - int insertOrUpdate(@Param("list") List list); + int batchInsert(@Param("list") List list); + + int batchUpdate(@Param("list") List list); + IPage jobTaskList(@Param("ew") Wrapper wrapper, Page page); diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/RetrySummaryMapper.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/RetrySummaryMapper.java index 87c0f3215..0955a94ae 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/RetrySummaryMapper.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/RetrySummaryMapper.java @@ -24,7 +24,9 @@ import java.util.List; @Mapper public interface RetrySummaryMapper extends BaseMapper { - int insertOrUpdate(@Param("list") List list); + int batchInsert(@Param("list") List list); + + int batchUpdate(@Param("list") List list); DashboardCardResponseDO.RetryTask retryTask(@Param("ew") Wrapper wrapper); diff --git a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/ServerNodeMapper.java b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/ServerNodeMapper.java index 4b2b1b302..7b0fcfb48 100644 --- a/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/ServerNodeMapper.java +++ b/easy-retry-datasource/easy-retry-datasource-template/src/main/java/com/aizuda/easy/retry/template/datasource/persistence/mapper/ServerNodeMapper.java @@ -14,7 +14,11 @@ import java.util.List; @Mapper public interface ServerNodeMapper extends BaseMapper { - int insertOrUpdate(@Param("records") List records); +// int insertOrUpdate(@Param("records") List records); + + int batchUpdateExpireAt(@Param("list") List list); + + int batchInsert(@Param("records") List records); List countActivePod(@Param("ew") Wrapper wrapper); diff --git a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobSummaryMapper.xml b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobSummaryMapper.xml index 5fd845e28..d79a88d19 100644 --- a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobSummaryMapper.xml +++ b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/JobSummaryMapper.xml @@ -19,7 +19,7 @@ - + INSERT INTO job_summary (namespace_id, group_name, business_id, trigger_at, system_task_type, success_num,fail_num,fail_reason,stop_num,stop_reason, cancel_num,cancel_reason) VALUES @@ -39,15 +39,33 @@ #{item.cancelReason} ) - ON DUPLICATE KEY UPDATE - success_num = VALUES(`success_num`), - fail_num = VALUES(`fail_num`), - fail_reason = VALUES(`fail_reason`), - stop_num = VALUES(`stop_num`), - stop_reason = VALUES(`stop_reason`), - cancel_num = VALUES(`cancel_num`), - cancel_reason = VALUES(`cancel_reason`) + + UPDATE job_summary rt, + ( + + SELECT + #{item.successNum} AS success_num, + #{item.failNum} AS fail_num, + #{item.failReason} AS fail_reason, + #{item.stopNum} AS stop_num, + #{item.stopReason} AS stop_reason, + #{item.cancelNum} AS cancel_num, + #{item.cancelReason} AS cancel_reason, + #{item.triggerAt} AS trigger_at, + #{item.businessId} AS business_id + + ) tt + SET + rt.success_num = tt.success_num, + rt.fail_num = tt.fail_num, + rt.fail_reason = tt.fail_reason, + rt.stop_num = tt.stop_num, + rt.stop_reason = tt.stop_reason, + rt.cancel_num = tt.cancel_num, + rt.cancel_reason = tt.cancel_reason + WHERE rt.trigger_at = tt.trigger_at and rt.business_id = tt.business_id + diff --git a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/ServerNodeMapper.xml b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/ServerNodeMapper.xml index 3a5542e71..55eb60f2c 100644 --- a/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/ServerNodeMapper.xml +++ b/easy-retry-datasource/easy-retry-mysql-datasource/src/main/resources/mysql/mapper/ServerNodeMapper.xml @@ -1,53 +1,64 @@ - - - - - - - - - - - - - - + + + + + + + + + + + + + + - - id, namespace_id, group_name, context_path, host_id, host_ip, host_port, expire_at, node_type,create_dt,update_dt - + + id + , namespace_id, group_name, context_path, host_id, host_ip, host_port, expire_at, node_type,create_dt,update_dt + - - INSERT INTO server_node (namespace_id, group_name, host_id, host_ip, host_port, - expire_at, node_type, ext_attrs, context_path, create_dt) - VALUES + + INSERT INTO server_node (namespace_id, group_name, host_id, host_ip, host_port, + expire_at, node_type, ext_attrs, context_path, create_dt) + VALUES ( - #{item.namespaceId,jdbcType=VARCHAR}, - #{item.groupName,jdbcType=VARCHAR}, - #{item.hostId,jdbcType=VARCHAR}, - #{item.hostIp,jdbcType=VARCHAR}, - #{item.hostPort,jdbcType=INTEGER}, - #{item.expireAt,jdbcType=TIMESTAMP}, - #{item.nodeType,jdbcType=TINYINT}, - #{item.extAttrs,jdbcType=VARCHAR}, - #{item.contextPath,jdbcType=VARCHAR}, - #{item.createDt,jdbcType=TIMESTAMP} + #{item.namespaceId,jdbcType=VARCHAR}, + #{item.groupName,jdbcType=VARCHAR}, + #{item.hostId,jdbcType=VARCHAR}, + #{item.hostIp,jdbcType=VARCHAR}, + #{item.hostPort,jdbcType=INTEGER}, + #{item.expireAt,jdbcType=TIMESTAMP}, + #{item.nodeType,jdbcType=TINYINT}, + #{item.extAttrs,jdbcType=VARCHAR}, + #{item.contextPath,jdbcType=VARCHAR}, + #{item.createDt,jdbcType=TIMESTAMP} ) - ON DUPLICATE KEY UPDATE - expire_at = VALUES(`expire_at`) - - - + + + UPDATE server_node rt, + ( + + SELECT + #{item.expireAt} AS expire_at, + #{item.contextPath} AS context_path, + #{item.hostId} AS host_id, + #{item.hostIp} AS host_ip + + ) tt + SET rt.expire_at = tt.expire_at, rt.context_path = tt.context_path, + rt.context_path = tt.context_path + WHERE rt.host_id = tt.host_id and rt.host_ip = tt.host_ip + + diff --git a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/AbstractRegister.java b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/AbstractRegister.java index 5ce0b73da..187204371 100644 --- a/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/AbstractRegister.java +++ b/easy-retry-server/easy-retry-server-common/src/main/java/com/aizuda/easy/retry/server/common/register/AbstractRegister.java @@ -8,6 +8,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMa import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DuplicateKeyException; import java.time.LocalDateTime; import java.util.List; @@ -41,22 +42,25 @@ public abstract class AbstractRegister implements Register, Lifecycle { protected void refreshExpireAt(List serverNodes) { - try { - - for (final ServerNode serverNode : serverNodes) { - serverNode.setExpireAt(getExpireAt()); - } - - serverNodeMapper.insertOrUpdate(serverNodes); - - for (final ServerNode serverNode : serverNodes) { - // 刷新本地缓存过期时间 - CacheRegisterTable.refreshExpireAt(serverNode); - } - - }catch (Exception e) { - EasyRetryLog.LOCAL.error("注册节点失败", e); + for (final ServerNode serverNode : serverNodes) { + serverNode.setExpireAt(getExpireAt()); } + + // 批量更新 + if (serverNodes.size() != serverNodeMapper.batchUpdateExpireAt(serverNodes)) { + try { + serverNodeMapper.batchInsert(serverNodes); + } catch (DuplicateKeyException ignored) { + } catch (Exception e) { + EasyRetryLog.LOCAL.error("注册节点失败", e); + } + } + + for (final ServerNode serverNode : serverNodes) { + // 刷新本地缓存过期时间 + CacheRegisterTable.refreshExpireAt(serverNode); + } + } protected abstract void beforeProcessor(RegisterContext context); @@ -86,5 +90,4 @@ public abstract class AbstractRegister implements Register, Lifecycle { protected abstract Integer getNodeType(); - } diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobSummarySchedule.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobSummarySchedule.java index 5e73e9374..a0f2e383d 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobSummarySchedule.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/JobSummarySchedule.java @@ -8,20 +8,27 @@ import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.dto.JobTaskBatchReason; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule; +import com.aizuda.easy.retry.server.common.triple.Pair; import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchSummaryResponseDO; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobSummaryMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; import com.aizuda.easy.retry.template.datasource.persistence.po.JobSummary; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.time.*; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; /** @@ -66,10 +73,10 @@ public class JobSummarySchedule extends AbstractSchedule implements Lifecycle { LocalDateTime todayFrom = LocalDateTime.of(LocalDate.now(), LocalTime.MIN).plusDays(-i); LocalDateTime todayTo = LocalDateTime.of(LocalDate.now(), LocalTime.MAX).plusDays(-i); LambdaQueryWrapper wrapper = new LambdaQueryWrapper() - .eq(JobTaskBatch::getSystemTaskType, SyetemTaskTypeEnum.JOB.getType()) - .between(JobTaskBatch::getCreateDt, todayFrom, todayTo) - .groupBy(JobTaskBatch::getNamespaceId, JobTaskBatch::getGroupName, - JobTaskBatch::getJobId, JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason); + .eq(JobTaskBatch::getSystemTaskType, SyetemTaskTypeEnum.JOB.getType()) + .between(JobTaskBatch::getCreateDt, todayFrom, todayTo) + .groupBy(JobTaskBatch::getNamespaceId, JobTaskBatch::getGroupName, + JobTaskBatch::getJobId, JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason); List summaryResponseDOList = jobTaskBatchMapper.summaryJobBatchList(wrapper); if (summaryResponseDOList == null || summaryResponseDOList.size() < 1) { continue; @@ -77,17 +84,52 @@ public class JobSummarySchedule extends AbstractSchedule implements Lifecycle { // insertOrUpdate List jobSummaryList = jobSummaryList(todayFrom, summaryResponseDOList); - int totalJobSummary = jobSummaryMapper.insertOrUpdate(jobSummaryList); - EasyRetryLog.LOCAL.debug("job summary dashboard success todayFrom:[{}] todayTo:[{}] total:[{}]", todayFrom, todayTo, totalJobSummary); + + List jobSummaries = jobSummaryMapper.selectList(new LambdaQueryWrapper() + .eq(JobSummary::getTriggerAt, todayFrom) + .in(JobSummary::getBusinessId, jobSummaryList.stream().map(JobSummary::getBusinessId).collect( + Collectors.toSet()))); + + Map, JobSummary> summaryMap = jobSummaries.stream() + .collect( + Collectors.toMap(jobSummary -> Pair.of(jobSummary.getBusinessId(), jobSummary.getTriggerAt()), + k -> k)); + + List waitInserts = Lists.newArrayList(); + List waitUpdates = Lists.newArrayList(); + for (final JobSummary jobSummary : jobSummaryList) { + if (Objects.isNull( + summaryMap.get(Pair.of(jobSummary.getBusinessId(), jobSummary.getTriggerAt())))) { + waitInserts.add(jobSummary); + } else { + waitUpdates.add(jobSummary); + } + } + + int updateTotalJobSummary = 0; + if (!CollectionUtils.isEmpty(waitUpdates)) { + updateTotalJobSummary = jobSummaryMapper.batchUpdate(waitUpdates); + } + + int insertTotalJobSummary = 0; + if (!CollectionUtils.isEmpty(waitInserts)) { + insertTotalJobSummary = jobSummaryMapper.batchInsert(waitInserts); + } + + EasyRetryLog.LOCAL.debug( + "job summary dashboard success todayFrom:[{}] todayTo:[{}] updateTotalJobSummary:[{}] insertTotalJobSummary:[{}]", + todayFrom, todayTo, updateTotalJobSummary, insertTotalJobSummary); } } catch (Exception e) { EasyRetryLog.LOCAL.error("job summary dashboard log error", e); } } - private List jobSummaryList(LocalDateTime triggerAt, List summaryResponseDOList) { + private List jobSummaryList(LocalDateTime triggerAt, + List summaryResponseDOList) { List jobSummaryList = new ArrayList<>(); - Map> jobIdListMap = summaryResponseDOList.parallelStream().collect(Collectors.groupingBy(JobBatchSummaryResponseDO::getJobId)); + Map> jobIdListMap = summaryResponseDOList.parallelStream() + .collect(Collectors.groupingBy(JobBatchSummaryResponseDO::getJobId)); for (Map.Entry> job : jobIdListMap.entrySet()) { JobSummary jobSummary = new JobSummary(); jobSummary.setBusinessId(job.getKey()); @@ -100,9 +142,12 @@ public class JobSummarySchedule extends AbstractSchedule implements Lifecycle { jobSummary.setStopNum(job.getValue().stream().mapToInt(JobBatchSummaryResponseDO::getStopNum).sum()); jobSummary.setCancelNum(job.getValue().stream().mapToInt(JobBatchSummaryResponseDO::getCancelNum).sum()); - jobSummary.setFailReason(JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.FAIL.getStatus(), job.getValue()))); - jobSummary.setStopReason(JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.STOP.getStatus(), job.getValue()))); - jobSummary.setCancelReason(JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.CANCEL.getStatus(), job.getValue()))); + jobSummary.setFailReason( + JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.FAIL.getStatus(), job.getValue()))); + jobSummary.setStopReason( + JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.STOP.getStatus(), job.getValue()))); + jobSummary.setCancelReason(JsonUtil.toJsonString( + jobTaskBatchReasonList(JobTaskBatchStatusEnum.CANCEL.getStatus(), job.getValue()))); jobSummaryList.add(jobSummary); } return jobSummaryList; @@ -115,9 +160,11 @@ public class JobSummarySchedule extends AbstractSchedule implements Lifecycle { * @param jobBatchSummaryResponseDOList * @return */ - private List jobTaskBatchReasonList(int jobTaskBatchStatus, List jobBatchSummaryResponseDOList) { + private List jobTaskBatchReasonList(int jobTaskBatchStatus, + List jobBatchSummaryResponseDOList) { List jobTaskBatchReasonArrayList = new ArrayList<>(); - List summaryResponseDOList = jobBatchSummaryResponseDOList.stream().filter(i -> jobTaskBatchStatus == i.getTaskBatchStatus()).collect(Collectors.toList()); + List summaryResponseDOList = jobBatchSummaryResponseDOList.stream() + .filter(i -> jobTaskBatchStatus == i.getTaskBatchStatus()).collect(Collectors.toList()); for (JobBatchSummaryResponseDO jobBatchSummaryResponseDO : summaryResponseDOList) { JobTaskBatchReason jobTaskBatchReason = new JobTaskBatchReason(); jobTaskBatchReason.setReason(jobBatchSummaryResponseDO.getOperationReason()); diff --git a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/WorkflowJobSummarySchedule.java b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/WorkflowJobSummarySchedule.java index 16a12ae57..904d705f3 100644 --- a/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/WorkflowJobSummarySchedule.java +++ b/easy-retry-server/easy-retry-server-job-task/src/main/java/com/aizuda/easy/retry/server/job/task/support/schedule/WorkflowJobSummarySchedule.java @@ -8,6 +8,7 @@ import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.dto.JobTaskBatchReason; import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum; import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule; +import com.aizuda.easy.retry.server.common.triple.Pair; import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchSummaryResponseDO; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobSummaryMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper; @@ -15,9 +16,12 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobSummary; import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch; import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.time.Duration; import java.time.LocalDate; @@ -26,6 +30,8 @@ import java.time.LocalTime; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; /** @@ -70,27 +76,64 @@ public class WorkflowJobSummarySchedule extends AbstractSchedule implements Life LocalDateTime todayFrom = LocalDateTime.of(LocalDate.now(), LocalTime.MIN).plusDays(-i); LocalDateTime todayTo = LocalDateTime.of(LocalDate.now(), LocalTime.MAX).plusDays(-i); LambdaQueryWrapper wrapper = new LambdaQueryWrapper() - .between(WorkflowTaskBatch::getCreateDt, todayFrom, todayTo) - .groupBy(WorkflowTaskBatch::getNamespaceId, WorkflowTaskBatch::getGroupName, - WorkflowTaskBatch::getWorkflowId, WorkflowTaskBatch::getTaskBatchStatus, WorkflowTaskBatch::getOperationReason); - List summaryWorkflowResponseDOList = jobTaskBatchMapper.summaryWorkflowTaskBatchList(wrapper); - if (summaryWorkflowResponseDOList == null || summaryWorkflowResponseDOList.size() < 1) { + .between(WorkflowTaskBatch::getCreateDt, todayFrom, todayTo) + .groupBy(WorkflowTaskBatch::getNamespaceId, WorkflowTaskBatch::getGroupName, + WorkflowTaskBatch::getWorkflowId, WorkflowTaskBatch::getTaskBatchStatus, + WorkflowTaskBatch::getOperationReason); + List summaryWorkflowResponseDOList = jobTaskBatchMapper.summaryWorkflowTaskBatchList( + wrapper); + if (CollectionUtils.isEmpty(summaryWorkflowResponseDOList)) { continue; } // insertOrUpdate List jobSummaryList = jobSummaryList(todayFrom, summaryWorkflowResponseDOList); - int totalJobSummary = jobSummaryMapper.insertOrUpdate(jobSummaryList); - EasyRetryLog.LOCAL.debug("workflow job summary dashboard success todayFrom:[{}] todayTo:[{}] total:[{}]", todayFrom, todayTo, totalJobSummary); + + List jobSummaries = jobSummaryMapper.selectList(new LambdaQueryWrapper() + .eq(JobSummary::getTriggerAt, todayFrom) + .in(JobSummary::getBusinessId, jobSummaryList.stream().map(JobSummary::getBusinessId).collect( + Collectors.toSet()))); + + Map, JobSummary> summaryMap = jobSummaries.stream() + .collect( + Collectors.toMap(jobSummary -> Pair.of(jobSummary.getBusinessId(), jobSummary.getTriggerAt()), + k -> k)); + + List waitInserts = Lists.newArrayList(); + List waitUpdates = Lists.newArrayList(); + for (final JobSummary jobSummary : jobSummaryList) { + if (Objects.isNull( + summaryMap.get(Pair.of(jobSummary.getBusinessId(), jobSummary.getTriggerAt())))) { + waitInserts.add(jobSummary); + } else { + waitUpdates.add(jobSummary); + } + } + + int updateTotalJobSummary = 0; + if (!CollectionUtils.isEmpty(waitUpdates)) { + updateTotalJobSummary = jobSummaryMapper.batchUpdate(waitUpdates); + } + + int insertTotalJobSummary = 0; + if (!CollectionUtils.isEmpty(waitInserts)) { + insertTotalJobSummary = jobSummaryMapper.batchInsert(waitInserts); + } + + EasyRetryLog.LOCAL.debug( + "workflow job summary dashboard success todayFrom:[{}] todayTo:[{}] updateTotalJobSummary:[{}] insertTotalJobSummary:[{}]", + todayFrom, todayTo, updateTotalJobSummary, insertTotalJobSummary); } } catch (Exception e) { EasyRetryLog.LOCAL.error("workflow job summary dashboard log error", e); } } - private List jobSummaryList(LocalDateTime triggerAt, List summaryResponseDOList) { + private List jobSummaryList(LocalDateTime triggerAt, + List summaryResponseDOList) { List jobSummaryList = new ArrayList<>(); - Map> jobIdListMap = summaryResponseDOList.parallelStream().collect(Collectors.groupingBy(JobBatchSummaryResponseDO::getJobId)); + Map> jobIdListMap = summaryResponseDOList.parallelStream() + .collect(Collectors.groupingBy(JobBatchSummaryResponseDO::getJobId)); for (Map.Entry> job : jobIdListMap.entrySet()) { JobSummary jobSummary = new JobSummary(); jobSummary.setBusinessId(job.getKey()); @@ -103,9 +146,12 @@ public class WorkflowJobSummarySchedule extends AbstractSchedule implements Life jobSummary.setStopNum(job.getValue().stream().mapToInt(JobBatchSummaryResponseDO::getStopNum).sum()); jobSummary.setCancelNum(job.getValue().stream().mapToInt(JobBatchSummaryResponseDO::getCancelNum).sum()); - jobSummary.setFailReason(JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.FAIL.getStatus(), job.getValue()))); - jobSummary.setStopReason(JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.STOP.getStatus(), job.getValue()))); - jobSummary.setCancelReason(JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.CANCEL.getStatus(), job.getValue()))); + jobSummary.setFailReason( + JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.FAIL.getStatus(), job.getValue()))); + jobSummary.setStopReason( + JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.STOP.getStatus(), job.getValue()))); + jobSummary.setCancelReason(JsonUtil.toJsonString( + jobTaskBatchReasonList(JobTaskBatchStatusEnum.CANCEL.getStatus(), job.getValue()))); jobSummaryList.add(jobSummary); } return jobSummaryList; @@ -118,9 +164,11 @@ public class WorkflowJobSummarySchedule extends AbstractSchedule implements Life * @param jobBatchSummaryResponseDOList * @return */ - private List jobTaskBatchReasonList(int jobTaskBatchStatus, List jobBatchSummaryResponseDOList) { + private List jobTaskBatchReasonList(int jobTaskBatchStatus, + List jobBatchSummaryResponseDOList) { List jobTaskBatchReasonArrayList = new ArrayList<>(); - List summaryResponseDOList = jobBatchSummaryResponseDOList.stream().filter(i -> jobTaskBatchStatus == i.getTaskBatchStatus()).collect(Collectors.toList()); + List summaryResponseDOList = jobBatchSummaryResponseDOList.stream() + .filter(i -> jobTaskBatchStatus == i.getTaskBatchStatus()).collect(Collectors.toList()); for (JobBatchSummaryResponseDO jobBatchSummaryResponseDO : summaryResponseDOList) { JobTaskBatchReason jobTaskBatchReason = new JobTaskBatchReason(); jobTaskBatchReason.setReason(jobBatchSummaryResponseDO.getOperationReason()); diff --git a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/schedule/RetrySummarySchedule.java b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/schedule/RetrySummarySchedule.java index 1fb053168..dcb13a1b2 100644 --- a/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/schedule/RetrySummarySchedule.java +++ b/easy-retry-server/easy-retry-server-retry-task/src/main/java/com/aizuda/easy/retry/server/retry/task/support/schedule/RetrySummarySchedule.java @@ -4,15 +4,21 @@ import com.aizuda.easy.retry.common.log.EasyRetryLog; import com.aizuda.easy.retry.server.common.Lifecycle; import com.aizuda.easy.retry.server.common.config.SystemProperties; import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule; +import com.aizuda.easy.retry.server.common.triple.Pair; +import com.aizuda.easy.retry.server.common.triple.Triple; import com.aizuda.easy.retry.template.datasource.persistence.dataobject.DashboardRetryResponseDO; import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetrySummaryMapper; import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper; +import com.aizuda.easy.retry.template.datasource.persistence.po.JobSummary; import com.aizuda.easy.retry.template.datasource.persistence.po.RetrySummary; import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.time.Duration; import java.time.LocalDate; @@ -20,6 +26,10 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; /** * Retry Dashboard @@ -66,20 +76,63 @@ public class RetrySummarySchedule extends AbstractSchedule implements Lifecycle .between(RetryTaskLog::getCreateDt, todayFrom, todayTo) .groupBy(RetryTaskLog::getNamespaceId, RetryTaskLog::getGroupName, RetryTaskLog::getSceneName); List dashboardRetryResponseDOList = retryTaskLogMapper.retrySummaryRetryTaskLogList(wrapper); - if (dashboardRetryResponseDOList == null || dashboardRetryResponseDOList.size() < 1) { + if (CollectionUtils.isEmpty(dashboardRetryResponseDOList)) { continue; } // insertOrUpdate List retrySummaryList = retrySummaryList(todayFrom, dashboardRetryResponseDOList); - int totalRetrySummary = retrySummaryMapper.insertOrUpdate(retrySummaryList); - EasyRetryLog.LOCAL.debug("retry summary dashboard success todayFrom:[{}] todayTo:[{}] total:[{}]", todayFrom, todayTo, totalRetrySummary); + + Set groupNames = Sets.newHashSet(); + Set namespaceIds = Sets.newHashSet(); + Set sceneNames = Sets.newHashSet(); + for (final RetrySummary retrySummary : retrySummaryList) { + groupNames.add(retrySummary.getGroupName()); + namespaceIds.add(retrySummary.getNamespaceId()); + sceneNames.add(retrySummary.getSceneName()); + } + + List retrySummaries = retrySummaryMapper.selectList(new LambdaQueryWrapper() + .in(RetrySummary::getGroupName, groupNames) + .in(RetrySummary::getNamespaceId, namespaceIds) + .in(RetrySummary::getSceneName, sceneNames) + .eq(RetrySummary::getTriggerAt, todayFrom) + ); + + Map, RetrySummary> summaryMap = retrySummaries.stream() + .collect(Collectors.toMap(retrySummary -> Triple.of(mergeKey(retrySummary), retrySummary.getSceneName(), retrySummary.getTriggerAt()), k -> k)); + + List waitInserts = Lists.newArrayList(); + List waitUpdates = Lists.newArrayList(); + for (final RetrySummary retrySummary : retrySummaries) { + if (Objects.isNull(summaryMap.get(Triple.of(mergeKey(retrySummary), retrySummary.getSceneName(), retrySummary.getTriggerAt())))) { + waitInserts.add(retrySummary); + } else { + waitUpdates.add(retrySummary); + } + } + + int insertTotalRetrySummary = 0; + if (!CollectionUtils.isEmpty(waitInserts)) { + insertTotalRetrySummary = retrySummaryMapper.batchInsert(waitInserts); + } + + int updateTotalRetrySummary = 0; + if (!CollectionUtils.isEmpty(waitUpdates)) { + updateTotalRetrySummary = retrySummaryMapper.batchUpdate(waitUpdates); + } + + EasyRetryLog.LOCAL.debug("retry summary dashboard success todayFrom:[{}] todayTo:[{}] insertTotalRetrySummary:[{}] updateTotalRetrySummary:[{}]", todayFrom, todayTo, insertTotalRetrySummary, updateTotalRetrySummary); } } catch (Exception e) { EasyRetryLog.LOCAL.error("retry summary dashboard log error", e); } } + private String mergeKey(final RetrySummary retrySummary) { + return retrySummary.getGroupName() + retrySummary.getNamespaceId(); + } + private List retrySummaryList(LocalDateTime triggerAt, List dashboardRetryResponseDOList) { List retrySummaryList = new ArrayList<>(); for (DashboardRetryResponseDO dashboardRetryResponseDO : dashboardRetryResponseDOList) {