admin 发表于 2021-1-19 11:28:36

ElasticJob 快速上手

1.ElasticJob 是什么ElasticJob 是一个分布式调度解决方案,由两个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成。 ElasticJob-Lite 定位为轻量级无中心化解决方案,使用jar的形式提供分布式任务的协调服务。ElasticJob 已于2020年5月28日成为 Apache ShardingSphere 的子项目。 ElasticJob特性:
[*]弹性调度
[*]支持任务在分布式场景下的分片和高可用
[*]能够水平扩展任务的吞吐量和执行效率
[*]任务处理能力随资源配备弹性伸缩

[*]资源分配
[*]在适合的时间将适合的资源分配给任务并使其生效
[*]相同任务聚合至相同的执行器统一处理
[*]动态调配追加资源至新分配的任务

[*]作业治理
[*]失效转移
[*]错过作业重新执行
[*]自诊断修复

[*]作业开放生态
[*]可扩展的作业类型统一接口
[*]丰富的作业类型库,如数据流、脚本、HTTP、文件、大数据等
[*]易于对接业务作业,能够与 Spring 依赖注入无缝整合

[*]可视化管控端
[*]作业管控端
[*]作业执行历史数据追踪
[*]注册中心管理

2.实例演示这里采用最新版本 3.0.0-RC1 1、启动zookeeper服务首先,下载zookeeper-3.6.0版本,解压后复制一份zoo_sample.cfg,重命名未zoo.cfg,保持默认配置即可https://img2020.cnblogs.com/blog/874963/202101/874963-20210118191415469-1326853060.png注意,zookeeper-3.6.0启动以后会占用三个端口,其中包括8080哦https://img2020.cnblogs.com/blog/874963/202101/874963-20210118192906156-1531174797.png2、编写定时任务业务逻辑pom.xml<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <parent>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-parent</artifactId>      <version>2.4.1</version>      <relativePath/> <!-- lookup parent from repository -->    </parent>    <groupId>com.example</groupId>    <artifactId>elasticjob-demo</artifactId>    <version>0.0.1-SNAPSHOT</version>    <properties>      <java.version>1.8</java.version>      <elasticjob-lite.version>3.0.0-RC1</elasticjob-lite.version>    </properties>    <dependencies>      <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>      </dependency>      <dependency>            <groupId>org.apache.shardingsphere.elasticjob</groupId>            <artifactId>elasticjob-lite-spring-boot-starter</artifactId>            <version>${elasticjob-lite.version}</version>            <exclusions>                <exclusion>                  <groupId>org.springframework.boot</groupId>                  <artifactId>spring-boot-starter-jdbc</artifactId>                </exclusion>            </exclusions>      </dependency>      <dependency>            <groupId>org.apache.shardingsphere.elasticjob</groupId>            <artifactId>elasticjob-error-handler-dingtalk</artifactId>            <version>${elasticjob-lite.version}</version>      </dependency>    </dependencies>    <build>      <plugins>            <plugin>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-maven-plugin</artifactId>            </plugin>      </plugins>    </build></project>
application.ymlelasticjob:regCenter:    serverLists: 192.168.100.15:2181    namespace: elasticjob-demo    baseSleepTimeMilliseconds: 2000    maxSleepTimeMilliseconds: 4000    maxRetries: 3jobs:    firstJob:      elasticJobClass: com.example.job.FirstJob      cron: 0/6 * * * * ?      shardingTotalCount: 3      jobErrorHandlerType: DINGTALK      props:      dingtalk:          webhook: https://oapi.dingtalk.com/robot/send?access_token=xxx          secret: ASDF          connectTimeout: 3000          readTimeout: 5000    secondJob:      elasticJobClass: com.example.job.SecondJob      cron: 0/10 * * * * ?      shardingTotalCount: 1      jobErrorHandlerType: DINGTALK      props:      dingtalk:          webhook: https://oapi.dingtalk.com/robot/send?access_token=xxx          secret: ASDF          connectTimeout: 3000          readTimeout: 5000
两个定时任务FirstJob.javapackage com.example.job;import org.apache.shardingsphere.elasticjob.api.ShardingContext;import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;import org.springframework.stereotype.Component;/** * @author ChengJianSheng * @date 2021/1/13 */@Componentpublic class FirstJob implements SimpleJob {    @Override    public void execute(ShardingContext shardingContext) {      switch (shardingContext.getShardingItem()) {            case 0:                // do something by sharding item 0                System.out.println(0);                // int a = 1 / 0;                break;            case 1:                // do something by sharding item 1                System.out.println(1);                break;            case 2:                // do something by sharding item 2                System.out.println(2);                break;            // case n: ...      }    }}
SecondJob.javapackage com.example.job;import org.apache.shardingsphere.elasticjob.api.ShardingContext;import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;import org.springframework.stereotype.Component;/** * @author ChengJianSheng * @date 2021/1/18 */@Componentpublic class SecondJob implements SimpleJob {    @Override    public void execute(ShardingContext shardingContext) {      System.out.println("hello");    }}
项目结构https://img2020.cnblogs.com/blog/874963/202101/874963-20210118201021616-900039628.png运行项目即可https://img2020.cnblogs.com/blog/874963/202101/874963-20210118200729913-336146493.pnghttps://img2020.cnblogs.com/blog/874963/202101/874963-20210119094301009-178964628.png通过 ElasticJob-UI 查看任务https://shardingsphere.apache.org/elasticjob/current/cn/downloads/https://img2020.cnblogs.com/blog/874963/202101/874963-20210118202818653-1800436329.pnghttps://img2020.cnblogs.com/blog/874963/202101/874963-20210118202827904-2029878627.pnghttps://img2020.cnblogs.com/blog/874963/202101/874963-20210118202838161-1514974437.png3.启动报错排查项目启动过程中,可能会报如下错误org.apache.zookeeper.ClientCnxn$EndOfStreamException: Unable to read additional data from server sessionid 0x1000bdf48160002, likely server has closed socketorg.apache.shardingsphere.elasticjob.reg.exception.RegException: org.apache.zookeeper.KeeperException$OperationTimeoutException: KeeperErrorCode = OperationTimeoutCaused by: org.apache.zookeeper.KeeperException$OperationTimeoutException: KeeperErrorCode = OperationTimeout最开始,我以为是zookeeper版本的问题,后来换了版本也不行,防火墙关了也不行然后,我怀疑是开发环境问题,于是在本地运行zookeeper,程序连127.0.0.1:2181,居然可以了于是我陷入了沉思,为今之计,只剩下一个办法了,打断点调试找到了异常抛出的位置,如下图https://img2020.cnblogs.com/blog/874963/202101/874963-20210119095117618-332066614.pnghttps://img2020.cnblogs.com/blog/874963/202101/874963-20210119095126380-1637738408.pnghttps://img2020.cnblogs.com/blog/874963/202101/874963-20210119095616750-1795253110.pnghttps://img2020.cnblogs.com/blog/874963/202101/874963-20210119095623681-895907092.pngbaseSleepTimeMilliseconds 表示 等待重试的间隔时间的初始值maxSleepTimeMilliseconds表示 等待重试的间隔时间的最大值maxRetries 表示 最大重试次数根据代码中意思,如果在 maxSleepTimeMilliseconds * maxRetries 毫秒内还没有连接成功,则连接关闭,并抛出操作超时异常联想到,连接本地zookeeper可以,连开发环境zk就不行,再加上观察日志从连接开始到抛异常的时间间隔,我猜到应该是maxSleepTimeMilliseconds设置太短了于是,application.yml配置文件中将maxSleepTimeMilliseconds设置为4000,baseSleepTimeMilliseconds设置为2000然后好使回想刚开始报的那些错,其实根本就还没有连上zookeeper4.作业分片ElasticJob 中任务分片项的概念,使得任务可以在分布式的环境下运行,每台任务服务器只运行分配给该服务器的分片。 随着服务器的增加或宕机,ElasticJob 会近乎实时的感知服务器数量的变更,从而重新为分布式的任务服务器分配更加合理的任务分片项,使得任务可以随着资源的增加而提升效率。任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。也就是说,分片是为了在分布式环境下高效合理利用任务服务器资源的。简单地来讲,一个定时任务,我们运行多台服务器,这意味着有多个实例在执行同一项任务,分片就是为了告诉这些实例各自该处理那些数据,最大限度的降低数据重复处理的问题,同时加快任务处理速度。每个任务实例该处理哪些数据,是根据分片项来的,在任务代码层面,就可以根据分片项来进行逻辑判断。举例说明,如果作业分为 4 片,用两台服务器执行,则每个服务器分到 2 片,分别负责作业的 50% 的负载https://img2020.cnblogs.com/blog/874963/202101/874963-20210119102058246-1182296576.png分片项ElasticJob 并不直接提供数据处理的功能,而是将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与业务的对应关系。 分片项为数字,始于 0 而终于分片总数减 1。个性化分片参数个性化参数可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。 合理使用个性化参数可以让代码更可读。例如,如果配置为 0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。分片策略https://img2020.cnblogs.com/blog/874963/202101/874963-20210119102850464-1749033282.png平均分片策略根据分片项平均分片。如果作业服务器数量与分片总数无法整除,多余的分片将会顺序的分配至每一个作业服务器。举例说明:
[*]如果 3 台作业服务器且分片总数为9, 则分片结果为:1=, 2=, 3=
[*]如果 3 台作业服务器且分片总数为8, 则分片结果为:1=, 2=, 3=
[*]如果 3 台作业服务器且分片总数为10,则分片结果为:1=, 2=, 3=
奇偶分片策略 根据作业名称哈希值的奇偶数决定按照作业服务器 IP 升序或是降序的方式分片。如果作业名称哈希值是偶数,则按照 IP 地址进行升序分片; 如果作业名称哈希值是奇数,则按照 IP 地址进行降序分片。 可用于让服务器负载在多个作业共同运行时分配的更加均匀。举例说明:
[*]如果 3 台作业服务器,分片总数为2且作业名称的哈希值为偶数,则分片结果为:1 = , 2 = , 3 = []
[*]如果 3 台作业服务器,分片总数为2且作业名称的哈希值为奇数,则分片结果为:3 = , 2 = , 1 = []
轮询分片策略根据作业名称轮询分片。
页: [1]
查看完整版本: ElasticJob 快速上手