feat(3.2.0): complete the insertOrUpdate refactor

1. Job task statistics
2. Workflow task statistics
3. Retry task statistics
4. Client registration and renewal
byteblogs168 2024-03-30 18:36:06 +08:00
parent a7211b4691
commit 9cf223278a
11 changed files with 319 additions and 108 deletions
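
The change shared by the three summary schedules below: instead of pushing aggregates through a single INSERT ... ON DUPLICATE KEY UPDATE, each schedule now reads back the rows that already exist for the summary window, splits the freshly aggregated rows into pending inserts and pending updates, and issues the new batchInsert/batchUpdate statements separately. A common motivation for this shape on MySQL is that ON DUPLICATE KEY UPDATE can consume auto-increment values and take extra locks even when it only updates. A minimal sketch of the split, with hypothetical types rather than the project's exact classes:

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

// Sketch only: Key/Summary and the two Consumers stand in for the real PO and mapper calls.
class InsertOrUpdateSplit {
    record Key(long businessId, LocalDateTime triggerAt) {}
    record Summary(long businessId, LocalDateTime triggerAt, int successNum) {}

    static void sync(List<Summary> existing, List<Summary> computed,
                     Consumer<List<Summary>> batchInsert, Consumer<List<Summary>> batchUpdate) {
        // Index the already-persisted rows by their unique key (business_id, trigger_at)
        Map<Key, Summary> byKey = existing.stream()
                .collect(Collectors.toMap(s -> new Key(s.businessId(), s.triggerAt()), Function.identity()));
        List<Summary> waitInserts = new ArrayList<>();
        List<Summary> waitUpdates = new ArrayList<>();
        for (Summary s : computed) {
            (byKey.containsKey(new Key(s.businessId(), s.triggerAt())) ? waitUpdates : waitInserts).add(s);
        }
        if (!waitUpdates.isEmpty()) batchUpdate.accept(waitUpdates);  // one multi-table UPDATE
        if (!waitInserts.isEmpty()) batchInsert.accept(waitInserts);  // one multi-VALUES INSERT
    }
}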

File: SQL schema (MySQL DDL)

@@ -29,7 +29,7 @@ CREATE TABLE `group_config`
`namespace_id` varchar(64) NOT NULL DEFAULT '764d604ec6fc45f68cd92514c40e9e1a' COMMENT '命名空间id',
`group_name` varchar(64) NOT NULL DEFAULT '' COMMENT '组名称',
`description` varchar(256) NOT NULL DEFAULT '' COMMENT '组描述',
`token` varchar(256) NOT NULL DEFAULT 'ER_cKqBTPzCsWA3VyuCfFoccmuIEGXjr5KT' COMMENT 'token',
`token` varchar(64) NOT NULL DEFAULT 'ER_cKqBTPzCsWA3VyuCfFoccmuIEGXjr5KT' COMMENT 'token',
`group_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '组状态 0、未启用 1、启用',
`version` int(11) NOT NULL COMMENT '版本号',
`group_partition` int(11) NOT NULL COMMENT '分区',
@@ -426,6 +426,7 @@ CREATE TABLE `job_summary`
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
KEY `idx_trigger_at` (`trigger_at`),
KEY `idx_namespace_id_group_name_business_id` (`namespace_id`, `group_name`, `business_id`),
UNIQUE KEY `uk_business_id_trigger_at` (`business_id`, `trigger_at`) USING BTREE
) ENGINE = InnoDB
@@ -446,6 +447,7 @@ CREATE TABLE `retry_summary`
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`),
KEY `idx_trigger_at` (`trigger_at`),
UNIQUE KEY `uk_scene_name_trigger_at` (`namespace_id`, `group_name`, `scene_name`, `trigger_at`) USING BTREE
) ENGINE = InnoDB
AUTO_INCREMENT = 1
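
Both idx_trigger_at keys are new in this commit; they likely back the read-back step the reworked schedules perform before splitting rows into inserts and updates, i.e. an equality match on trigger_at plus an IN list over the business key, as in this abbreviated mirror of the scheduler code further down (ids is illustrative):

List<JobSummary> existing = jobSummaryMapper.selectList(new LambdaQueryWrapper<JobSummary>()
        .eq(JobSummary::getTriggerAt, todayFrom)   // equality on trigger_at
        .in(JobSummary::getBusinessId, ids));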

File: JobSummaryMapper.java

@@ -22,7 +22,10 @@ import java.util.List;
@Mapper
public interface JobSummaryMapper extends BaseMapper<JobSummary> {
int insertOrUpdate(@Param("list") List<JobSummary> list);
int batchInsert(@Param("list") List<JobSummary> list);
int batchUpdate(@Param("list") List<JobSummary> list);
IPage<DashboardRetryLineResponseDO.Task> jobTaskList(@Param("ew") Wrapper<Job> wrapper, Page<Object> page);

File: RetrySummaryMapper.java

@@ -24,7 +24,9 @@ import java.util.List;
@Mapper
public interface RetrySummaryMapper extends BaseMapper<RetrySummary> {
int insertOrUpdate(@Param("list") List<RetrySummary> list);
int batchInsert(@Param("list") List<RetrySummary> list);
int batchUpdate(@Param("list") List<RetrySummary> list);
DashboardCardResponseDO.RetryTask retryTask(@Param("ew") Wrapper<RetrySummary> wrapper);

File: ServerNodeMapper.java

@@ -14,7 +14,11 @@ import java.util.List;
@Mapper
public interface ServerNodeMapper extends BaseMapper<ServerNode> {
int insertOrUpdate(@Param("records") List<ServerNode> records);
// int insertOrUpdate(@Param("records") List<ServerNode> records);
int batchUpdateExpireAt(@Param("list") List<ServerNode> list);
int batchInsert(@Param("records") List<ServerNode> records);
List<ActivePodQuantityResponseDO> countActivePod(@Param("ew") Wrapper<ServerNode> wrapper);

File: JobSummaryMapper.xml

@@ -19,7 +19,7 @@
<result column="update_dt" jdbcType="TIMESTAMP" property="updateDt"/>
</resultMap>
<insert id="insertOrUpdate" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
<insert id="batchInsert" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
INSERT INTO job_summary (namespace_id, group_name, business_id, trigger_at, system_task_type,
success_num,fail_num,fail_reason,stop_num,stop_reason, cancel_num,cancel_reason)
VALUES
@@ -39,15 +39,33 @@
#{item.cancelReason}
)
</foreach>
ON DUPLICATE KEY UPDATE
success_num = VALUES(`success_num`),
fail_num = VALUES(`fail_num`),
fail_reason = VALUES(`fail_reason`),
stop_num = VALUES(`stop_num`),
stop_reason = VALUES(`stop_reason`),
cancel_num = VALUES(`cancel_num`),
cancel_reason = VALUES(`cancel_reason`)
</insert>
<update id="batchUpdate" parameterType="java.util.List">
UPDATE job_summary rt,
(
<foreach collection="list" item="item" index="index" separator=" UNION ALL ">
SELECT
#{item.successNum} AS success_num,
#{item.failNum} AS fail_num,
#{item.failReason} AS fail_reason,
#{item.stopNum} AS stop_num,
#{item.stopReason} AS stop_reason,
#{item.cancelNum} AS cancel_num,
#{item.cancelReason} AS cancel_reason,
#{item.triggerAt} AS trigger_at,
#{item.businessId} AS business_id
</foreach>
) tt
SET
rt.success_num = tt.success_num,
rt.fail_num = tt.fail_num,
rt.fail_reason = tt.fail_reason,
rt.stop_num = tt.stop_num,
rt.stop_reason = tt.stop_reason,
rt.cancel_num = tt.cancel_num,
rt.cancel_reason = tt.cancel_reason
WHERE rt.trigger_at = tt.trigger_at and rt.business_id = tt.business_id
</update>
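
For reference, with a two-element list the foreach above renders one multi-table UPDATE joined to a UNION ALL derived table, so a whole batch is a single round trip. A plain-JDBC illustration of that statement shape (column list trimmed to two, values hypothetical, not the project's code):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;

// Illustrates the SQL shape MyBatis renders for batchUpdate above.
class JobSummaryBatchUpdateShape {
    static int updateTwoRows(Connection cn) throws SQLException {
        String sql = "UPDATE job_summary rt,"
                + " ( SELECT ? AS success_num, ? AS fail_num, ? AS trigger_at, ? AS business_id"
                + "   UNION ALL SELECT ?, ?, ?, ? ) tt"
                + " SET rt.success_num = tt.success_num, rt.fail_num = tt.fail_num"
                + " WHERE rt.trigger_at = tt.trigger_at AND rt.business_id = tt.business_id";
        try (PreparedStatement ps = cn.prepareStatement(sql)) {
            Timestamp day = Timestamp.valueOf("2024-03-30 00:00:00");
            ps.setInt(1, 10); ps.setInt(2, 2); ps.setTimestamp(3, day); ps.setLong(4, 1001L); // row 1
            ps.setInt(5, 7);  ps.setInt(6, 0); ps.setTimestamp(7, day); ps.setLong(8, 1002L); // row 2
            return ps.executeUpdate();
        }
    }
}

MySQL takes the derived table's column names from the first SELECT, so the repeated AS aliases on the later rows in the mapper are harmless.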
<select id="jobLineList"
resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.DashboardLineResponseDO">

File: RetrySummaryMapper.xml

@@ -15,7 +15,7 @@
<result column="update_dt" jdbcType="TIMESTAMP" property="updateDt"/>
</resultMap>
<insert id="insertOrUpdate" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
<insert id="batchInsert" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
INSERT INTO
retry_summary (namespace_id, group_name, scene_name, trigger_at, running_num, finish_num, max_count_num,
suspend_num)
@@ -32,12 +32,32 @@
#{item.suspendNum}
)
</foreach>
ON DUPLICATE KEY UPDATE
running_num = values(`running_num`),
finish_num = values(`finish_num`),
max_count_num = values(`max_count_num`),
suspend_num = values(`suspend_num`)
</insert>
<update id="batchUpdate" parameterType="java.util.List">
UPDATE retry_summary rt,
(
<foreach collection="list" item="item" index="index" separator=" UNION ALL ">
SELECT
#{item.runningNum} AS running_num,
#{item.finishNum} AS finish_num,
#{item.maxCountNum} AS max_count_num,
#{item.suspendNum} AS suspend_num,
#{item.triggerAt} AS trigger_at,
#{item.sceneName} AS scene_name,
#{item.namespaceId} AS namespace_id,
#{item.groupName} AS group_name
</foreach>
) tt
SET
rt.running_num = tt.running_num,
rt.finish_num = tt.finish_num,
rt.max_count_num = tt.max_count_num,
rt.suspend_num = tt.suspend_num
WHERE rt.trigger_at = tt.trigger_at
and rt.group_name = tt.group_name
and rt.namespace_id = tt.namespace_id
and rt.scene_name = tt.scene_name
</update>
<select id="retryTask"
resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.DashboardCardResponseDO$RetryTask">

File: ServerNodeMapper.xml

@@ -1,53 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMapper">
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode">
<id column="id" jdbcType="BIGINT" property="id" />
<result column="namespace_id" jdbcType="VARCHAR" property="namespaceId" />
<result column="group_name" jdbcType="VARCHAR" property="groupName" />
<result column="host_id" jdbcType="VARCHAR" property="hostId" />
<result column="host_ip" jdbcType="VARCHAR" property="hostIp" />
<result column="host_port" jdbcType="INTEGER" property="hostPort" />
<result column="expire_at" jdbcType="TIMESTAMP" property="expireAt" />
<result column="node_type" jdbcType="TINYINT" property="nodeType" />
<result column="context_path" jdbcType="VARCHAR" property="contextPath" />
<result column="ext_attrs" jdbcType="VARCHAR" property="extAttrs" />
<result column="create_dt" jdbcType="TIMESTAMP" property="createDt" />
<result column="update_dt" jdbcType="TIMESTAMP" property="updateDt" />
</resultMap>
<resultMap id="BaseResultMap" type="com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode">
<id column="id" jdbcType="BIGINT" property="id"/>
<result column="namespace_id" jdbcType="VARCHAR" property="namespaceId"/>
<result column="group_name" jdbcType="VARCHAR" property="groupName"/>
<result column="host_id" jdbcType="VARCHAR" property="hostId"/>
<result column="host_ip" jdbcType="VARCHAR" property="hostIp"/>
<result column="host_port" jdbcType="INTEGER" property="hostPort"/>
<result column="expire_at" jdbcType="TIMESTAMP" property="expireAt"/>
<result column="node_type" jdbcType="TINYINT" property="nodeType"/>
<result column="context_path" jdbcType="VARCHAR" property="contextPath"/>
<result column="ext_attrs" jdbcType="VARCHAR" property="extAttrs"/>
<result column="create_dt" jdbcType="TIMESTAMP" property="createDt"/>
<result column="update_dt" jdbcType="TIMESTAMP" property="updateDt"/>
</resultMap>
<sql id="Base_Column_List">
id, namespace_id, group_name, context_path, host_id, host_ip, host_port, expire_at, node_type,create_dt,update_dt
</sql>
<sql id="Base_Column_List">
id
, namespace_id, group_name, context_path, host_id, host_ip, host_port, expire_at, node_type,create_dt,update_dt
</sql>
<insert id="insertOrUpdate" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
INSERT INTO server_node (namespace_id, group_name, host_id, host_ip, host_port,
expire_at, node_type, ext_attrs, context_path, create_dt)
VALUES
<insert id="batchInsert" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
INSERT INTO server_node (namespace_id, group_name, host_id, host_ip, host_port,
expire_at, node_type, ext_attrs, context_path, create_dt)
VALUES
<foreach collection="records" item="item" index="index" separator=",">
(
#{item.namespaceId,jdbcType=VARCHAR},
#{item.groupName,jdbcType=VARCHAR},
#{item.hostId,jdbcType=VARCHAR},
#{item.hostIp,jdbcType=VARCHAR},
#{item.hostPort,jdbcType=INTEGER},
#{item.expireAt,jdbcType=TIMESTAMP},
#{item.nodeType,jdbcType=TINYINT},
#{item.extAttrs,jdbcType=VARCHAR},
#{item.contextPath,jdbcType=VARCHAR},
#{item.createDt,jdbcType=TIMESTAMP}
#{item.namespaceId,jdbcType=VARCHAR},
#{item.groupName,jdbcType=VARCHAR},
#{item.hostId,jdbcType=VARCHAR},
#{item.hostIp,jdbcType=VARCHAR},
#{item.hostPort,jdbcType=INTEGER},
#{item.expireAt,jdbcType=TIMESTAMP},
#{item.nodeType,jdbcType=TINYINT},
#{item.extAttrs,jdbcType=VARCHAR},
#{item.contextPath,jdbcType=VARCHAR},
#{item.createDt,jdbcType=TIMESTAMP}
)
</foreach>
ON DUPLICATE KEY UPDATE
expire_at = VALUES(`expire_at`)
</insert>
<select id="countActivePod"
resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.ActivePodQuantityResponseDO">
SELECT
node_type AS nodeType,
COUNT(*) AS total
FROM server_node
${ew.customSqlSegment}
</select>
</insert>
<update id="batchUpdateExpireAt" parameterType="java.util.List">
UPDATE server_node rt,
(
<foreach collection="list" item="item" index="index" separator=" UNION ALL ">
SELECT
#{item.expireAt} AS expire_at,
#{item.contextPath} AS context_path,
#{item.hostId} AS host_id,
#{item.hostIp} AS host_ip
</foreach>
) tt
SET rt.expire_at = tt.expire_at, rt.context_path = tt.context_path
WHERE rt.host_id = tt.host_id and rt.host_ip = tt.host_ip
</update>
<select id="countActivePod"
resultType="com.aizuda.easy.retry.template.datasource.persistence.dataobject.ActivePodQuantityResponseDO">
SELECT node_type AS nodeType,
COUNT(*) AS total
FROM server_node ${ew.customSqlSegment}
</select>
</mapper>

File: AbstractRegister.java

@@ -8,6 +8,7 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.ServerNodeMa
import com.aizuda.easy.retry.template.datasource.persistence.po.ServerNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import java.time.LocalDateTime;
import java.util.List;
@@ -41,22 +41,25 @@ public abstract class AbstractRegister implements Register, Lifecycle {
protected void refreshExpireAt(List<ServerNode> serverNodes) {
try {
for (final ServerNode serverNode : serverNodes) {
serverNode.setExpireAt(getExpireAt());
}
serverNodeMapper.insertOrUpdate(serverNodes);
for (final ServerNode serverNode : serverNodes) {
// Refresh the local cache's expiration time
CacheRegisterTable.refreshExpireAt(serverNode);
}
}catch (Exception e) {
EasyRetryLog.LOCAL.error("注册节点失败", e);
for (final ServerNode serverNode : serverNodes) {
serverNode.setExpireAt(getExpireAt());
}
// Batch update
if (serverNodes.size() != serverNodeMapper.batchUpdateExpireAt(serverNodes)) {
try {
serverNodeMapper.batchInsert(serverNodes);
} catch (DuplicateKeyException ignored) {
} catch (Exception e) {
EasyRetryLog.LOCAL.error("注册节点失败", e);
}
}
for (final ServerNode serverNode : serverNodes) {
// Refresh the local cache's expiration time
CacheRegisterTable.refreshExpireAt(serverNode);
}
}
protected abstract void beforeProcessor(RegisterContext context);
@@ -86,5 +90,4 @@ public abstract class AbstractRegister implements Register, Lifecycle {
protected abstract Integer getNodeType();
}
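
Net effect of the register change: renewal is update-first, insertion is the fallback for nodes not registered yet, and losing an insert race to a peer server is tolerated by swallowing DuplicateKeyException. A condensed sketch of that heartbeat flow (hypothetical DAO, not the project's mapper):

import java.time.LocalDateTime;
import java.util.List;

// Heartbeat sketch: renew leases with one batch UPDATE; if some rows were missing,
// fall back to a batch INSERT and tolerate losing the insert race to a peer.
class LeaseRenewalSketch {
    interface NodeDao {
        int batchUpdateExpireAt(List<Node> nodes); // returns rows actually updated
        int batchInsert(List<Node> nodes);         // throws DuplicateKey on unique-key conflict
    }
    static class DuplicateKey extends RuntimeException {}
    record Node(String hostId, String hostIp, LocalDateTime expireAt) {}

    static void renew(NodeDao dao, List<Node> nodes, LocalDateTime expireAt) {
        List<Node> refreshed = nodes.stream()
                .map(n -> new Node(n.hostId(), n.hostIp(), expireAt))
                .toList();
        if (dao.batchUpdateExpireAt(refreshed) != refreshed.size()) {
            try {
                // Re-insert the batch; rows that already exist trigger the duplicate-key path
                dao.batchInsert(refreshed);
            } catch (DuplicateKey raced) {
                // A peer inserted the same node between our UPDATE and INSERT; nothing to do.
            }
        }
    }
}

One caveat of this shape: a single duplicate aborts the whole multi-row INSERT, so the fallback only truly registers new nodes when the batch contains no already-registered ones (e.g. single-node heartbeats); mixed batches are picked up on the next cycle.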

File: JobSummarySchedule.java

@@ -8,20 +8,27 @@ import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.JobTaskBatchReason;
import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule;
import com.aizuda.easy.retry.server.common.triple.Pair;
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchSummaryResponseDO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobSummaryMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobSummary;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -66,10 +73,10 @@ public class JobSummarySchedule extends AbstractSchedule implements Lifecycle {
LocalDateTime todayFrom = LocalDateTime.of(LocalDate.now(), LocalTime.MIN).plusDays(-i);
LocalDateTime todayTo = LocalDateTime.of(LocalDate.now(), LocalTime.MAX).plusDays(-i);
LambdaQueryWrapper<JobTaskBatch> wrapper = new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getSystemTaskType, SyetemTaskTypeEnum.JOB.getType())
.between(JobTaskBatch::getCreateDt, todayFrom, todayTo)
.groupBy(JobTaskBatch::getNamespaceId, JobTaskBatch::getGroupName,
JobTaskBatch::getJobId, JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason);
.eq(JobTaskBatch::getSystemTaskType, SyetemTaskTypeEnum.JOB.getType())
.between(JobTaskBatch::getCreateDt, todayFrom, todayTo)
.groupBy(JobTaskBatch::getNamespaceId, JobTaskBatch::getGroupName,
JobTaskBatch::getJobId, JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason);
List<JobBatchSummaryResponseDO> summaryResponseDOList = jobTaskBatchMapper.summaryJobBatchList(wrapper);
if (summaryResponseDOList == null || summaryResponseDOList.size() < 1) {
continue;
@@ -77,17 +84,52 @@ public class JobSummarySchedule extends AbstractSchedule implements Lifecycle {
// insertOrUpdate, now split into batchInsert / batchUpdate
List<JobSummary> jobSummaryList = jobSummaryList(todayFrom, summaryResponseDOList);
int totalJobSummary = jobSummaryMapper.insertOrUpdate(jobSummaryList);
EasyRetryLog.LOCAL.debug("job summary dashboard success todayFrom:[{}] todayTo:[{}] total:[{}]", todayFrom, todayTo, totalJobSummary);
List<JobSummary> jobSummaries = jobSummaryMapper.selectList(new LambdaQueryWrapper<JobSummary>()
.eq(JobSummary::getTriggerAt, todayFrom)
.in(JobSummary::getBusinessId, jobSummaryList.stream().map(JobSummary::getBusinessId).collect(
Collectors.toSet())));
Map<Pair<Long, LocalDateTime>, JobSummary> summaryMap = jobSummaries.stream()
.collect(
Collectors.toMap(jobSummary -> Pair.of(jobSummary.getBusinessId(), jobSummary.getTriggerAt()),
k -> k));
List<JobSummary> waitInserts = Lists.newArrayList();
List<JobSummary> waitUpdates = Lists.newArrayList();
for (final JobSummary jobSummary : jobSummaryList) {
if (Objects.isNull(
summaryMap.get(Pair.of(jobSummary.getBusinessId(), jobSummary.getTriggerAt())))) {
waitInserts.add(jobSummary);
} else {
waitUpdates.add(jobSummary);
}
}
int updateTotalJobSummary = 0;
if (!CollectionUtils.isEmpty(waitUpdates)) {
updateTotalJobSummary = jobSummaryMapper.batchUpdate(waitUpdates);
}
int insertTotalJobSummary = 0;
if (!CollectionUtils.isEmpty(waitInserts)) {
insertTotalJobSummary = jobSummaryMapper.batchInsert(waitInserts);
}
EasyRetryLog.LOCAL.debug(
"job summary dashboard success todayFrom:[{}] todayTo:[{}] updateTotalJobSummary:[{}] insertTotalJobSummary:[{}]",
todayFrom, todayTo, updateTotalJobSummary, insertTotalJobSummary);
}
} catch (Exception e) {
EasyRetryLog.LOCAL.error("job summary dashboard log error", e);
}
}
private List<JobSummary> jobSummaryList(LocalDateTime triggerAt, List<JobBatchSummaryResponseDO> summaryResponseDOList) {
private List<JobSummary> jobSummaryList(LocalDateTime triggerAt,
List<JobBatchSummaryResponseDO> summaryResponseDOList) {
List<JobSummary> jobSummaryList = new ArrayList<>();
Map<Long, List<JobBatchSummaryResponseDO>> jobIdListMap = summaryResponseDOList.parallelStream().collect(Collectors.groupingBy(JobBatchSummaryResponseDO::getJobId));
Map<Long, List<JobBatchSummaryResponseDO>> jobIdListMap = summaryResponseDOList.parallelStream()
.collect(Collectors.groupingBy(JobBatchSummaryResponseDO::getJobId));
for (Map.Entry<Long, List<JobBatchSummaryResponseDO>> job : jobIdListMap.entrySet()) {
JobSummary jobSummary = new JobSummary();
jobSummary.setBusinessId(job.getKey());
@@ -100,9 +142,12 @@ public class JobSummarySchedule extends AbstractSchedule implements Lifecycle {
jobSummary.setStopNum(job.getValue().stream().mapToInt(JobBatchSummaryResponseDO::getStopNum).sum());
jobSummary.setCancelNum(job.getValue().stream().mapToInt(JobBatchSummaryResponseDO::getCancelNum).sum());
jobSummary.setFailReason(JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.FAIL.getStatus(), job.getValue())));
jobSummary.setStopReason(JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.STOP.getStatus(), job.getValue())));
jobSummary.setCancelReason(JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.CANCEL.getStatus(), job.getValue())));
jobSummary.setFailReason(
JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.FAIL.getStatus(), job.getValue())));
jobSummary.setStopReason(
JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.STOP.getStatus(), job.getValue())));
jobSummary.setCancelReason(JsonUtil.toJsonString(
jobTaskBatchReasonList(JobTaskBatchStatusEnum.CANCEL.getStatus(), job.getValue())));
jobSummaryList.add(jobSummary);
}
return jobSummaryList;
@@ -115,9 +160,11 @@ public class JobSummarySchedule extends AbstractSchedule implements Lifecycle {
* @param jobBatchSummaryResponseDOList
* @return
*/
private List<JobTaskBatchReason> jobTaskBatchReasonList(int jobTaskBatchStatus, List<JobBatchSummaryResponseDO> jobBatchSummaryResponseDOList) {
private List<JobTaskBatchReason> jobTaskBatchReasonList(int jobTaskBatchStatus,
List<JobBatchSummaryResponseDO> jobBatchSummaryResponseDOList) {
List<JobTaskBatchReason> jobTaskBatchReasonArrayList = new ArrayList<>();
List<JobBatchSummaryResponseDO> summaryResponseDOList = jobBatchSummaryResponseDOList.stream().filter(i -> jobTaskBatchStatus == i.getTaskBatchStatus()).collect(Collectors.toList());
List<JobBatchSummaryResponseDO> summaryResponseDOList = jobBatchSummaryResponseDOList.stream()
.filter(i -> jobTaskBatchStatus == i.getTaskBatchStatus()).collect(Collectors.toList());
for (JobBatchSummaryResponseDO jobBatchSummaryResponseDO : summaryResponseDOList) {
JobTaskBatchReason jobTaskBatchReason = new JobTaskBatchReason();
jobTaskBatchReason.setReason(jobBatchSummaryResponseDO.getOperationReason());

File: WorkflowJobSummarySchedule.java

@@ -8,6 +8,7 @@ import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.dto.JobTaskBatchReason;
import com.aizuda.easy.retry.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule;
import com.aizuda.easy.retry.server.common.triple.Pair;
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.JobBatchSummaryResponseDO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobSummaryMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.JobTaskBatchMapper;
@@ -15,9 +16,12 @@ import com.aizuda.easy.retry.template.datasource.persistence.po.JobSummary;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.easy.retry.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.time.LocalDate;
@@ -26,6 +30,8 @@ import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -70,27 +76,64 @@ public class WorkflowJobSummarySchedule extends AbstractSchedule implements Life
LocalDateTime todayFrom = LocalDateTime.of(LocalDate.now(), LocalTime.MIN).plusDays(-i);
LocalDateTime todayTo = LocalDateTime.of(LocalDate.now(), LocalTime.MAX).plusDays(-i);
LambdaQueryWrapper<WorkflowTaskBatch> wrapper = new LambdaQueryWrapper<WorkflowTaskBatch>()
.between(WorkflowTaskBatch::getCreateDt, todayFrom, todayTo)
.groupBy(WorkflowTaskBatch::getNamespaceId, WorkflowTaskBatch::getGroupName,
WorkflowTaskBatch::getWorkflowId, WorkflowTaskBatch::getTaskBatchStatus, WorkflowTaskBatch::getOperationReason);
List<JobBatchSummaryResponseDO> summaryWorkflowResponseDOList = jobTaskBatchMapper.summaryWorkflowTaskBatchList(wrapper);
if (summaryWorkflowResponseDOList == null || summaryWorkflowResponseDOList.size() < 1) {
.between(WorkflowTaskBatch::getCreateDt, todayFrom, todayTo)
.groupBy(WorkflowTaskBatch::getNamespaceId, WorkflowTaskBatch::getGroupName,
WorkflowTaskBatch::getWorkflowId, WorkflowTaskBatch::getTaskBatchStatus,
WorkflowTaskBatch::getOperationReason);
List<JobBatchSummaryResponseDO> summaryWorkflowResponseDOList = jobTaskBatchMapper.summaryWorkflowTaskBatchList(
wrapper);
if (CollectionUtils.isEmpty(summaryWorkflowResponseDOList)) {
continue;
}
// insertOrUpdate, now split into batchInsert / batchUpdate
List<JobSummary> jobSummaryList = jobSummaryList(todayFrom, summaryWorkflowResponseDOList);
int totalJobSummary = jobSummaryMapper.insertOrUpdate(jobSummaryList);
EasyRetryLog.LOCAL.debug("workflow job summary dashboard success todayFrom:[{}] todayTo:[{}] total:[{}]", todayFrom, todayTo, totalJobSummary);
List<JobSummary> jobSummaries = jobSummaryMapper.selectList(new LambdaQueryWrapper<JobSummary>()
.eq(JobSummary::getTriggerAt, todayFrom)
.in(JobSummary::getBusinessId, jobSummaryList.stream().map(JobSummary::getBusinessId).collect(
Collectors.toSet())));
Map<Pair<Long, LocalDateTime>, JobSummary> summaryMap = jobSummaries.stream()
.collect(
Collectors.toMap(jobSummary -> Pair.of(jobSummary.getBusinessId(), jobSummary.getTriggerAt()),
k -> k));
List<JobSummary> waitInserts = Lists.newArrayList();
List<JobSummary> waitUpdates = Lists.newArrayList();
for (final JobSummary jobSummary : jobSummaryList) {
if (Objects.isNull(
summaryMap.get(Pair.of(jobSummary.getBusinessId(), jobSummary.getTriggerAt())))) {
waitInserts.add(jobSummary);
} else {
waitUpdates.add(jobSummary);
}
}
int updateTotalJobSummary = 0;
if (!CollectionUtils.isEmpty(waitUpdates)) {
updateTotalJobSummary = jobSummaryMapper.batchUpdate(waitUpdates);
}
int insertTotalJobSummary = 0;
if (!CollectionUtils.isEmpty(waitInserts)) {
insertTotalJobSummary = jobSummaryMapper.batchInsert(waitInserts);
}
EasyRetryLog.LOCAL.debug(
"workflow job summary dashboard success todayFrom:[{}] todayTo:[{}] updateTotalJobSummary:[{}] insertTotalJobSummary:[{}]",
todayFrom, todayTo, updateTotalJobSummary, insertTotalJobSummary);
}
} catch (Exception e) {
EasyRetryLog.LOCAL.error("workflow job summary dashboard log error", e);
}
}
private List<JobSummary> jobSummaryList(LocalDateTime triggerAt, List<JobBatchSummaryResponseDO> summaryResponseDOList) {
private List<JobSummary> jobSummaryList(LocalDateTime triggerAt,
List<JobBatchSummaryResponseDO> summaryResponseDOList) {
List<JobSummary> jobSummaryList = new ArrayList<>();
Map<Long, List<JobBatchSummaryResponseDO>> jobIdListMap = summaryResponseDOList.parallelStream().collect(Collectors.groupingBy(JobBatchSummaryResponseDO::getJobId));
Map<Long, List<JobBatchSummaryResponseDO>> jobIdListMap = summaryResponseDOList.parallelStream()
.collect(Collectors.groupingBy(JobBatchSummaryResponseDO::getJobId));
for (Map.Entry<Long, List<JobBatchSummaryResponseDO>> job : jobIdListMap.entrySet()) {
JobSummary jobSummary = new JobSummary();
jobSummary.setBusinessId(job.getKey());
@@ -103,9 +146,12 @@ public class WorkflowJobSummarySchedule extends AbstractSchedule implements Life
jobSummary.setStopNum(job.getValue().stream().mapToInt(JobBatchSummaryResponseDO::getStopNum).sum());
jobSummary.setCancelNum(job.getValue().stream().mapToInt(JobBatchSummaryResponseDO::getCancelNum).sum());
jobSummary.setFailReason(JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.FAIL.getStatus(), job.getValue())));
jobSummary.setStopReason(JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.STOP.getStatus(), job.getValue())));
jobSummary.setCancelReason(JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.CANCEL.getStatus(), job.getValue())));
jobSummary.setFailReason(
JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.FAIL.getStatus(), job.getValue())));
jobSummary.setStopReason(
JsonUtil.toJsonString(jobTaskBatchReasonList(JobTaskBatchStatusEnum.STOP.getStatus(), job.getValue())));
jobSummary.setCancelReason(JsonUtil.toJsonString(
jobTaskBatchReasonList(JobTaskBatchStatusEnum.CANCEL.getStatus(), job.getValue())));
jobSummaryList.add(jobSummary);
}
return jobSummaryList;
@@ -118,9 +164,11 @@ public class WorkflowJobSummarySchedule extends AbstractSchedule implements Life
* @param jobBatchSummaryResponseDOList
* @return
*/
private List<JobTaskBatchReason> jobTaskBatchReasonList(int jobTaskBatchStatus, List<JobBatchSummaryResponseDO> jobBatchSummaryResponseDOList) {
private List<JobTaskBatchReason> jobTaskBatchReasonList(int jobTaskBatchStatus,
List<JobBatchSummaryResponseDO> jobBatchSummaryResponseDOList) {
List<JobTaskBatchReason> jobTaskBatchReasonArrayList = new ArrayList<>();
List<JobBatchSummaryResponseDO> summaryResponseDOList = jobBatchSummaryResponseDOList.stream().filter(i -> jobTaskBatchStatus == i.getTaskBatchStatus()).collect(Collectors.toList());
List<JobBatchSummaryResponseDO> summaryResponseDOList = jobBatchSummaryResponseDOList.stream()
.filter(i -> jobTaskBatchStatus == i.getTaskBatchStatus()).collect(Collectors.toList());
for (JobBatchSummaryResponseDO jobBatchSummaryResponseDO : summaryResponseDOList) {
JobTaskBatchReason jobTaskBatchReason = new JobTaskBatchReason();
jobTaskBatchReason.setReason(jobBatchSummaryResponseDO.getOperationReason());

File: RetrySummarySchedule.java

@@ -4,15 +4,21 @@ import com.aizuda.easy.retry.common.log.EasyRetryLog;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.schedule.AbstractSchedule;
import com.aizuda.easy.retry.server.common.triple.Pair;
import com.aizuda.easy.retry.server.common.triple.Triple;
import com.aizuda.easy.retry.template.datasource.persistence.dataobject.DashboardRetryResponseDO;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetrySummaryMapper;
import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLogMapper;
import com.aizuda.easy.retry.template.datasource.persistence.po.JobSummary;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetrySummary;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.time.LocalDate;
@@ -20,6 +26,10 @@ import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Retry Dashboard
@@ -66,20 +76,63 @@ public class RetrySummarySchedule extends AbstractSchedule implements Lifecycle
.between(RetryTaskLog::getCreateDt, todayFrom, todayTo)
.groupBy(RetryTaskLog::getNamespaceId, RetryTaskLog::getGroupName, RetryTaskLog::getSceneName);
List<DashboardRetryResponseDO> dashboardRetryResponseDOList = retryTaskLogMapper.retrySummaryRetryTaskLogList(wrapper);
if (dashboardRetryResponseDOList == null || dashboardRetryResponseDOList.size() < 1) {
if (CollectionUtils.isEmpty(dashboardRetryResponseDOList)) {
continue;
}
// insertOrUpdate, now split into batchInsert / batchUpdate
List<RetrySummary> retrySummaryList = retrySummaryList(todayFrom, dashboardRetryResponseDOList);
int totalRetrySummary = retrySummaryMapper.insertOrUpdate(retrySummaryList);
EasyRetryLog.LOCAL.debug("retry summary dashboard success todayFrom:[{}] todayTo:[{}] total:[{}]", todayFrom, todayTo, totalRetrySummary);
Set<String> groupNames = Sets.newHashSet();
Set<String> namespaceIds = Sets.newHashSet();
Set<String> sceneNames = Sets.newHashSet();
for (final RetrySummary retrySummary : retrySummaryList) {
groupNames.add(retrySummary.getGroupName());
namespaceIds.add(retrySummary.getNamespaceId());
sceneNames.add(retrySummary.getSceneName());
}
List<RetrySummary> retrySummaries = retrySummaryMapper.selectList(new LambdaQueryWrapper<RetrySummary>()
.in(RetrySummary::getGroupName, groupNames)
.in(RetrySummary::getNamespaceId, namespaceIds)
.in(RetrySummary::getSceneName, sceneNames)
.eq(RetrySummary::getTriggerAt, todayFrom)
);
Map<Triple<String, String, LocalDateTime>, RetrySummary> summaryMap = retrySummaries.stream()
.collect(Collectors.toMap(retrySummary -> Triple.of(mergeKey(retrySummary), retrySummary.getSceneName(), retrySummary.getTriggerAt()), k -> k));
List<RetrySummary> waitInserts = Lists.newArrayList();
List<RetrySummary> waitUpdates = Lists.newArrayList();
// Partition the freshly aggregated summaries, not the rows just read back
for (final RetrySummary retrySummary : retrySummaryList) {
if (Objects.isNull(summaryMap.get(Triple.of(mergeKey(retrySummary), retrySummary.getSceneName(), retrySummary.getTriggerAt())))) {
waitInserts.add(retrySummary);
} else {
waitUpdates.add(retrySummary);
}
}
int insertTotalRetrySummary = 0;
if (!CollectionUtils.isEmpty(waitInserts)) {
insertTotalRetrySummary = retrySummaryMapper.batchInsert(waitInserts);
}
int updateTotalRetrySummary = 0;
if (!CollectionUtils.isEmpty(waitUpdates)) {
updateTotalRetrySummary = retrySummaryMapper.batchUpdate(waitUpdates);
}
EasyRetryLog.LOCAL.debug("retry summary dashboard success todayFrom:[{}] todayTo:[{}] insertTotalRetrySummary:[{}] updateTotalRetrySummary:[{}]", todayFrom, todayTo, insertTotalRetrySummary, updateTotalRetrySummary);
}
} catch (Exception e) {
EasyRetryLog.LOCAL.error("retry summary dashboard log error", e);
}
}
private String mergeKey(final RetrySummary retrySummary) {
// Delimit the parts so e.g. ("ab","c") and ("a","bc") cannot produce the same key
return retrySummary.getGroupName() + "#" + retrySummary.getNamespaceId();
}
private List<RetrySummary> retrySummaryList(LocalDateTime triggerAt, List<DashboardRetryResponseDO> dashboardRetryResponseDOList) {
List<RetrySummary> retrySummaryList = new ArrayList<>();
for (DashboardRetryResponseDO dashboardRetryResponseDO : dashboardRetryResponseDOList) {