Compare commits

...

18 Commits
grpc ... dev

Author SHA1 Message Date
8ac38bef71 更改1.5.0-beta1 2025-05-08 09:41:19 +08:00
opensnail
7eb1c1efdc 修复测试问题 2025-02-21 21:43:42 +08:00
opensnail
01909eb7b3 测试1.4.0-beta1 2025-02-18 23:22:11 +08:00
opensnail
7e81fe9b73 测试1.4.0-beta1 2025-02-17 21:36:42 +08:00
dhb52
384007f7ff 发布1.3.0-beta1.1 2025-01-12 23:20:07 +08:00
dhb52
0e45a51fd6 feat(1.3.0-beta2): 更新 mysql connector 依赖 2025-01-03 21:16:47 +08:00
dhb52
df4b88550f fix(1.3.0-beta1): grpc 模式需要排除 mysql 的 protobuf 依赖冲突 2025-01-01 17:06:50 +08:00
opensnail
dfe66f6ba2 Merge remote-tracking branch 'origin/master'
# Conflicts:
#	pom.xml
2025-01-01 11:33:34 +08:00
opensnail
93016523cc 发布1.3.0-beta1 2025-01-01 11:32:55 +08:00
byteblogs168
499ecc13e5 发布1.2.0 2024-11-16 10:57:24 +08:00
byteblogs168
b04a938372 修改描述 2024-11-02 17:21:42 +08:00
dhb52
d98c7ddcb9 fix(sj_1.2.0-beta2): 使用 snailjob.version 变量 2024-10-28 22:50:09 +08:00
byteblogs168
b76cf9efa0 Merge branch 'refs/heads/1.2.0-beta2' 2024-10-25 12:02:43 +08:00
byteblogs168
72599676e1 补充更新功能 2024-10-21 23:08:18 +08:00
byteblogs168
503c401e09 测试openapi 2024-10-20 12:59:50 +08:00
srzou
04b6ba9d4a 新增 query、trigger、updateStatus 相关demo 2024-10-19 21:12:02 +08:00
byteblogs168
64ac13a1fd 测试openapi 2024-10-19 19:08:53 +08:00
ploat
05c944557e !4 提交异步日志打印demo
* 提交异步日志打印demo
2024-09-02 03:37:28 +00:00
13 changed files with 534 additions and 23 deletions

23
pom.xml
View File

@ -5,18 +5,19 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.2</version>
<version>3.3.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>example1</artifactId>
<groupId>com.opensnail</groupId>
<artifactId>snailjob-demo</artifactId>
<version>1.0.0</version>
<name>example</name>
<description>Demo project for Spring Boot</description>
<name>SnailJob-Demo</name>
<description>Demo project for SnailJob</description>
<properties>
<java.version>17</java.version>
<snailjob.version>1.5.0-beta1</snailjob.version>
</properties>
<dependencies>
@ -44,17 +45,17 @@
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>snail-job-client-starter</artifactId>
<version>1.1.1</version>
<version>${snailjob.version}</version>
</dependency>
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>snail-job-client-retry-core</artifactId>
<version>1.1.1</version>
<version>${snailjob.version}</version>
</dependency>
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>snail-job-client-job-core</artifactId>
<version>1.1.1</version>
<version>${snailjob.version}</version>
</dependency>
<dependency>
<groupId>com.googlecode.aviator</groupId>
@ -87,9 +88,9 @@
<version>5.8.19</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.4.0</version>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>

View File

@ -23,16 +23,16 @@ public class SwaggerConfig {
return new OpenAPI()
.info(new Info()
.title("Snail Job Example")
.description("<h1>SnailJob是致力提高分布式业务系统一致性的分布式重试平台</h1> \n" +
"<h3>官网地址: https://www.easyretry.com/</h3>" +
"<h3>在线体验地址: http://preview.easyretry.com/</h3> " +
"<h3>源码地址: https://gitee.com/byteblogs168/easy-retry-demo</h3>" +
.description("<h1>SnailJob是一个灵活,可靠和快速的分布式任务重试和分布式任务调度平台</h1> \n" +
"<h3>官网地址: https://snailjob.opensnail.com/</h3>" +
"<h3>在线体验地址: https://preview.snailjob.opensnail.com/</h3> " +
"<h3>源码地址: https://gitee.com/opensnail/snail-job-demo</h3>" +
"<h3>特别提醒: 🌻在您使用测试案例之前请认真的阅读官网.</h3>")
.version(SnailJobVersion.getVersion())
.license(new License().name("Apache 2.0").url("https://www.easyretry.com/")))
.license(new License().name("Apache 2.0").url("https://snailjob.opensnail.com/")))
.externalDocs(new ExternalDocumentation()
.description("视频教程:从0到1快速了解分布式重试组件EasyRetry")
.url("https://www.ixigua.com/pseries/7272009348824433213/"))
.description("视频教程:以小白视角的SnailJob入门级视频教程")
.url("https://www.bilibili.com/video/BV1pvtBerEmV/?vd_source=ec323e2347232ea82321f54aba036b63"))
;
}

View File

@ -0,0 +1,130 @@
package com.example.snailjob.controller;
import com.aizuda.snailjob.client.job.core.dto.JobResponseVO;
import com.example.snailjob.handler.*;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
/**
* @author opensnail
* @date 2024-10-19 10:41:25
* @since sj_1.2.0-beta2
*/
@RestController
@RequestMapping("/open-api/job")
@Tag(name = "JobOpenApi", description = "通过OpenApi可以灵活的实现对Job的增、改、查功能")
@RequiredArgsConstructor
public class JobController {
private final TestAddJobHandler testAddJobHandler;
private final TestUpdateJobHandler testUpdateJobHandler;
private final TestQueryJobHandler testQueryJobHandler;
private final TestTriggerJobHandler testTriggerJobHandler;
private final TestUpdateJobStatusHandler testUpdateJobStatusHandler;
@Operation(
description = "添加集群模式的定时任务"
)
@PostMapping("/custer/add")
public Long addClusterJob(@RequestBody String jobName) {
return testAddJobHandler.addClusterJob(jobName);
}
@Operation(
description = "添加广播模式的定时任务"
)
@PostMapping("/broadcast/add")
public Long addBroadcastJob(@RequestBody String jobName) {
return testAddJobHandler.addBroadcastJob(jobName);
}
@Operation(
description = "添加静态分片模式的定时任务"
)
@PostMapping("/sharding/add")
public Long addShardingJob(@RequestBody String jobName) {
return testAddJobHandler.addShardingJob(jobName);
}
@Operation(
description = "添加Map模式的定时任务"
)
@PostMapping("/map/add")
public Long addMapJob(@RequestBody String jobName) {
return testAddJobHandler.addMapJob(jobName);
}
@Operation(
description = "添加MapReduce模式的定时任务"
)
@PostMapping("/map-reduce/add")
public Long addMapReduceJob(@RequestBody String jobName) {
return testAddJobHandler.addMapReduceJob(jobName);
}
@Operation(
description = "更新集群模式的定时任务"
)
@PutMapping("/custer/update")
public Boolean updateClusterJob(@RequestBody Long id) {
return testUpdateJobHandler.updateClusterJob(id);
}
@Operation(
description = "更新广播模式的定时任务"
)
@PutMapping("/broadcast/update")
public Boolean updateBroadcastJob(@RequestBody Long id) {
return testUpdateJobHandler.updateBroadcastJob(id);
}
@Operation(
description = "更新静态分片模式的定时任务"
)
@PutMapping("/sharding/update")
public Boolean addShardingJob(@RequestBody Long id) {
return testUpdateJobHandler.updateShardingJob(id);
}
@Operation(
description = "更新Map模式的定时任务"
)
@PutMapping("/map/update")
public Boolean updateMapJob(@RequestBody Long id) {
return testUpdateJobHandler.updateMapJob(id);
}
@Operation(
description = "更新MapReduce模式的定时任务"
)
@PutMapping("/map-reduce/update")
public Boolean updateMapReduceJob(@RequestBody Long id) {
return testUpdateJobHandler.updateMapReduceJob(id);
}
@Operation(
description = "通过任务id查询任务的详情"
)
@GetMapping("/detail/{id}")
public JobResponseVO addMapReduceJob(@PathVariable("id") Long id) {
return testQueryJobHandler.queryJob(id);
}
@Operation(
description = "手动触发任务"
)
@PostMapping("/trigger/{id}")
public Boolean triggerJob(@PathVariable("id") Long id) {
return testTriggerJobHandler.triggerJob(id);
}
@Operation(
description = "根据id更新任务的状态",
summary = "0:关闭 1:开启"
)
@PutMapping("/update/status/{id}/{status}")
public Boolean updateJob(@PathVariable("id") Long id, @PathVariable("status") Long status) {
return testUpdateJobStatusHandler.updateJobStatus(id, status);
}
}

View File

@ -0,0 +1,40 @@
package com.example.snailjob.controller;
import com.example.snailjob.handler.TestTriggerJobHandler;
import com.example.snailjob.handler.TestUpdateJobStatusHandler;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
/**
* @author opensnail
* @date 2024-10-19 10:41:25
* @since sj_1.2.0-beta2
*/
@RestController
@RequestMapping("/open-api/workflow")
@Tag(name = "JobOpenApi", description = "通过OpenApi可以灵活的实现对的Workflow触发和更新状态功能")
@RequiredArgsConstructor
public class WorkflowController {
private final TestTriggerJobHandler testTriggerJobHandler;
private final TestUpdateJobStatusHandler testUpdateJobStatusHandler;
@Operation(
description = "手动触发任务"
)
@PostMapping("/trigger/{id}")
public Boolean triggerJob(@PathVariable("id") Long id) {
return testTriggerJobHandler.triggerWorkFlow(id);
}
@Operation(
description = "根据id更新任务的状态",
summary = "0:关闭 1:开启"
)
@PutMapping("/update/status/{id}/{status}")
public Boolean updateJob(@PathVariable("id") Long id, @PathVariable("status") Long status) {
return testUpdateJobStatusHandler.updateWorkFlowStatus(id, status);
}
}

View File

@ -1,7 +1,7 @@
package com.example.snailjob.customized;
import cn.hutool.json.JSONUtil;
import com.aizuda.snailjob.client.core.callback.RetryCompleteCallback;
import com.aizuda.snailjob.client.core.callback.complete.RetryCompleteCallback;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.example.snailjob.dao.FailOrderBaseMapper;

View File

@ -45,13 +45,13 @@ public class OnlyRemoteRetryHandler {
}
}
@Retryable(scene = "localRetryWithRequires", retryStrategy = RetryType.ONLY_LOCAL)
@Retryable(scene = "localRetryWithRequires", retryStrategy = RetryType.ONLY_REMOTE)
public void localRetryWithRequires(String params) {
System.out.println("local retry 方法开始执行");
double i = 1 / 0;
}
@Retryable(scene = "localRetryWithRequiresNew", retryStrategy = RetryType.ONLY_LOCAL, propagation = Propagation.REQUIRES_NEW)
@Retryable(scene = "localRetryWithRequiresNew", retryStrategy = RetryType.ONLY_REMOTE, propagation = Propagation.REQUIRES_NEW)
public void localRetryWithRequiresNew(String params) {
System.out.println("local retry 方法开始执行");
double i = 1 / 0;

View File

@ -0,0 +1,126 @@
package com.example.snailjob.handler;
import com.aizuda.snailjob.client.job.core.enums.AllocationAlgorithmEnum;
import com.aizuda.snailjob.client.job.core.enums.TriggerTypeEnum;
import com.aizuda.snailjob.client.job.core.openapi.SnailJobOpenApi;
import com.aizuda.snailjob.common.core.enums.JobBlockStrategyEnum;
import org.springframework.stereotype.Component;
@Component
public class TestAddJobHandler {
/**
* 新增集群模式的任务
*
* @param jobName 任务名称
* @return 任务id
*/
public Long addClusterJob(String jobName) {
return SnailJobOpenApi.addClusterJob()
.setRouteKey(AllocationAlgorithmEnum.RANDOM)
.setJobName(jobName)
.setExecutorInfo("testJobExecutor")
.setExecutorTimeout(30)
.setDescription("add")
.setBlockStrategy(JobBlockStrategyEnum.DISCARD)
.setMaxRetryTimes(1)
.setTriggerType(TriggerTypeEnum.SCHEDULED_TIME)
.setTriggerInterval(String.valueOf(60))
.addArgsStr("测试数据", 123)
.addArgsStr("addArg", "args")
.setRetryInterval(3)
.execute();
}
/**
* 新增集群模式的任务
*
* @param jobName 任务名称
* @return 任务id
*/
public Long addBroadcastJob(String jobName) {
return SnailJobOpenApi.addBroadcastJob()
.setJobName(jobName)
.setExecutorInfo("testJobExecutor")
.setExecutorTimeout(30)
.setDescription("add")
.setBlockStrategy(JobBlockStrategyEnum.DISCARD)
.setMaxRetryTimes(1)
.setTriggerType(TriggerTypeEnum.CRON)
.setTriggerInterval("afas")
.addArgsStr("测试数据", 123)
.addArgsStr("addArg", "args")
.setRetryInterval(3)
.execute();
}
/**
* 新增Sharding模式的任务
*
* @param jobName 任务名称
* @return 任务id
*/
public Long addShardingJob(String jobName) {
return SnailJobOpenApi.addShardingJob()
.setJobName(jobName)
.setExecutorInfo("testJobExecutor")
.setExecutorTimeout(30)
.setDescription("add")
.setBlockStrategy(JobBlockStrategyEnum.DISCARD)
.setMaxRetryTimes(1)
.setTriggerType(TriggerTypeEnum.SCHEDULED_TIME)
.setTriggerInterval(60)
.addShardingArgs("分片1", "分片2", "分片3")
.setParallelNum(1)
.setRetryInterval(3)
.execute();
}
/**
* 新增MapReduce模式的任务
*
* @param jobName 任务名称
* @return 任务id
*/
public Long addMapJob(String jobName) {
return SnailJobOpenApi.addMapJob()
.setJobName(jobName)
.setExecutorInfo("testJobExecutor")
.setExecutorTimeout(30)
.setDescription("add")
.setBlockStrategy(JobBlockStrategyEnum.DISCARD)
.setMaxRetryTimes(1)
.setTriggerType(TriggerTypeEnum.SCHEDULED_TIME)
.setTriggerInterval(String.valueOf(60))
.setParallelNum(3)
.setRetryInterval(3)
.execute();
}
/**
* 新增MapReduce模式的任务
*
* @param jobName 任务名称
* @return 任务id
*/
public Long addMapReduceJob(String jobName) {
return SnailJobOpenApi.addMapReduceJob()
.setJobName(jobName)
.setExecutorInfo("testJobExecutor")
.setExecutorTimeout(30)
.setDescription("add")
.setBlockStrategy(JobBlockStrategyEnum.DISCARD)
.setMaxRetryTimes(1)
.setTriggerType(TriggerTypeEnum.SCHEDULED_TIME)
.setTriggerInterval(String.valueOf(60))
.setParallelNum(3)
.setShardNum(2)
.setRetryInterval(3)
.execute();
}
}

View File

@ -0,0 +1,19 @@
package com.example.snailjob.handler;
import com.aizuda.snailjob.client.job.core.dto.JobResponseVO;
import com.aizuda.snailjob.client.job.core.openapi.SnailJobOpenApi;
import org.springframework.stereotype.Component;
@Component
public class TestQueryJobHandler {
/**
* 查看任务详情
*
* @param jobId
* @return 任务详情
*/
public JobResponseVO queryJob(Long jobId){
return SnailJobOpenApi.getJobDetail(jobId).execute();
}
}

View File

@ -0,0 +1,28 @@
package com.example.snailjob.handler;
import com.aizuda.snailjob.client.job.core.openapi.SnailJobOpenApi;
import org.springframework.stereotype.Component;
@Component
public class TestTriggerJobHandler {
/**
* 手动调度任务
*
* @param jobId 任务ID
* @return
*/
public Boolean triggerJob(Long jobId) {
return SnailJobOpenApi.triggerClusterJob(jobId).execute();
}
/**
* 手动调度工作流任务
*
* @param workFlowId 工作流任务ID
* @return
*/
public Boolean triggerWorkFlow(Long workFlowId){
return SnailJobOpenApi.triggerWorkFlow(workFlowId).execute();
}
}

View File

@ -0,0 +1,77 @@
package com.example.snailjob.handler;
import com.aizuda.snailjob.client.job.core.enums.TriggerTypeEnum;
import com.aizuda.snailjob.client.job.core.openapi.SnailJobOpenApi;
import org.springframework.stereotype.Component;
import java.util.Random;
@Component
public class TestUpdateJobHandler {
/**
* 新增集群模式的任务
*
* @return 任务id
*/
public Boolean updateClusterJob(Long jobId) {
return SnailJobOpenApi.updateClusterJob(jobId)
.setMaxRetryTimes(1)
.setTriggerType(TriggerTypeEnum.SCHEDULED_TIME)
.setTriggerInterval(String.valueOf(60))
.addArgsStr("update测试数据", new Random().nextInt(1000))
.addArgsStr("updateArg", "args")
.setRetryInterval(3)
.execute();
}
/**
* 新增集群模式的任务
*
* @return 任务id
*/
public Boolean updateBroadcastJob(Long jobId) {
return SnailJobOpenApi.updateBroadcastJob(jobId)
.addArgsStr("update测试数据", new Random().nextInt(1000))
.execute();
}
/**
* 新增Sharding模式的任务
*
* @return 任务id
*/
public Boolean updateShardingJob(Long jobId) {
return SnailJobOpenApi.updateShardingJob(jobId)
.addShardingArgs("update分片1", "update分片2", "update分片3")
.execute();
}
/**
* 新增MapReduce模式的任务
*
* @return 任务id
*/
public Boolean updateMapJob(Long jobId) {
return SnailJobOpenApi.updateMapJob(jobId)
.addArgsStr("update测试数据", new Random().nextInt(1000))
.setParallelNum(3)
.execute();
}
/**
* 新增MapReduce模式的任务
*
* @return 任务id
*/
public Boolean updateMapReduceJob(Long jobId) {
return SnailJobOpenApi.updateMapReduceJob(jobId)
.addArgsStr("update测试数据", new Random().nextInt(1000))
.execute();
}
}

View File

@ -0,0 +1,36 @@
package com.example.snailjob.handler;
import com.aizuda.snailjob.client.job.core.openapi.SnailJobOpenApi;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import org.springframework.stereotype.Component;
@Component
public class TestUpdateJobStatusHandler {
/**
* 更新定时任务状态
*
* @param jobId 定时任务ID
* @return
*/
public Boolean updateJobStatus(Long jobId, Long status) {
return SnailJobOpenApi
.updateJobStatus(jobId)
.setStatus(StatusEnum.YES.getStatus().equals(status.intValue()) ? StatusEnum.YES : StatusEnum.NO)
.execute();
}
/**
* 更新工作流任务状态
*
* @param workFlowId 工作流ID
* @param status
* @return
*/
public Boolean updateWorkFlowStatus(Long workFlowId, Long status) {
return SnailJobOpenApi
.updateWorkFlowStatus(workFlowId)
.setStatus(StatusEnum.YES.getStatus().equals(status.intValue()) ? StatusEnum.YES : StatusEnum.NO)
.execute();
}
}

View File

@ -0,0 +1,53 @@
package com.example.snailjob.job;
import com.aizuda.snailjob.client.common.log.report.LogMeta;
import com.aizuda.snailjob.client.common.log.support.SnailJobLogManager;
import com.aizuda.snailjob.client.job.core.annotation.JobExecutor;
import com.aizuda.snailjob.client.job.core.dto.JobArgs;
import com.aizuda.snailjob.client.job.core.executor.JobContextManager;
import com.aizuda.snailjob.client.model.ExecuteResult;
import com.aizuda.snailjob.common.core.model.JobContext;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.common.log.enums.LogTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* @author PT
* @date 2024-09-02 上午10:50
* @Description
* 线程异步情况下日志打印场景定时调用接口更新数据
* 由于数据量每天大小不一致无法界定执行时间所以需要异步执行但是异步执行时日志也需要进行打印
*/
@Slf4j
@Component
public class TestSyncLogPrintExecutor {
@JobExecutor(name = "TestSyncLogPrintExecutor")
public ExecuteResult jobExecute(JobArgs jobArgs) {
JobContext jobContext = JobContextManager.getJobContext();
LogMeta logMeta = SnailJobLogManager.getLogMeta();
LogTypeEnum logType = SnailJobLogManager.getLogType();
//此处简单模拟开启新线程执行实际生产建议使用线程池执行
new Thread(() -> {
JobContextManager.setJobContext(jobContext);
SnailJobLogManager.setLogMeta(logMeta);
SnailJobLogManager.setLogType(logType);
//执行业务
doSomething();
});
return ExecuteResult.success();
}
private void doSomething() {
try {
TimeUnit.SECONDS.sleep(10);
SnailJobLog.REMOTE.info("测试异步打印");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -46,7 +46,7 @@ snail-job:
# 服务器IP地址或域名集群时建议通过 nginx 做负载均衡
host: 127.0.0.1
# 服务器通讯端口(不是后台管理页面服务端口)
port: 1788
port: 17888
# 命名空间
namespace: 764d604ec6fc45f68cd92514c40e9e1a
# 接入组名
@ -56,4 +56,5 @@ snail-job:
# 客户端绑定IP必须服务器可以访问到默认自动推断在服务器无法调度客户端时需要手动配置
host: 127.0.0.1
# 客户端通讯端口,默认 1789
port: 1789
port: 17889
rpc-type: grpc