feat:2.4.0

1. 修复执行器失败导致事务回滚问题
This commit is contained in:
byteblogs168 2023-10-30 00:14:42 +08:00
parent b4836413f2
commit fcf360bbdd
52 changed files with 99 additions and 153 deletions

View File

@ -268,8 +268,8 @@ CREATE TABLE `job_task` (
`retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
`client_info` varchar(255) DEFAULT NULL COMMENT '客户端地址 clientId#ip:port',
`result_message` text NOT NULL COMMENT '执行结果',
`args_str` text NOT NULL COMMENT '执行方法参数',
`args_type` varchar(16) NOT NULL DEFAULT '' COMMENT '参数类型 text/json',
`args_str` text DEFAULT NULL COMMENT '执行方法参数',
`args_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '参数类型 ',
`ext_attrs` varchar(256) NULL default '' COMMENT '扩展字段',
`create_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',

View File

@ -16,6 +16,7 @@ import com.google.common.util.concurrent.FutureCallback;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.concurrent.CancellationException;
/**
* @author: www.byteblogs.com
@ -71,8 +72,16 @@ public class JobExecutorFutureCallback implements FutureCallback<ExecuteResult>
// 上报执行失败
log.error("任务执行失败 jobTask:[{}]", jobContext.getTaskId(), t);
try {
ExecuteResult failure = ExecuteResult.failure();
if (t instanceof CancellationException) {
failure.setMessage("任务被取消");
} else {
failure.setMessage(t.getMessage());
}
CLIENT.dispatchResult(
buildDispatchJobResultRequest(ExecuteResult.failure(t.getMessage()), JobTaskStatusEnum.FAIL.getStatus())
buildDispatchJobResultRequest(failure, JobTaskStatusEnum.FAIL.getStatus())
);
} catch (Exception e) {
log.error("执行结果上报异常.[{}]", jobContext.getTaskId(), e);

View File

@ -22,6 +22,7 @@ public enum JobOperationReasonEnum {
JOB_DISCARD(4, "任务丢弃"),
JOB_OVERLAY(5, "任务被覆盖"),
NOT_EXECUTE_TASK(6, "无可执行任务项"),
TASK_EXECUTE_ERROR(7, "任务执行期间发生非预期异常"),
;
private final int reason;

View File

@ -8,7 +8,6 @@ import com.aizuda.easy.retry.common.core.enums.StatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.akka.ActorGenerator;
import com.aizuda.easy.retry.server.common.cache.CacheRegisterTable;
import com.aizuda.easy.retry.server.common.dto.RegisterNodeInfo;
import com.aizuda.easy.retry.server.common.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.easy.retry.server.job.task.dto.TaskExecuteDTO;
@ -41,7 +40,6 @@ import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
@ -73,6 +71,8 @@ public class JobExecutorActor extends AbstractActor {
} catch (Exception e) {
LogUtils.error(log, "job executor exception. [{}]", taskExecute, e);
handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTE_ERROR.getReason());
// TODO 告警通知
} finally {
getContext().stop(getSelf());
}
@ -82,13 +82,27 @@ public class JobExecutorActor extends AbstractActor {
private void doExecute(final TaskExecuteDTO taskExecute) {
Job job = jobMapper.selectOne(new LambdaQueryWrapper<Job>()
.eq(Job::getJobStatus, StatusEnum.YES.getStatus())
.eq(Job::getId, taskExecute.getJobId())
.eq(Job::getJobStatus, StatusEnum.YES.getStatus())
.eq(Job::getId, taskExecute.getJobId())
);
try {
if (!handlerTaskBatch(taskExecute, job)) {
int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus();
int operationReason = JobOperationReasonEnum.NONE.getReason();
if (Objects.isNull(job)) {
taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
operationReason = JobOperationReasonEnum.JOB_CLOSED.getReason();
} else if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(taskExecute.getGroupName()))) {
taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
operationReason = JobOperationReasonEnum.NOT_CLIENT.getReason();
}
// 更新状态
handlerTaskBatch(taskExecute, taskStatus, operationReason);
// 不是运行中的不需要生产任务
if (taskStatus != JobTaskBatchStatusEnum.RUNNING.getStatus()) {
return;
}
@ -101,7 +115,7 @@ public class JobExecutorActor extends AbstractActor {
} finally {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
public void afterCompletion(int status) {
// 清除时间轮的缓存
JobTimerWheel.clearCache(taskExecute.getTaskBatchId());
//方法内容
@ -112,17 +126,7 @@ public class JobExecutorActor extends AbstractActor {
}
private boolean handlerTaskBatch(final TaskExecuteDTO taskExecute, final Job job) {
int taskStatus = JobTaskBatchStatusEnum.RUNNING.getStatus();
int operationReason = JobOperationReasonEnum.NONE.getReason();
if (Objects.isNull(job)) {
taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
operationReason = JobOperationReasonEnum.JOB_CLOSED.getReason();
} else if (CollectionUtils.isEmpty(CacheRegisterTable.getServerNodeSet(taskExecute.getGroupName()))) {
taskStatus = JobTaskBatchStatusEnum.CANCEL.getStatus();
operationReason = JobOperationReasonEnum.NOT_CLIENT.getReason();
}
private void handlerTaskBatch(TaskExecuteDTO taskExecute, int taskStatus, int operationReason) {
JobTaskBatch jobTaskBatch = new JobTaskBatch();
jobTaskBatch.setId(taskExecute.getTaskBatchId());
@ -130,9 +134,8 @@ public class JobExecutorActor extends AbstractActor {
jobTaskBatch.setTaskBatchStatus(taskStatus);
jobTaskBatch.setOperationReason(operationReason);
Assert.isTrue(1 == jobTaskBatchMapper.updateById(jobTaskBatch),
() -> new EasyRetryServerException("更新任务失败"));
() -> new EasyRetryServerException("更新任务失败"));
return taskStatus == JobTaskBatchStatusEnum.RUNNING.getStatus();
}
private void doHandlerResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {

View File

@ -1,8 +1,12 @@
package com.aizuda.easy.retry.server.job.task.support.idempotent;
import com.aizuda.easy.retry.server.common.IdempotentStrategy;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.time.LocalDateTime;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
/**
* @author www.byteblogs.com
@ -11,11 +15,20 @@ import java.util.concurrent.CopyOnWriteArraySet;
*/
public class TimerIdempotent implements IdempotentStrategy<Long, Long> {
private static final CopyOnWriteArraySet<Long> cache = new CopyOnWriteArraySet<>();
private static final Cache<Long, Long> cache;
static {
cache = CacheBuilder.newBuilder()
.concurrencyLevel(8) // 并发级别
// 设置过期时间避免由于异常情况导致时间轮的缓存没有删除
.expireAfterWrite(20, TimeUnit.SECONDS)
.build();
}
@Override
public boolean set(Long key, Long value) {
return cache.add(key);
cache.put(key, value);
return Boolean.TRUE;
}
@Override
@ -25,11 +38,12 @@ public class TimerIdempotent implements IdempotentStrategy<Long, Long> {
@Override
public boolean isExist(Long key, Long value) {
return cache.contains(key);
return cache.asMap().containsKey(key);
}
@Override
public boolean clear(Long key, Long value) {
return cache.removeIf(l -> l.equals(key));
cache.invalidate(key);
return Boolean.TRUE;
}
}

View File

@ -33,6 +33,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler {
// 若时间轮中数据不存在则重新加入
if (!JobTimerWheel.isExisted(jobPrepareDTO.getTaskBatchId())) {
log.info("存在待处理任务且时间轮中不存在 taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId());
// 进入时间轮
long delay = jobPrepareDTO.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()

View File

@ -52,13 +52,6 @@ public class ReportDispatchResultPostHttpRequestHandler extends PostHttpRequestH
ClientCallbackContext context = JobTaskConverter.INSTANCE.toClientCallbackContext(dispatchJobResultRequest);
clientCallback.callback(context);
// JobLogDTO jobLogDTO = JobTaskConverter.INSTANCE.toJobLogDTO(dispatchJobResultRequest);
// ExecuteResult executeResult = dispatchJobResultRequest.getExecuteResult();
// jobLogDTO.setMessage(executeResult.getMessage());
// jobLogDTO.setClientId(hostId);
// ActorRef actorRef = ActorGenerator.jobLogActor();
// actorRef.tell(jobLogDTO, actorRef);
return JsonUtil.toJsonString(new NettyResult(StatusEnum.YES.getStatus(), "Report Dispatch Result Processed Successfully", Boolean.TRUE, retryRequest.getReqId()));
}
}

View File

@ -17,7 +17,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.Optional;
import java.util.Objects;
/**
* @author: www.byteblogs.com
@ -81,7 +81,7 @@ public class BlockStrategies {
JobTaskBatchGenerator jobTaskBatchGenerator = SpringContext.getBeanByType(JobTaskBatchGenerator.class);
JobTaskBatchGeneratorContext jobTaskBatchGeneratorContext = JobTaskConverter.INSTANCE.toJobTaskGeneratorContext(context);
jobTaskBatchGeneratorContext.setTaskBatchStatus(JobTaskBatchStatusEnum.FAIL.getStatus());
jobTaskBatchGeneratorContext.setOperationReason(JobOperationReasonEnum.JOB_OVERLAY.getReason());
jobTaskBatchGeneratorContext.setOperationReason(JobOperationReasonEnum.JOB_DISCARD.getReason());
jobTaskBatchGenerator.generateJobTaskBatch(jobTaskBatchGeneratorContext);
}
}
@ -100,7 +100,13 @@ public class BlockStrategies {
// 停止任务
JobTaskStopHandler instanceInterrupt = JobTaskStopFactory.getJobTaskStop(context.taskType);
TaskStopJobContext stopJobContext = JobTaskConverter.INSTANCE.toStopJobContext(context);
stopJobContext.setJobOperationReason(Optional.ofNullable(context.getOperationReason()).orElse(JobOperationReasonEnum.JOB_DISCARD.getReason()));
Integer operationReason = context.getOperationReason();
if (Objects.isNull(context.getOperationReason()) || context.getOperationReason() == JobOperationReasonEnum.NONE.getReason()) {
operationReason = JobOperationReasonEnum.JOB_OVERLAY.getReason();
}
stopJobContext.setJobOperationReason(operationReason);
stopJobContext.setNeedUpdateTaskStatus(Boolean.TRUE);
instanceInterrupt.stop(stopJobContext);

View File

@ -131,7 +131,7 @@ public abstract class AbstractScanGroup extends AbstractActor {
long delay = partitionTask.getNextTriggerAt().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
- System.currentTimeMillis();
RetryTimerWheel.register(retryTask.getGroupName(), retryTask.getUniqueId(), timerTask(partitionTask), delay,
RetryTimerWheel.register(partitionTask.getGroupName(), partitionTask.getUniqueId(), timerTask(partitionTask), delay,
TimeUnit.MILLISECONDS);
}

View File

@ -1,5 +1,6 @@
package com.aizuda.easy.retry.server.retry.task.support.timer;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.server.common.Lifecycle;
import com.aizuda.easy.retry.server.retry.task.support.idempotent.TimerIdempotent;
@ -47,6 +48,9 @@ public class RetryTimerWheel implements Lifecycle {
}
public static boolean isExisted(String groupName, String uniqueId) {
if (StrUtil.isNotBlank(groupName) || StrUtil.isNotBlank(uniqueId)) {
return Boolean.FALSE;
}
return idempotent.isExist(groupName, uniqueId);
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1 +0,0 @@
(window["webpackJsonp"]=window["webpackJsonp"]||[]).push([["chunk-7e5ff423"],{"3b7a":function(t,e,r){"use strict";r.d(e,"g",(function(){return u})),r.d(e,"j",(function(){return a})),r.d(e,"a",(function(){return s})),r.d(e,"l",(function(){return i})),r.d(e,"f",(function(){return b})),r.d(e,"h",(function(){return c})),r.d(e,"e",(function(){return d})),r.d(e,"d",(function(){return l})),r.d(e,"c",(function(){return f})),r.d(e,"b",(function(){return j})),r.d(e,"i",(function(){return h})),r.d(e,"k",(function(){return m}));var n=r("b775"),o={jobList:"/job/list",jobDetail:"/job/",saveJob:"/job/",updateJob:"/job/",updateJobStatus:"/job/status",delJob:"/job/",timeByCron:"/job/cron",jobNameList:"/job/job-name/list",jobBatchList:"/job/batch/list",jobBatchDetail:"/job/batch/",jobTaskList:"/job/task/list",jobLogList:"/job/log/list"};function u(t){return Object(n["b"])({url:o.jobNameList,method:"get",params:t})}function a(t){return Object(n["b"])({url:o.timeByCron,method:"get",params:t})}function s(t){return Object(n["b"])({url:o.delJob+t,method:"delete"})}function i(t){return Object(n["b"])({url:o.updateJobStatus,method:"put",data:t})}function b(t){return Object(n["b"])({url:o.jobLogList,method:"get",params:t})}function c(t){return Object(n["b"])({url:o.jobTaskList,method:"get",params:t})}function d(t){return Object(n["b"])({url:o.jobBatchList,method:"get",params:t})}function l(t){return Object(n["b"])({url:o.jobBatchDetail+t,method:"get"})}function f(t){return Object(n["b"])({url:o.jobList,method:"get",params:t})}function j(t){return Object(n["b"])({url:o.jobDetail+t,method:"get"})}function h(t){return Object(n["b"])({url:o.saveJob,method:"post",data:t})}function m(t){return Object(n["b"])({url:o.updateJob,method:"put",data:t})}},a03c:function(t,e,r){"use strict";r.r(e);var n=function(){var t=this,e=t._self._c;return e("div",[e("a-card",[e("s-table",{ref:"table",attrs:{size:"default",rowKey:"key",columns:t.columns,data:t.loadData},scopedSlots:t._u([{key:"serial",fn:function(r,n){return e("span",{},[t._v(" "+t._s(n.id)+" ")])}}])})],1)],1)},o=[],u=r("c1df"),a=r.n(u),s=r("2af9"),i=r("3b7a"),b={name:"JobLogList",components:{STable:s["j"]},data:function(){var t=this;return{columns:[{title:"#",scopedSlots:{customRender:"serial"},width:"5%"},{title:"信息",dataIndex:"message",width:"50%"},{title:"触发时间",dataIndex:"createDt",sorter:!0,customRender:function(t){return a()(t).format("YYYY-MM-DD HH:mm:ss")},width:"10%"}],queryParam:{},loadData:function(e){return Object(i["f"])(Object.assign(e,t.queryParam)).then((function(e){return t.total=e.total,e}))},total:0}},created:function(){var t=this.$route.query.taskBatchId,e=this.$route.query.jobId;t&&e?(this.queryParam={taskBatchId:t,jobId:e},this.$refs.table.refresh(!0)):this.$router.push({path:"/404"})},methods:{refreshTable:function(t){this.queryParam=t,this.$refs.table.refresh(!0)}}},c=b,d=r("2877"),l=Object(d["a"])(c,n,o,!1,null,"35165b21",null);e["default"]=l.exports}}]);

File diff suppressed because one or more lines are too long

View File

@ -171,13 +171,6 @@ export const asyncRouterMap = [
hidden: true,
component: () => import('@/views/job/JobTaskList'),
meta: { title: '任务项', icon: 'profile', permission: ['jobBatch'] }
},
{
path: '/job/log/list',
name: 'JobLogMessageList',
hidden: true,
component: () => import('@/views/job/JobLogMessageList'),
meta: { title: '任务调度日志', icon: 'profile', permission: ['jobBatch'] }
}
]
},

View File

@ -124,6 +124,10 @@ const enums = {
'6': {
'name': '无可执行任务项',
'color': '#23c28a'
},
'7': {
'name': '任务执行期间发生非预期异常',
'color': '#bdc223'
}
},
taskStatus: {

View File

@ -1,89 +0,0 @@
<template>
<div>
<a-card>
<s-table
ref="table"
size="default"
rowKey="key"
:columns="columns"
:data="loadData"
>
<span slot="serial" slot-scope="text, record">
{{ record.id }}
</span>
</s-table>
</a-card>
</div>
</template>
<script>
import moment from 'moment'
import { STable } from '@/components'
import { jobLogList } from '@/api/jobApi'
export default {
name: 'JobLogList',
components: {
STable
},
data () {
return {
//
columns: [
{
title: '#',
scopedSlots: { customRender: 'serial' },
width: '5%'
},
{
title: '信息',
dataIndex: 'message',
width: '50%'
},
{
title: '触发时间',
dataIndex: 'createDt',
sorter: true,
customRender: (text) => moment(text).format('YYYY-MM-DD HH:mm:ss'),
width: '10%'
}
],
queryParam: {},
// Promise
loadData: parameter => {
return jobLogList(Object.assign(parameter, this.queryParam))
.then(res => {
this.total = res.total
return res
})
},
total: 0
}
},
created () {
const taskBatchId = this.$route.query.taskBatchId
const jobId = this.$route.query.jobId
if (taskBatchId && jobId) {
this.queryParam = {
taskBatchId: taskBatchId,
jobId: jobId
}
this.$refs.table.refresh(true)
} else {
this.$router.push({ path: '/404' })
}
},
methods: {
refreshTable (v) {
this.queryParam = v
this.$refs.table.refresh(true)
}
}
}
</script>
<style scoped>
</style>

View File

@ -114,7 +114,6 @@ import AInput from 'ant-design-vue/es/input/Input'
import { STable } from '@/components'
import { jobLogList, jobTaskList } from '@/api/jobApi'
import enums from '@/utils/jobEnum'
import JobLogMessageList from './JobLogMessageList'
import moment from 'moment/moment'
export default {
@ -122,8 +121,7 @@ export default {
components: {
AInput,
ATextarea,
STable,
JobLogMessageList
STable
},
data () {
return {

View File

@ -157,10 +157,11 @@
<a-form-item label="任务类型">
<a-select
placeholder="请选择任务类型"
@change="handleTaskTypeChange"
v-decorator="[
'taskType',
{
initialValue: '1',
initialValue: taskTypeValue,
rules: [{ required: true, message: '请选择任务类型'}]
}
]" >
@ -177,7 +178,7 @@
@click="handleBlur"
v-decorator="[
'argsStr',
{rules: [{ required: false, message: '请输入方法参数', whitespace: true}]}
{rules: [{ required: this.taskTypeValue === '3', message: '请输入方法参数', whitespace: true}]}
]" />
</a-form-item>
</a-col>
@ -259,7 +260,7 @@
style="text-align: center"
>
<a-button htmlType="submit" type="primary">提交</a-button>
<a-button style="margin-left: 8px">重置</a-button>
<a-button style="margin-left: 8px" @click="reset">重置</a-button>
</a-form-item>
</a-form>
</a-card>
@ -355,7 +356,8 @@ export default {
loading: false,
visible: false,
count: 0,
triggerTypeValue: '2'
triggerTypeValue: '2',
taskTypeValue: '1'
}
},
beforeCreate () {
@ -380,12 +382,15 @@ export default {
},
methods: {
handleChange (value) {
console.log(value)
this.triggerTypeValue = value
this.form.setFieldsValue({
triggerInterval: null
})
},
handleTaskTypeChange (value) {
console.log(value)
this.taskTypeValue = value
},
handlerCron () {
const triggerType = this.form.getFieldValue('triggerType')
if (triggerType === '1') {
@ -520,7 +525,12 @@ export default {
this.triggerTypeValue = formData.triggerType
form.setFieldsValue(formData)
})
},
reset () {
this.form.resetFields()
this.dynamicForm.resetFields()
}
}
}
</script>