feat:2.3.0

1. 新增任务手动触发执行功能
This commit is contained in:
byteblogs168 2023-09-11 23:06:39 +08:00
parent 5a60d5108e
commit ab65e55fdc
7 changed files with 257 additions and 7 deletions

View File

@ -71,4 +71,20 @@ public interface RetryTaskService {
* @return
*/
Integer parseLogs(ParseLogsVO parseLogsVO);
/**
* 手动支持重试任务
*
* @param requestVO
* @return
*/
boolean manualTriggerRetryTask(ManualTriggerTaskRequestVO requestVO);
/**
* 手动执行回调任务
*
* @param requestVO
* @return
*/
boolean manualTriggerCallbackTask(ManualTriggerTaskRequestVO requestVO);
}

View File

@ -1,20 +1,32 @@
package com.aizuda.easy.retry.server.service.impl;
import akka.actor.ActorRef;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.client.model.GenerateRetryIdempotentIdDTO;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.akka.ActorGenerator;
import com.aizuda.easy.retry.server.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.enums.TaskGeneratorScene;
import com.aizuda.easy.retry.server.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.model.dto.RetryTaskDTO;
import com.aizuda.easy.retry.server.service.RetryTaskService;
import com.aizuda.easy.retry.server.service.convert.RetryTaskResponseVOConverter;
import com.aizuda.easy.retry.server.service.convert.TaskContextConverter;
import com.aizuda.easy.retry.server.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.support.WaitStrategy;
import com.aizuda.easy.retry.server.support.context.CallbackRetryContext;
import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext;
import com.aizuda.easy.retry.server.support.generator.TaskGenerator;
import com.aizuda.easy.retry.server.support.generator.task.TaskContext;
import com.aizuda.easy.retry.server.support.handler.ClientNodeAllocateHandler;
import com.aizuda.easy.retry.server.support.retry.RetryBuilder;
import com.aizuda.easy.retry.server.support.retry.RetryExecutor;
import com.aizuda.easy.retry.server.support.strategy.FilterStrategies;
import com.aizuda.easy.retry.server.support.strategy.StopStrategies;
import com.aizuda.easy.retry.server.support.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
import com.aizuda.easy.retry.server.web.model.request.*;
@ -26,12 +38,14 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.RetryTaskLog
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTask;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLog;
import com.aizuda.easy.retry.template.datasource.persistence.po.RetryTaskLogMessage;
import com.aizuda.easy.retry.template.datasource.persistence.po.SceneConfig;
import com.aizuda.easy.retry.template.datasource.utils.RequestDataHelper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.HttpEntity;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -68,6 +82,9 @@ public class RetryTaskServiceImpl implements RetryTaskService {
private AccessTemplate accessTemplate;
@Autowired
private List<TaskGenerator> taskGenerators;
@Autowired
@Qualifier("bitSetIdempotentStrategyHandler")
protected IdempotentStrategy<String, Integer> idempotentStrategy;
@Override
public PageResult<List<RetryTaskResponseVO>> getRetryTaskPage(RetryTaskQueryVO queryVO) {
@ -286,4 +303,107 @@ public class RetryTaskServiceImpl implements RetryTaskService {
return waitInsertList.size();
}
@Override
public boolean manualTriggerRetryTask(ManualTriggerTaskRequestVO requestVO) {
List<String> uniqueIds = requestVO.getUniqueIds();
String groupName = requestVO.getGroupName();
List<RetryTask> list = accessTemplate.getRetryTaskAccess().list(requestVO.getGroupName(),
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getTaskType, TaskTypeEnum.RETRY.getType())
.in(RetryTask::getUniqueId, uniqueIds));
Assert.notEmpty(list, () -> new EasyRetryServerException("没有可执行的任务"));
for (RetryTask retryTask : list) {
MaxAttemptsPersistenceRetryContext<Result<DispatchRetryResultDTO>> retryContext = new MaxAttemptsPersistenceRetryContext<>();
retryContext.setRetryTask(retryTask);
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName));
retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName()));
retryCountIncrement(retryTask);
RetryExecutor<Result<DispatchRetryResultDTO>> executor = RetryBuilder.<Result<DispatchRetryResultDTO>>newBuilder()
.withStopStrategy(StopStrategies.stopException())
.withStopStrategy(StopStrategies.stopResultStatusCode())
.withWaitStrategy(getRetryTaskWaitWaitStrategy(retryTask.getGroupName(), retryTask.getSceneName()))
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
.withFilterStrategy(FilterStrategies.rebalanceFilterStrategies())
.withFilterStrategy(FilterStrategies.rateLimiterFilter())
.withRetryContext(retryContext)
.build();
Assert.isTrue(executor.filter(), () -> new EasyRetryServerException("任务:{}不满足执行条件.具体原因请查看系统日志", retryTask.getUniqueId()));
productExecUnitActor(executor, ActorGenerator.execUnitActor());
}
return true;
}
@Override
public boolean manualTriggerCallbackTask(ManualTriggerTaskRequestVO requestVO) {
List<String> uniqueIds = requestVO.getUniqueIds();
String groupName = requestVO.getGroupName();
List<RetryTask> list = accessTemplate.getRetryTaskAccess().list(requestVO.getGroupName(),
new LambdaQueryWrapper<RetryTask>()
.eq(RetryTask::getTaskType, TaskTypeEnum.CALLBACK.getType())
.in(RetryTask::getUniqueId, uniqueIds));
Assert.notEmpty(list, () -> new EasyRetryServerException("没有可执行的任务"));
for (RetryTask retryTask : list) {
CallbackRetryContext<Result> retryContext = new CallbackRetryContext<>();
retryContext.setRetryTask(retryTask);
retryContext.setSceneBlacklist(accessTemplate.getSceneConfigAccess().getBlacklist(groupName));
retryContext.setServerNode(clientNodeAllocateHandler.getServerNode(retryTask.getGroupName()));
retryCountIncrement(retryTask);
RetryExecutor<Result> executor = RetryBuilder.<Result>newBuilder()
.withStopStrategy(StopStrategies.stopException())
.withStopStrategy(StopStrategies.stopResultStatusCode())
.withWaitStrategy(getCallbackWaitWaitStrategy())
.withFilterStrategy(FilterStrategies.bitSetIdempotentFilter(idempotentStrategy))
.withFilterStrategy(FilterStrategies.checkAliveClientPodFilter())
.withFilterStrategy(FilterStrategies.rebalanceFilterStrategies())
.withFilterStrategy(FilterStrategies.rateLimiterFilter())
.withRetryContext(retryContext)
.build();
Assert.isTrue(executor.filter(), () -> new EasyRetryServerException("任务:{}不满足执行条件.具体原因请查看系统日志", retryTask.getUniqueId()));
productExecUnitActor(executor, ActorGenerator.execCallbackUnitActor());
}
return true;
}
private WaitStrategy getRetryTaskWaitWaitStrategy(String groupName, String sceneName) {
SceneConfig sceneConfig = accessTemplate.getSceneConfigAccess().getSceneConfigByGroupNameAndSceneName(groupName, sceneName);
Integer backOff = sceneConfig.getBackOff();
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(backOff);
}
private WaitStrategy getCallbackWaitWaitStrategy() {
// 回调失败每15min重试一次
return WaitStrategies.WaitStrategyEnum.getWaitStrategy(WaitStrategies.WaitStrategyEnum.FIXED.getBackOff());
}
private void retryCountIncrement(RetryTask retryTask) {
Integer retryCount = retryTask.getRetryCount();
retryTask.setRetryCount(++retryCount);
}
private void productExecUnitActor(RetryExecutor retryExecutor, ActorRef actorRef) {
String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName();
Long retryId = retryExecutor.getRetryContext().getRetryTask().getId();
idempotentStrategy.set(groupIdHash, retryId.intValue());
actorRef.tell(retryExecutor, actorRef);
}
}

View File

@ -120,7 +120,6 @@ public abstract class AbstractScanGroup extends AbstractActor {
Long retryId = retryExecutor.getRetryContext().getRetryTask().getId();
idempotentStrategy.set(groupIdHash, retryId.intValue());
// 重试成功回调客户端
ActorRef actorRef = getActorRef();
actorRef.tell(retryExecutor, actorRef);
}

View File

@ -81,4 +81,16 @@ public class RetryTaskController {
public Integer parseLogs(@RequestBody @Validated ParseLogsVO parseLogsVO) {
return retryTaskService.parseLogs(parseLogsVO);
}
@LoginRequired
@PostMapping("/manual/trigger/retry/task")
public boolean manualTriggerRetryTask(@RequestBody @Validated ManualTriggerTaskRequestVO requestVO) {
return retryTaskService.manualTriggerRetryTask(requestVO);
}
@LoginRequired
@PostMapping("/manual/trigger/callback/task")
public boolean manualTriggerCallbackTask(@RequestBody @Validated ManualTriggerTaskRequestVO requestVO) {
return retryTaskService.manualTriggerCallbackTask(requestVO);
}
}

View File

@ -0,0 +1,25 @@
package com.aizuda.easy.retry.server.web.model.request;
import lombok.Data;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.Pattern;
import java.util.List;
/**
* @author www.byteblogs.com
* @date 2023-09-11 22:00:26
* @since 2.3.0
*/
@Data
public class ManualTriggerTaskRequestVO {
@NotBlank(message = "groupName 不能为空")
@Pattern(regexp = "^[A-Za-z0-9_]{1,64}$", message = "仅支持长度为1~64字符且类型为数字、字母和下划线")
private String groupName;
@NotEmpty(message = "uniqueIds 不能为空")
private List<String> uniqueIds;
}

View File

@ -19,6 +19,8 @@ const api = {
batchUpdate: '/retry-task/batch',
deleteRetryTask: '/retry-task/batch',
updateRetryTaskStatus: '/retry-task/status',
manualTriggerRetryTask: '/retry-task/manual/trigger/retry/task',
manualTriggerCallbackTask: '/retry-task/manual/trigger/callback/task',
retryTaskLogPage: '/retry-task-log/list',
retryTaskLogMessagePage: '/retry-task-log/message/list',
retryTaskLogById: '/retry-task-log/',
@ -209,6 +211,22 @@ export function updateRetryTaskStatus (data) {
})
}
export function manualTriggerCallbackTask (data) {
return request({
url: api.manualTriggerCallbackTask,
method: 'post',
data
})
}
export function manualTriggerRetryTask (data) {
return request({
url: api.manualTriggerRetryTask,
method: 'post',
data
})
}
export function getScenePage (parameter) {
return request({
url: api.scenePageList,

View File

@ -104,12 +104,42 @@
<template>
<a @click="handleInfo(record)">详情</a>
<a-divider type="vertical" />
<a @click="handleSuspend(record)" v-if="record.retryStatus === 0">暂停</a>
<a-popconfirm
title="是否暂停?"
ok-text="恢复"
cancel-text="取消"
@confirm="handleSuspend(record)"
>
<a href="javascript:;" v-if="record.retryStatus === 0">暂停</a>
</a-popconfirm>
<a-divider type="vertical" v-if="record.retryStatus === 0" />
<a @click="handleRecovery(record)" v-if="record.retryStatus === 3">恢复</a>
<a-popconfirm
title="是否恢复?"
ok-text="恢复"
cancel-text="取消"
@confirm="handleRecovery(record)"
>
<a href="javascript:;" v-if="record.retryStatus === 3">恢复</a>
</a-popconfirm>
<a-divider type="vertical" v-if="record.retryStatus === 3" />
<a @click="handleFinish(record)" v-if="record.retryStatus !== 1">完成</a>
<a-divider type="vertical" v-if="record.retryStatus !== 1" />
<a-popconfirm
title="是否完成?"
ok-text="完成"
cancel-text="取消"
@confirm="handleFinish(record)"
>
<a href="javascript:;" v-if="record.retryStatus !== 1 && record.retryStatus !== 2">完成</a>
</a-popconfirm>
<a-divider type="vertical" v-if="record.retryStatus !== 1 && record.retryStatus !== 2" />
<a-popconfirm
title="是否执行任务?"
ok-text="执行"
cancel-text="取消"
@confirm="handleTrigger(record)"
>
<a href="javascript:;" v-if="record.retryStatus !== 1 && record.retryStatus !== 2">执行</a>
</a-popconfirm>
</template>
</span>
</s-table>
@ -124,7 +154,14 @@
<script>
import ATextarea from 'ant-design-vue/es/input/TextArea'
import AInput from 'ant-design-vue/es/input/Input'
import { getAllGroupNameList, getRetryTaskPage, getSceneList, updateRetryTaskStatus, batchDelete } from '@/api/manage'
import {
getAllGroupNameList,
getRetryTaskPage,
getSceneList,
updateRetryTaskStatus,
batchDelete,
manualTriggerCallbackTask, manualTriggerRetryTask
} from '@/api/manage'
import { STable } from '@/components'
import SaveRetryTask from './form/SaveRetryTask'
import BatchUpdateRetryTaskInfo from './form/BatchUpdateRetryTaskInfo'
@ -245,7 +282,7 @@ export default {
title: '操作',
fixed: 'right',
dataIndex: 'action',
width: '150px',
width: '180px',
scopedSlots: { customRender: 'action' }
}
],
@ -338,6 +375,29 @@ export default {
}
})
},
handleTrigger (record) {
if (record.taskType === 1) {
manualTriggerRetryTask({ groupName: record.groupName, uniqueIds: [ record.uniqueId ] }).then(res => {
const { status } = res
if (status === 0) {
this.$message.error('执行失败')
} else {
this.$refs.table.refresh(true)
this.$message.success('重试完成成功')
}
})
} else {
manualTriggerCallbackTask({ groupName: record.groupName, uniqueIds: [ record.uniqueId ] }).then(res => {
const { status } = res
if (status === 0) {
this.$message.error('执行失败')
} else {
this.$refs.table.refresh(true)
this.$message.success('重试完成成功')
}
})
}
},
refreshTable (v) {
this.$refs.table.refresh(true)
},