feat: 3.1.1
1. 修改日志级别
This commit is contained in:
parent
479d5c5ac5
commit
d4050d5095
@ -110,7 +110,7 @@ public class RpcClientInvokeHandler implements InvocationHandler {
|
||||
// 最多调用size次
|
||||
int size = serverNodeSet.size();
|
||||
for (int count = 1; count <= size; count++) {
|
||||
log.info("Start request client. count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count, hostId,
|
||||
log.debug("Start request client. count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count, hostId,
|
||||
hostIp, hostPort, NetUtil.getLocalIpStr());
|
||||
Result result = requestRemote(method, args, annotation, count);
|
||||
if (Objects.nonNull(result)) {
|
||||
@ -156,7 +156,7 @@ public class RpcClientInvokeHandler implements InvocationHandler {
|
||||
return Objects.requireNonNull(response.getBody());
|
||||
});
|
||||
|
||||
log.info("Request client success. count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count, hostId,
|
||||
log.debug("Request client success. count:[{}] clientId:[{}] clientAddr:[{}:{}] serverIp:[{}]", count, hostId,
|
||||
hostIp, hostPort, NetUtil.getLocalIpStr());
|
||||
|
||||
return result;
|
||||
|
@ -9,8 +9,6 @@ import com.aizuda.easy.retry.template.datasource.persistence.mapper.SequenceAllo
|
||||
import com.aizuda.easy.retry.template.datasource.persistence.po.SequenceAlloc;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.perf4j.StopWatch;
|
||||
import org.perf4j.slf4j.Slf4JStopWatch;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
@ -112,7 +110,6 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
|
||||
}
|
||||
|
||||
private void updateCacheFromDb() {
|
||||
StopWatch sw = new Slf4JStopWatch();
|
||||
try {
|
||||
List<SequenceAlloc> sequenceAllocs = sequenceAllocMapper
|
||||
.selectList(new LambdaQueryWrapper<SequenceAlloc>().select(SequenceAlloc::getGroupName));
|
||||
@ -142,7 +139,7 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
|
||||
segment.setMax(0);
|
||||
segment.setStep(0);
|
||||
cache.put(tag, buffer);
|
||||
EasyRetryLog.LOCAL.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
|
||||
EasyRetryLog.LOCAL.debug("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
|
||||
}
|
||||
//cache中已失效的tags从cache删除
|
||||
for (int i = 0; i < dbTags.size(); i++) {
|
||||
@ -153,12 +150,10 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
|
||||
}
|
||||
for (Pair<String, String> tag : removeTagsSet) {
|
||||
cache.remove(tag);
|
||||
EasyRetryLog.LOCAL.info("Remove tag {} from IdCache", tag);
|
||||
EasyRetryLog.LOCAL.debug("Remove tag {} from IdCache", tag);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
EasyRetryLog.LOCAL.warn("update cache from db exception", e);
|
||||
} finally {
|
||||
sw.stop("updateCacheFromDb");
|
||||
EasyRetryLog.LOCAL.error("update cache from db exception", e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -175,10 +170,10 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
|
||||
if (!buffer.isInitOk()) {
|
||||
try {
|
||||
updateSegmentFromDb(key, buffer.getCurrent());
|
||||
EasyRetryLog.LOCAL.info("Init buffer. Update key {} {} from db", key, buffer.getCurrent());
|
||||
EasyRetryLog.LOCAL.debug("Init buffer. Update key {} {} from db", key, buffer.getCurrent());
|
||||
buffer.setInitOk(true);
|
||||
} catch (Exception e) {
|
||||
EasyRetryLog.LOCAL.warn("Init buffer {} exception", buffer.getCurrent(), e);
|
||||
EasyRetryLog.LOCAL.error("Init buffer {} exception", buffer.getCurrent(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -216,7 +211,7 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
|
||||
} else {
|
||||
nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
|
||||
}
|
||||
EasyRetryLog.LOCAL.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f", ((double) duration / (1000 * 60))), nextStep);
|
||||
EasyRetryLog.LOCAL.debug("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f", ((double) duration / (1000 * 60))), nextStep);
|
||||
|
||||
sequenceAllocMapper.updateMaxIdByCustomStep(nextStep, key.getKey(), key.getValue());
|
||||
sequenceAlloc = sequenceAllocMapper
|
||||
@ -244,7 +239,7 @@ public class SegmentIdGenerator implements IdGenerator, Lifecycle {
|
||||
try {
|
||||
updateSegmentFromDb(buffer.getKey(), next);
|
||||
updateOk = true;
|
||||
EasyRetryLog.LOCAL.info("update segment {} from db {}", buffer.getKey(), next);
|
||||
EasyRetryLog.LOCAL.debug("update segment {} from db {}", buffer.getKey(), next);
|
||||
} catch (Exception e) {
|
||||
EasyRetryLog.LOCAL.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
|
||||
} finally {
|
||||
|
@ -79,7 +79,7 @@ public class JobExecutorActor extends AbstractActor {
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder().match(TaskExecuteDTO.class, taskExecute -> {
|
||||
try {
|
||||
log.info("准备执行任务. [{}] [{}]", LocalDateTime.now(), JsonUtil.toJsonString(taskExecute));
|
||||
log.debug("准备执行任务. [{}] [{}]", LocalDateTime.now(), JsonUtil.toJsonString(taskExecute));
|
||||
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||
@Override
|
||||
protected void doInTransactionWithoutResult(final TransactionStatus status) {
|
||||
@ -148,7 +148,7 @@ public class JobExecutorActor extends AbstractActor {
|
||||
JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(job.getTaskType());
|
||||
jobExecutor.execute(buildJobExecutorContext(taskExecute, job, taskList));
|
||||
} finally {
|
||||
log.info("准备执行任务完成.[{}]", JsonUtil.toJsonString(taskExecute));
|
||||
log.debug("准备执行任务完成.[{}]", JsonUtil.toJsonString(taskExecute));
|
||||
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
|
||||
@Override
|
||||
public void afterCompletion(int status) {
|
||||
@ -224,7 +224,7 @@ public class JobExecutorActor extends AbstractActor {
|
||||
// 获取时间差的毫秒数
|
||||
long milliseconds = nextTriggerAt - preTriggerAt;
|
||||
|
||||
log.info("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, DateUtils.toNowMilli() % 1000);
|
||||
log.debug("常驻任务监控. 任务时间差:[{}] 取余:[{}]", milliseconds, DateUtils.toNowMilli() % 1000);
|
||||
job.setNextTriggerAt(nextTriggerAt);
|
||||
|
||||
JobTimerWheel.register(SyetemTaskTypeEnum.JOB.getType(), jobTimerTaskDTO.getTaskBatchId(), timerTask, milliseconds - DateUtils.toNowMilli() % 1000, TimeUnit.MILLISECONDS);
|
||||
|
@ -58,7 +58,7 @@ public class JobExecutorResultActor extends AbstractActor {
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder().match(JobExecutorResultDTO.class, result -> {
|
||||
log.info("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result));
|
||||
log.debug("更新任务状态. 参数:[{}]", JsonUtil.toJsonString(result));
|
||||
try {
|
||||
JobTask jobTask = new JobTask();
|
||||
jobTask.setTaskStatus(result.getTaskStatus());
|
||||
|
@ -76,7 +76,7 @@ public class ScanJobTaskActor extends AbstractActor {
|
||||
long total = PartitionTaskUtils.process(startId -> listAvailableJobs(startId, scanTask),
|
||||
this::processJobPartitionTasks, 0);
|
||||
|
||||
log.info("job scan end. total:[{}]", total);
|
||||
log.debug("job scan end. total:[{}]", total);
|
||||
}
|
||||
|
||||
private void processJobPartitionTasks(List<? extends PartitionTask> partitionTasks) {
|
||||
|
@ -63,7 +63,7 @@ public class WorkflowExecutorActor extends AbstractActor {
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder().match(WorkflowNodeTaskExecuteDTO.class, taskExecute -> {
|
||||
log.info("工作流开始执行. [{}]", JsonUtil.toJsonString(taskExecute));
|
||||
log.debug("工作流开始执行. [{}]", JsonUtil.toJsonString(taskExecute));
|
||||
try {
|
||||
|
||||
doExecutor(taskExecute);
|
||||
@ -195,14 +195,14 @@ public class WorkflowExecutorActor extends AbstractActor {
|
||||
List<JobTaskBatch> jobTaskBatches = jobTaskBatchMap.get(nodeId);
|
||||
// 说明此节点未执行, 继续等待执行完成
|
||||
if (CollectionUtils.isEmpty(jobTaskBatches)) {
|
||||
EasyRetryLog.LOCAL.info("存在未完成的兄弟节点. [{}]", nodeId);
|
||||
EasyRetryLog.LOCAL.debug("存在未完成的兄弟节点. [{}]", nodeId);
|
||||
return Boolean.FALSE;
|
||||
}
|
||||
|
||||
boolean isCompleted = jobTaskBatches.stream().anyMatch(
|
||||
jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()));
|
||||
if (isCompleted) {
|
||||
EasyRetryLog.LOCAL.info("存在未完成的兄弟节点. [{}]", nodeId);
|
||||
EasyRetryLog.LOCAL.debug("存在未完成的兄弟节点. [{}]", nodeId);
|
||||
return Boolean.FALSE;
|
||||
}
|
||||
}
|
||||
|
@ -97,7 +97,7 @@ public class DecisionWorkflowExecutor extends AbstractWorkflowExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
log.info("执行条件表达式:[{}],参数: [{}] 结果:[{}]", decisionConfig.getNodeExpression(), jobTask.getResultMessage(), result);
|
||||
log.debug("执行条件表达式:[{}],参数: [{}] 结果:[{}]", decisionConfig.getNodeExpression(), jobTask.getResultMessage(), result);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -64,7 +64,7 @@ public class DistributedLockHandler {
|
||||
}
|
||||
}
|
||||
|
||||
EasyRetryLog.LOCAL.info("第【{}】次尝试获取锁. lockName:[{}] result:[{}] treadName:[{}]",
|
||||
EasyRetryLog.LOCAL.debug("第【{}】次尝试获取锁. lockName:[{}] result:[{}] treadName:[{}]",
|
||||
attempt.getAttemptNumber(), lockName, result, Thread.currentThread().getName());
|
||||
}
|
||||
}).build();
|
||||
@ -89,7 +89,7 @@ public class DistributedLockHandler {
|
||||
} finally {
|
||||
if (lock) {
|
||||
lockProvider.unlock();
|
||||
EasyRetryLog.LOCAL.info("[{}] 锁已释放", lockName);
|
||||
EasyRetryLog.LOCAL.debug("[{}] 锁已释放", lockName);
|
||||
} else {
|
||||
// 未获取到锁直接清除线程中存储的锁信息
|
||||
LockManager.clear();
|
||||
@ -122,7 +122,7 @@ public class DistributedLockHandler {
|
||||
} finally {
|
||||
if (lock) {
|
||||
lockProvider.unlock();
|
||||
EasyRetryLog.LOCAL.info("[{}] 锁已释放", lockName);
|
||||
EasyRetryLog.LOCAL.debug("[{}] 锁已释放", lockName);
|
||||
} else {
|
||||
// 未获取到锁直接清除线程中存储的锁信息
|
||||
LockManager.clear();
|
||||
|
@ -40,7 +40,7 @@ public class RunningJobPrepareHandler extends AbstractJobPrePareHandler {
|
||||
|
||||
@Override
|
||||
protected void doHandler(JobTaskPrepareDTO prepare) {
|
||||
log.info("存在运行中的任务. prepare:[{}]", JsonUtil.toJsonString(prepare));
|
||||
log.debug("存在运行中的任务. prepare:[{}]", JsonUtil.toJsonString(prepare));
|
||||
|
||||
// 若存在所有的任务都是完成,但是批次上的状态为运行中,则是并发导致的未把批次状态变成为终态,此处做一次兜底处理
|
||||
int blockStrategy = prepare.getBlockStrategy();
|
||||
|
@ -35,7 +35,7 @@ public class TerminalJobPrepareHandler extends AbstractJobPrePareHandler {
|
||||
|
||||
@Override
|
||||
protected void doHandler(JobTaskPrepareDTO jobPrepareDTO) {
|
||||
log.info("无处理中的数据. jobId:[{}]", jobPrepareDTO.getJobId());
|
||||
log.debug("无处理中的数据. jobId:[{}]", jobPrepareDTO.getJobId());
|
||||
|
||||
// 生成任务批次
|
||||
jobTaskBatchGenerator.generateJobTaskBatch(JobTaskConverter.INSTANCE.toJobTaskGeneratorContext(jobPrepareDTO));
|
||||
|
@ -30,7 +30,7 @@ public class WaitJobPrepareHandler extends AbstractJobPrePareHandler {
|
||||
|
||||
@Override
|
||||
protected void doHandler(JobTaskPrepareDTO jobPrepareDTO) {
|
||||
log.info("存在待处理任务. taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId());
|
||||
log.debug("存在待处理任务. taskBatchId:[{}]", jobPrepareDTO.getTaskBatchId());
|
||||
|
||||
// 若时间轮中数据不存在则重新加入
|
||||
if (!JobTimerWheel.isExisted(SyetemTaskTypeEnum.JOB.getType(), jobPrepareDTO.getTaskBatchId())) {
|
||||
|
@ -36,7 +36,7 @@ public class RunningWorkflowPrepareHandler extends AbstractWorkflowPrePareHandle
|
||||
|
||||
@Override
|
||||
protected void doHandler(WorkflowTaskPrepareDTO prepare) {
|
||||
log.info("存在运行中的任务. prepare:[{}]", JsonUtil.toJsonString(prepare));
|
||||
log.debug("存在运行中的任务. prepare:[{}]", JsonUtil.toJsonString(prepare));
|
||||
|
||||
|
||||
// 1. 若DAG已经支持完成了,由于异常原因导致的没有更新成终态此次进行一次更新操作
|
||||
|
@ -26,7 +26,7 @@ public class TerminalWorkflowPrepareHandler extends AbstractWorkflowPrePareHandl
|
||||
|
||||
@Override
|
||||
protected void doHandler(final WorkflowTaskPrepareDTO jobPrepareDTO) {
|
||||
log.info("无处理中的工作流数据. workflowId:[{}]", jobPrepareDTO.getWorkflowId());
|
||||
log.debug("无处理中的工作流数据. workflowId:[{}]", jobPrepareDTO.getWorkflowId());
|
||||
workflowBatchGenerator.generateJobTaskBatch(WorkflowTaskConverter.INSTANCE.toWorkflowTaskBatchGeneratorContext(jobPrepareDTO));
|
||||
}
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ public class WaiWorkflowPrepareHandler extends AbstractWorkflowPrePareHandler {
|
||||
|
||||
@Override
|
||||
protected void doHandler(WorkflowTaskPrepareDTO workflowTaskPrepareDTO) {
|
||||
log.info("存在待处理任务. workflowTaskBatchId:[{}]", workflowTaskPrepareDTO.getWorkflowTaskBatchId());
|
||||
log.debug("存在待处理任务. workflowTaskBatchId:[{}]", workflowTaskPrepareDTO.getWorkflowTaskBatchId());
|
||||
|
||||
// 若时间轮中数据不存在则重新加入
|
||||
if (!JobTimerWheel.isExisted(SyetemTaskTypeEnum.WORKFLOW.getType(), workflowTaskPrepareDTO.getWorkflowTaskBatchId())) {
|
||||
|
@ -41,7 +41,7 @@ public class ReportDispatchResultPostHttpRequestHandler extends PostHttpRequestH
|
||||
|
||||
@Override
|
||||
public String doHandler(String content, UrlQuery query, HttpHeaders headers) {
|
||||
EasyRetryLog.LOCAL.info("Client Callback Request. content:[{}]", content);
|
||||
EasyRetryLog.LOCAL.debug("Client Callback Request. content:[{}]", content);
|
||||
|
||||
EasyRetryRequest retryRequest = JsonUtil.parseObject(content, EasyRetryRequest.class);
|
||||
Object[] args = retryRequest.getArgs();
|
||||
|
@ -25,7 +25,7 @@ public class JobTimerTask implements TimerTask {
|
||||
@Override
|
||||
public void run(final Timeout timeout) throws Exception {
|
||||
// 执行任务调度
|
||||
log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId());
|
||||
log.debug("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), jobTimerTaskDTO.getTaskBatchId());
|
||||
|
||||
try {
|
||||
TaskExecuteDTO taskExecuteDTO = new TaskExecuteDTO();
|
||||
|
@ -44,7 +44,7 @@ public class JobTimerWheel implements Lifecycle {
|
||||
public static void register(Integer taskType, Long uniqueId, TimerTask task, long delay, TimeUnit unit) {
|
||||
|
||||
if (!isExisted(taskType, uniqueId)) {
|
||||
log.info("加入时间轮. delay:[{}ms] taskType:[{}] uniqueId:[{}]", delay, taskType, uniqueId);
|
||||
log.debug("加入时间轮. delay:[{}ms] taskType:[{}] uniqueId:[{}]", delay, taskType, uniqueId);
|
||||
delay = delay < 0 ? 0 : delay;
|
||||
try {
|
||||
timer.newTimeout(task, delay, unit);
|
||||
|
@ -26,7 +26,7 @@ public class WorkflowTimerTask implements TimerTask {
|
||||
@Override
|
||||
public void run(final Timeout timeout) throws Exception {
|
||||
// 执行任务调度
|
||||
log.info("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), workflowTimerTaskDTO.getWorkflowTaskBatchId());
|
||||
log.debug("开始执行任务调度. 当前时间:[{}] taskId:[{}]", LocalDateTime.now(), workflowTimerTaskDTO.getWorkflowTaskBatchId());
|
||||
|
||||
try {
|
||||
|
||||
|
@ -54,7 +54,7 @@ public abstract class AbstractGenerator implements TaskGenerator {
|
||||
@Override
|
||||
@Transactional
|
||||
public void taskGenerator(TaskContext taskContext) {
|
||||
EasyRetryLog.LOCAL.info("received report data. {}", JsonUtil.toJsonString(taskContext));
|
||||
EasyRetryLog.LOCAL.debug("received report data. {}", JsonUtil.toJsonString(taskContext));
|
||||
|
||||
SceneConfig sceneConfig = checkAndInitScene(taskContext);
|
||||
|
||||
|
@ -59,7 +59,7 @@ public class FailureActor extends AbstractActor {
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder().match(RetryTask.class, retryTask -> {
|
||||
EasyRetryLog.LOCAL.info("FailureActor params:[{}]", retryTask);
|
||||
EasyRetryLog.LOCAL.debug("FailureActor params:[{}]", retryTask);
|
||||
|
||||
|
||||
try {
|
||||
|
@ -53,7 +53,7 @@ public class FinishActor extends AbstractActor {
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder().match(RetryTask.class, retryTask -> {
|
||||
EasyRetryLog.LOCAL.info("FinishActor params:[{}]", retryTask);
|
||||
EasyRetryLog.LOCAL.debug("FinishActor params:[{}]", retryTask);
|
||||
|
||||
retryTask.setRetryStatus(RetryStatusEnum.FINISH.getStatus());
|
||||
|
||||
|
@ -63,7 +63,7 @@ public class ReportRetryInfoHttpRequestHandler extends PostHttpRequestHandler {
|
||||
@Override
|
||||
@Transactional
|
||||
public String doHandler(String content, UrlQuery urlQuery, HttpHeaders headers) {
|
||||
EasyRetryLog.LOCAL.info("Batch Report Retry Data. content:[{}]", content);
|
||||
EasyRetryLog.LOCAL.debug("Batch Report Retry Data. content:[{}]", content);
|
||||
|
||||
EasyRetryRequest retryRequest = JsonUtil.parseObject(content, EasyRetryRequest.class);
|
||||
Object[] args = retryRequest.getArgs();
|
||||
|
@ -25,7 +25,7 @@ public abstract class AbstractTimerTask implements TimerTask {
|
||||
|
||||
@Override
|
||||
public void run(Timeout timeout) throws Exception {
|
||||
log.info("开始执行重试任务. 当前时间:[{}] groupName:[{}] uniqueId:[{}] namespaceId:[{}]", LocalDateTime.now(), groupName,
|
||||
log.debug("开始执行重试任务. 当前时间:[{}] groupName:[{}] uniqueId:[{}] namespaceId:[{}]", LocalDateTime.now(), groupName,
|
||||
uniqueId, namespaceId);
|
||||
try {
|
||||
doRun(timeout);
|
||||
|
@ -33,7 +33,7 @@ public class CallbackTimerTask extends AbstractTimerTask {
|
||||
|
||||
@Override
|
||||
protected void doRun(final Timeout timeout) {
|
||||
log.info("回调任务执行 {}", LocalDateTime.now());
|
||||
log.debug("回调任务执行 {}", LocalDateTime.now());
|
||||
AccessTemplate accessTemplate = SpringContext.getBeanByType(AccessTemplate.class);
|
||||
TaskAccess<RetryTask> retryTaskAccess = accessTemplate.getRetryTaskAccess();
|
||||
RetryTask retryTask = retryTaskAccess.one(context.getGroupName(), context.getNamespaceId(),
|
||||
|
Loading…
Reference in New Issue
Block a user