feat: 2.6.0

1. 添加工作流手动执行和手动终止任务
This commit is contained in:
byteblogs168 2024-01-14 22:52:39 +08:00
parent 0adb8eb87e
commit b32c5e6c75
17 changed files with 126 additions and 58 deletions

View File

@ -68,7 +68,7 @@ public interface SystemConstants {
/**
* 批量日志上报
*/
String BATCH_LOG_REPORT = "/batch/server/reportLog";
String BATCH_LOG_REPORT = "/batch/server/report/log";
/**
* 上报job的运行结果

View File

@ -51,7 +51,7 @@ public class JobTaskPrepareDTO {
private boolean onlyTimeoutCheck;
/**
* 执行策略 1auto 2manual 3workflow
* 执行策略 1auto_job 2manual_job 3auto_workflow 4manual_workflow
*/
private Integer taskExecutorScene;

View File

@ -34,6 +34,11 @@ public interface WorkflowTaskConverter {
)
WorkflowTaskPrepareDTO toWorkflowTaskPrepareDTO(WorkflowPartitionTaskDTO workflowPartitionTaskDTO);
@Mappings(
@Mapping(source = "id", target = "workflowId")
)
WorkflowTaskPrepareDTO toWorkflowTaskPrepareDTO(Workflow workflow);
WorkflowTaskBatchGeneratorContext toWorkflowTaskBatchGeneratorContext(WorkflowTaskPrepareDTO workflowTaskPrepareDTO);
WorkflowTaskBatch toWorkflowTaskBatch(WorkflowTaskBatchGeneratorContext context);

View File

@ -3,6 +3,8 @@ package com.aizuda.easy.retry.server.job.task.support.dispatch;
import akka.actor.AbstractActor;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.enums.TaskTypeEnum;
import com.aizuda.easy.retry.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.JobPrePareHandler;
import com.aizuda.easy.retry.server.job.task.support.prepare.job.TerminalJobPrepareHandler;
@ -50,11 +52,18 @@ public class JobTaskPrepareActor extends AbstractActor {
}
private void doPrepare(JobTaskPrepareDTO prepare) {
LambdaQueryWrapper<JobTaskBatch> queryWrapper = new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getJobId, prepare.getJobId())
.in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE);
JobTaskExecutorSceneEnum jobTaskExecutorSceneEnum = JobTaskExecutorSceneEnum.get(
prepare.getTaskExecutorScene());
if (TaskTypeEnum.WORKFLOW.getType().equals(jobTaskExecutorSceneEnum.getTaskType().getType())) {
queryWrapper.eq(JobTaskBatch::getWorkflowNodeId, prepare.getWorkflowNodeId());
}
List<JobTaskBatch> notCompleteJobTaskBatchList = jobTaskBatchMapper
.selectList(new LambdaQueryWrapper<JobTaskBatch>()
.eq(JobTaskBatch::getJobId, prepare.getJobId())
.in(JobTaskBatch::getTaskBatchStatus, NOT_COMPLETE));
.selectList(queryWrapper);
// 说明所以任务已经完成
if (CollectionUtils.isEmpty(notCompleteJobTaskBatchList)) {

View File

@ -60,7 +60,7 @@ public class ScanWorkflowTaskActor extends AbstractActor {
}
private void doScan(ScanTask scanTask) {
PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask),
PartitionTaskUtils.process(startId -> listAvailableWorkflows(startId, scanTask),
this::processPartitionTasks, 0);
}
@ -70,7 +70,7 @@ public class ScanWorkflowTaskActor extends AbstractActor {
long now = DateUtils.toNowMilli();
for (PartitionTask partitionTask : partitionTasks) {
WorkflowPartitionTaskDTO workflowPartitionTaskDTO = (WorkflowPartitionTaskDTO) partitionTask;
processJob(workflowPartitionTaskDTO, waitUpdateJobs, waitExecWorkflows, now);
processWorkflow(workflowPartitionTaskDTO, waitUpdateJobs, waitExecWorkflows, now);
}
// 批量更新
@ -84,8 +84,8 @@ public class ScanWorkflowTaskActor extends AbstractActor {
}
}
private void processJob(WorkflowPartitionTaskDTO partitionTask, List<Workflow> waitUpdateWorkflows,
List<WorkflowTaskPrepareDTO> waitExecJobs, long now) {
private void processWorkflow(WorkflowPartitionTaskDTO partitionTask, List<Workflow> waitUpdateWorkflows,
List<WorkflowTaskPrepareDTO> waitExecJobs, long now) {
CacheConsumerGroup.addOrUpdate(partitionTask.getGroupName(), partitionTask.getNamespaceId());
Workflow workflow = new Workflow();
@ -116,7 +116,7 @@ public class ScanWorkflowTaskActor extends AbstractActor {
return waitStrategy.computeTriggerTime(waitStrategyContext);
}
private List<WorkflowPartitionTaskDTO> listAvailableJobs(Long startId, ScanTask scanTask) {
private List<WorkflowPartitionTaskDTO> listAvailableWorkflows(Long startId, ScanTask scanTask) {
if (CollectionUtils.isEmpty(scanTask.getBuckets())) {
return Collections.emptyList();
}

View File

@ -74,7 +74,7 @@ public class WorkflowExecutorActor extends AbstractActor {
}).build();
}
private void doExecutor(WorkflowNodeTaskExecuteDTO taskExecute) throws IOException {
private void doExecutor(WorkflowNodeTaskExecuteDTO taskExecute) {
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(taskExecute.getWorkflowTaskBatchId());
Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("任务不存在"));

View File

@ -163,7 +163,7 @@ public abstract class AbstractWorkflowExecutor implements WorkflowExecutor, Init
}
@Override
public void afterPropertiesSet() throws Exception {
public void afterPropertiesSet() {
WorkflowExecutorFactory.registerJobExecutor(getWorkflowNodeType(), this);
}
}

View File

@ -7,10 +7,7 @@ import com.aizuda.easy.retry.server.web.model.response.WorkflowBatchResponseVO;
import com.aizuda.easy.retry.server.web.model.response.WorkflowDetailResponseVO;
import com.aizuda.easy.retry.server.web.service.WorkflowBatchService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@ -36,4 +33,9 @@ public class WorkflowBatchController {
public WorkflowDetailResponseVO getWorkflowBatchDetail(@PathVariable("id") Long id) {
return workflowBatchService.getWorkflowBatchDetail(id);
}
@PostMapping("/stop/{id}")
public Boolean stop(@PathVariable("id") Long id) {
return workflowBatchService.stop(id);
}
}

View File

@ -62,24 +62,9 @@ public class WorkflowController {
return workflowService.deleteById(id);
}
@PostMapping("/start")
public void startWorkflow() {
}
@PostMapping("/stop")
public void stopWorkflow() {
}
@PostMapping("/pause")
public void pauseWorkflow() {
}
@PostMapping("/resume")
public void resumeWorkflow() {
@PostMapping("/trigger/{id}")
public Boolean trigger(@PathVariable("id") Long id) {
return workflowService.trigger(id);
}

View File

@ -18,4 +18,5 @@ public interface WorkflowBatchService {
WorkflowDetailResponseVO getWorkflowBatchDetail(Long id);
Boolean stop(Long id);
}

View File

@ -27,4 +27,6 @@ public interface WorkflowService {
Boolean updateStatus(Long id);
Boolean deleteById(Long id);
Boolean trigger(Long id);
}

View File

@ -43,6 +43,8 @@ public class JobLogServiceImpl implements JobLogService {
queryWrapper
.select(JobLogMessage::getId, JobLogMessage::getLogNum)
.ge(JobLogMessage::getId, queryVO.getStartId())
.ge(JobLogMessage::getTaskBatchId, queryVO.getTaskBatchId())
.ge(JobLogMessage::getJobId, queryVO.getJobId())
.eq(JobLogMessage::getTaskId, queryVO.getTaskId());
queryWrapper.orderByAsc(JobLogMessage::getRealTime).orderByDesc(JobLogMessage::getId);

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.web.service.impl;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.JobOperationReasonEnum;
@ -8,6 +9,7 @@ import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.server.common.dto.JobTaskConfig;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.support.cache.MutableGraphCache;
import com.aizuda.easy.retry.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
import com.aizuda.easy.retry.server.web.model.request.UserSessionVO;
import com.aizuda.easy.retry.server.web.model.request.WorkflowBatchQueryVO;
@ -59,6 +61,7 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
private final WorkflowNodeMapper workflowNodeMapper;
private final JobTaskBatchMapper jobTaskBatchMapper;
private final WorkflowHandler workflowHandler;
private final WorkflowBatchHandler workflowBatchHandler;
@Override
public PageResult<List<WorkflowBatchResponseVO>> listPage(WorkflowBatchQueryVO queryVO) {
@ -172,6 +175,15 @@ public class WorkflowBatchServiceImpl implements WorkflowBatchService {
return responseVO;
}
@Override
public Boolean stop(Long id) {
WorkflowTaskBatch workflowTaskBatch = workflowTaskBatchMapper.selectById(id);
Assert.notNull(workflowTaskBatch, () -> new EasyRetryServerException("workflow batch can not be null."));
workflowBatchHandler.stop(id, JobOperationReasonEnum.MANNER_STOP.getReason());
return Boolean.TRUE;
}
private static boolean isNoOperation(JobTaskBatch i) {
return i.getOperationReason() == JobOperationReasonEnum.WORKFLOW_NODE_NO_OPERATION_REQUIRED.getReason()
|| i.getTaskBatchStatus() == JobTaskBatchStatusEnum.STOP.getStatus();

View File

@ -8,11 +8,18 @@ import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.common.WaitStrategy;
import com.aizuda.easy.retry.server.common.config.SystemProperties;
import com.aizuda.easy.retry.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.common.strategy.WaitStrategies;
import com.aizuda.easy.retry.server.common.util.CronUtils;
import com.aizuda.easy.retry.server.common.util.DateUtils;
import com.aizuda.easy.retry.server.common.util.GraphUtils;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowPartitionTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.WorkflowTaskPrepareDTO;
import com.aizuda.easy.retry.server.job.task.support.WorkflowPrePareHandler;
import com.aizuda.easy.retry.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.easy.retry.server.web.model.base.PageResult;
import com.aizuda.easy.retry.server.web.model.request.SceneConfigRequestVO;
import com.aizuda.easy.retry.server.web.model.request.UserSessionVO;
import com.aizuda.easy.retry.server.web.model.request.WorkflowQueryVO;
import com.aizuda.easy.retry.server.web.model.request.WorkflowRequestVO;
@ -34,6 +41,8 @@ import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -55,6 +64,8 @@ public class WorkflowServiceImpl implements WorkflowService {
private final WorkflowNodeMapper workflowNodeMapper;
private final SystemProperties systemProperties;
private final WorkflowHandler workflowHandler;
@Lazy
private final WorkflowPrePareHandler terminalWorkflowPrepareHandler;
@Override
@Transactional
@ -93,6 +104,8 @@ public class WorkflowServiceImpl implements WorkflowService {
}
private static Long calculateNextTriggerAt(final WorkflowRequestVO workflowRequestVO, Long time) {
checkExecuteInterval(workflowRequestVO);
WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy(workflowRequestVO.getTriggerType());
WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
waitStrategyContext.setTriggerInterval(workflowRequestVO.getTriggerInterval());
@ -100,6 +113,20 @@ public class WorkflowServiceImpl implements WorkflowService {
return waitStrategy.computeTriggerTime(waitStrategyContext);
}
private static void checkExecuteInterval(WorkflowRequestVO requestVO) {
if (Lists.newArrayList(WaitStrategies.WaitStrategyEnum.FIXED.getType(),
WaitStrategies.WaitStrategyEnum.RANDOM.getType())
.contains(requestVO.getTriggerType())) {
if (Integer.parseInt(requestVO.getTriggerInterval()) < 10) {
throw new EasyRetryServerException("触发间隔不得小于10");
}
} else if (requestVO.getTriggerType() == WaitStrategies.WaitStrategyEnum.CRON.getType()) {
if (CronUtils.getExecuteInterval(requestVO.getTriggerInterval()) < 10 * 1000) {
throw new EasyRetryServerException("触发间隔不得小于10");
}
}
}
@Override
public WorkflowDetailResponseVO getWorkflowDetail(Long id) {
@ -178,10 +205,11 @@ public class WorkflowServiceImpl implements WorkflowService {
workflow = new Workflow();
workflow.setId(workflowRequestVO.getId());
workflow.setVersion(version);
workflow.setNextTriggerAt(calculateNextTriggerAt(workflowRequestVO, DateUtils.toNowMilli()));
workflow.setFlowInfo(JsonUtil.toJsonString(GraphUtils.serializeGraphToJson(graph)));
Assert.isTrue(workflowMapper.update(workflow, new LambdaQueryWrapper<Workflow>()
.eq(Workflow::getId, workflow.getId())
.eq(Workflow::getVersion, version)
.eq(Workflow::getId, workflow.getId())
.eq(Workflow::getVersion, version)
) > 0, () -> new EasyRetryServerException("更新失败"));
return Boolean.TRUE;
@ -212,4 +240,19 @@ public class WorkflowServiceImpl implements WorkflowService {
return 1 == workflowMapper.updateById(workflow);
}
@Override
public Boolean trigger(Long id) {
Workflow workflow = workflowMapper.selectById(id);
Assert.notNull(workflow, () -> new EasyRetryServerException("workflow can not be null."));
WorkflowTaskPrepareDTO prepareDTO = WorkflowTaskConverter.INSTANCE.toWorkflowTaskPrepareDTO(workflow);
// 设置now表示立即执行
prepareDTO.setNextTriggerAt(DateUtils.toNowMilli());
prepareDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType());
terminalWorkflowPrepareHandler.handler(prepareDTO);
return Boolean.TRUE;
}
}

View File

@ -37,11 +37,27 @@ const jobApi = {
workflowBatchListPage: '/workflow/batch/page/list',
workflowBatchDetail: '/workflow/batch/',
updateStatus: '/workflow/update/status/',
delWorkflow: '/workflow/'
delWorkflow: '/workflow/',
triggerWorkflow: '/workflow/trigger/',
stopWorkflowBatch: '/workflow/batch/stop/'
}
export default jobApi
export function stopWorkflowBatch (id) {
return request({
url: jobApi.stopWorkflowBatch + id,
method: 'post'
})
}
export function triggerWorkflow (id) {
return request({
url: jobApi.triggerWorkflow + id,
method: 'post'
})
}
export function delWorkflow (id) {
return request({
url: jobApi.delWorkflow + id,

View File

@ -107,7 +107,7 @@
import ATextarea from 'ant-design-vue/es/input/TextArea'
import AInput from 'ant-design-vue/es/input/Input'
import { Drawer, STable } from '@/components'
import { workflowBatchListPage, jobNameList } from '@/api/jobApi'
import { workflowBatchListPage, jobNameList, stopWorkflowBatch } from '@/api/jobApi'
import { getAllGroupNameList } from '@/api/manage'
import JobBatchInfo from '@/views/job/JobBatchInfo'
const enums = require('@/utils/jobEnum')
@ -218,15 +218,6 @@ export default {
getAllGroupNameList().then((res) => {
this.groupNameList = res.data
})
// jobNameList({ jobId: jobId }).then(res => {
// this.jobNameList = res.data
// console.log(jobId)
// if (jobId) {
// this.queryParam['jobId'] = this.jobNameList[0].id
// this.$refs.table.refresh(true)
// }
// })
},
methods: {
handleSearch (value) {
@ -246,15 +237,15 @@ export default {
},
handleOk (record) {},
handleStop (record) {
// stop(record.id).then((res) => {
// const { status } = res
// if (status === 0) {
// this.$message.error('')
// } else {
// this.$refs.table.refresh(true)
// this.$message.success('')
// }
// })
stopWorkflowBatch(record.id).then((res) => {
const { status } = res
if (status === 0) {
this.$message.error('停止失败')
} else {
this.$refs.table.refresh(true)
this.$message.success('停止成功')
}
})
},
refreshTable (v) {
this.$refs.table.refresh(true)

View File

@ -151,7 +151,7 @@
import ATextarea from 'ant-design-vue/es/input/TextArea'
import AInput from 'ant-design-vue/es/input/Input'
import { STable, Drawer } from '@/components'
import { workflowListPage, triggerJob, updateWorkflowStatus, delWorkflow } from '@/api/jobApi'
import { workflowListPage, updateWorkflowStatus, delWorkflow, triggerWorkflow } from '@/api/jobApi'
import { getAllGroupNameList } from '@/api/manage'
import enums from '@/utils/jobEnum'
import JobInfo from '@/views/job/JobInfo'
@ -291,7 +291,7 @@ export default {
},
handleOk (record) {},
handleTrigger (record) {
triggerJob(record.id).then((res) => {
triggerWorkflow(record.id).then((res) => {
const { status } = res
if (status === 0) {
this.$message.error('执行失败')