分布式调度Elastic-job

分布式调度Elastic-job

1. 概述


1.1什么是任务调度

我们可以思考⼀下下⾯业务场景的解决⽅案:

  • 某电商平台需要每天上午10点,下午3点,晚上8点发放⼀批优惠券
  • 某银⾏系统需要在信⽤卡到期还款⽇的前三天进⾏短信提醒
  • 某财务系统需要在每天凌晨0:10分结算前⼀天的财务数据,统计汇总

以上场景就是任务调度所需要解决的问题

任务调度是为了自动完成特定任务,在约定的特定时刻去执行任务的过程

我们经常使用Spring中提供的定时任务注解@Scheduled 在业务类中⽅法中贴上这个注解

@Scheduled(cron = "0/20 * * * * ? ")
public void doWork(){
//doSomething 
}

然后在启动类上贴上 @EnableScheduling 注解

1.2 为什么需要分布式调度

感觉Spring给我们提供的这个注解可以完成任务调度的功能,好像已经完美解决问题了,为什么还需要 分布式呢?

主要有如下这⼏点原因:

1.单机处理极限:原本1分钟内需要处理1万个订单,但是现在需要1分钟内处理10万个订单;原来⼀个 统计需要1⼩时,现在业务⽅需要10分钟就统计出来。你也许会说,你也可以多线程、单机多进程处 理。的确,多线程并⾏处理可以提⾼单位时间的处理效率,但是单机能⼒毕竟有限(主要是CPU、内存 和磁盘),始终会有单机处理不过来的情况。

2.高可用:单机版的定式任务调度只能在⼀台机器上运⾏,如果程序或者系统出现异常就会导致功能不 可⽤。虽然可以在单机程序实现的⾜够稳定,但始终有机会遇到⾮程序引起的故障,⽽这个对于⼀个系 统的核⼼功能来说是不可接受的。

3.防止重复执行: 在单机模式下,定时任务是没什么问题的。但当我们部署了多台服务,同时⼜每台服务 ⼜有定时任务时,若不进⾏合理的控制在同⼀时间,只有⼀个定时任务启动执⾏,这时,定时执⾏的结 果就可能存在混乱和错误了

这个时候就需要分布式的任务调度来实现了。

1.3 Elastic-Job介绍

Elastic-Job是⼀个分布式调度的解决⽅案,由当当⽹开源,它由两个相互独⽴的⼦项⽬Elastic-job-Lite和 Elastic-Job-Cloud组成,使⽤Elastic-Job可以快速实现分布式任务调度。

Elastic-Job的地址: https://shardingsphere.apache.org/elasticjob/

功能列表:

  • 分布式调度协调

    • 在分布式环境中,任务能够按照指定的调度策略执⾏,并且能够避免同⼀任务多实例重复执⾏。
  • 丰富的调度策略

    • 基于成熟的定时任务作业框架Quartz cron表达式执⾏定时任务。
  • 弹性拓容缩容

    • 当集群中增加⼀个实例,它应当能够被选举被执⾏任务;当集群减少⼀个实例时,他所执⾏的任务 能被转移到别的示例中执⾏。
  • 失效转移

    • 某示例在任务执⾏失败后,会被转移到其他实例执⾏。
  • 错过执行任务重触发

    • 若因某种原因导致作业错过执⾏,⾃动记录错误执⾏的作业,并在下次次作业完成后⾃动触发。
  • ⽀持并行调度

    • ⽀持任务分⽚,任务分⽚是指将⼀个任务分成多个⼩任务在多个实例同时执⾏。
  • 作业分片一致性

    • 当任务被分⽚后,保证同⼀分⽚在分布式环境中仅⼀个执⾏实例。
  • 支持作业生命周期操作

  • 可以动态对任务进⾏开启及停⽌操作。

  • 丰富的作业类型

    • ⽀持Simple、DataFlow、Script三种作业类型

      在这里插入图片描述

    系统架构图

    在这里插入图片描述

2.Elastic-Job快速入门


2.1 环境搭建

2.1.1 版本02.要求
  • JDK 要求1.7以上版本

  • Maven 要求3.0.4及以上版本

  • Zookeeper 要求采取3.4.6以上版本

2.1.2 Zookeeper安装&运行
1. 解压zookeeper-3.4.11.tar.gz, 进入conf目录, 复制zoo_sample.cfg文件, 命名为:zoo.cfg
2. 进入bin目录, 运行zkServer.cmd就可以了.
3. 解压ZooInspector.zip, 运行jar文件

zookeeper客户端可视化工具

在这里插入图片描述

2.1.3 创建Maven项目

添加如下依赖

<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-core</artifactId><version>2.1.5</version>
</dependency>

2.2 代码实现

2.2.1 任务类
package com.xiaoge;import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;import java.util.Date;public class MyElasticJob implements SimpleJob {public void execute(ShardingContext shardingContext) {System.out.println("定时任务开始====>" + new Date());}
}
2.2.2 配置类
package com.xiaoge;import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;public class JobDemo {public static void main(String[] args) {// JobScheduler(注册中心对象, 任务配置对象)new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();}// 注册中心private static CoordinatorRegistryCenter createRegistryCenter() {// 配置zk地址,调度任务的组名ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181", "elastic-job-demo");// 设置节点超时时间zookeeperConfiguration.setSessionTimeoutMilliseconds(100);// ZookeeperRegistryCenter("zookeeper地址", "项目名")CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);regCenter.init();return regCenter;}// 定时任务配置private static LiteJobConfiguration createJobConfiguration() {// 定义作业核⼼配置 newBuilder("任务名称", "cron表达式", "分片数量")JobCoreConfiguration simpleCoreConfig =JobCoreConfiguration.newBuilder("myElasticJob", "0/10 * * * * ?", 1).build();// 定义SIMPLE类型配置 MyElasticJob.class.getCanonicalName()--->获取这个类的权限定类名SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());// 定义Lite作业根配置 (overwrite(true) 表示zookeeper里面的配置可以覆盖, 如果为false, 设置了一次cron表达式, 第二次修改表达式是不生效的)LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();return simpleJobRootConfig;}
}
2.2.3 测试
  • 运行单个程序,查看是否按照cron表达式的内容进⾏任务的调度

  • 运行多个程序,查看是否只会有⼀个实例进⾏任务调度

  • 运行多个程序后,把正在进行任务调度的进程关掉,查看其它进程是否能继续进⾏任务调度

3.SpringBoot集成Elastic-Job


3.1 添加Maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xiaoge</groupId><artifactId>elastic-job-boot</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.3.RELEASE</version></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>2.1.5</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies></project>

3.2 相关配置

因为配置中心的地址并不是固定的,所以我们应该把这个地址信息配置在配置文件中,所以在配置⽂件 application.yml中添加配置如下:

elasticjob:url: localhost:2181group-name: elastic-job-boot

zk注册中心配置类:

@Bean
public CoordinatorRegistryCenter registryCenter(@Value("${elasticjob.url}") String zookeeperUrl, @Value("${elasticjob.group-name}") String groupName) {// 配置zk地址,调度任务的组名ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(zookeeperUrl, groupName);// 设置节点超时时间zookeeperConfiguration.setSessionTimeoutMilliseconds(100);// ZookeeperRegistryCenter("zookeeper地址", "项目名")CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);regCenter.init();return regCenter;
}

任务调度配置类:

/*** todo 注意这个方法不能交给 spring 管理, 你要让它是个公共的方法,*      传递不同的jobName(任务名称), cron(cron表达式), shardingTotalCount(分片数量) 生成不同的LiteJobConfiguration, 因为环境不同任务配置不同.*      也有可能别的任务需要这个方法创建* @return*/
public LiteJobConfiguration createJobConfiguration(Class<?> clazz, String cron, Integer shardingTotalCount, String shardingParam) {// 定义作业核⼼配置 newBuilder("任务名称", "cron表达式", "分片数量")JobCoreConfiguration.Builder jobBuilder = JobCoreConfiguration.newBuilder(clazz.getSimpleName(), cron, shardingTotalCount);if (!StringUtils.isEmpty(shardingParam)) {// 分片参数jobBuilder = jobBuilder.shardingItemParameters(shardingParam);}// SimpleJob配置// 定义SIMPLE类型配置 MyElasticJob.class.getCanonicalName()--->获取这个类的权限定类名SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobBuilder.build(), clazz.getCanonicalName());// 定义Lite作业根配置 (overwrite(true) 表示zookeeper里面的配置可以覆盖, 如果为false, 设置了一次cron表达式, 第二次修改表达式是不生效的)LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();return simpleJobRootConfig;
}

4.案例需求


需求:数据库中有⼀些列的数据,需要对这些数据进行备份操作,备份完之后,修改数据的状态,标记已 经备份了.

4.1 初始化数据

在数据库中导⼊ elastic-job-demo.sql 数据

4.2 集成Druid&MyBatis

4.2.1 添加依赖
<dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.10</version>
</dependency>
<dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>1.2.0</version>
</dependency>
<!--mysql驱动-->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId>
</dependency>
4.2.2 添加配置
spring:datasource:url: jdbc:mysql://localhost:3306/elastic-job-demo?serverTimezone=GMT%2B8driverClassName: com.mysql.jdbc.Drivertype: com.alibaba.druid.pool.DruidDataSourceusername: rootpassword: root
4.2.3 添加实体类

package com.xiaoge.domain;import lombok.Data;@Data
public class FileCustom {//唯⼀标识private Long id;//⽂件名private String name;//⽂件类型private String type;//⽂件内容private String content;//是否已备份private Boolean backedUp = false;public FileCustom() {}public FileCustom(Long id, String name, String type, String content) {this.id = id;this.name = name;this.type = type;this.content = content;}
}
4.2.4 添加Mapper处理类
package com.xiaoge.mapper;import com.xiaoge.domain.FileCustom;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;import java.util.List;@Mapper
public interface FileCustomMapper {@Select("select * from t_file_custom where backedUp = 0")List<FileCustom> selectAll();@Update("update t_file_custom set backedUp = #{state} where id = #{id}")int changeState(@Param("id") Long id, @Param("state") int state);
}

4.3 业务功能实现

4.3.1 添加任务类
package com.xiaoge.service;import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.xiaoge.domain.FileCustom;
import com.xiaoge.mapper.FileCustomMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class FileCustomElasticJob implements SimpleJob {@Autowiredprivate FileCustomMapper fileCustomMapper;@Overridepublic void execute(ShardingContext shardingContext) {doWork();}private void doWork(){List<FileCustom> fileList = fileCustomMapper.selectAll();System.out.println("需要备份⽂件个数:"+fileList.size());for(FileCustom fileCustom:fileList){backUpFile(fileCustom);}}private void backUpFile(FileCustom fileCustom){try {//模拟备份动作TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("执⾏⽂件备份====>"+fileCustom);fileCustomMapper.changeState(fileCustom.getId(),1);}
}
4.3.2 添加任务调度配置

在配置类中新增这个Bean

/*** todo 注意一个ElasticJob里面不管有多少实例, 只会有一个被调度, 那就是zookeeper选出来的leader* @param myElasticJob* @param regCenter* @return*/
@Bean(initMethod = "init")
public SpringJobScheduler initSpringScheduler(ElasticJob myElasticJob, CoordinatorRegistryCenter regCenter) {LiteJobConfiguration simpleJobRootConfig = createJobConfiguration(myElasticJob.getClass(), "0/10 * * * * ?", 1);return new SpringJobScheduler(myElasticJob, regCenter, simpleJobRootConfig);
}

4.4 测试&问题

为了高可用,我们会对这个项⽬做集群的操作,可以保证其中⼀台挂了,另外⼀台可以继续⼯作.但是在集 群的情况下,调度任务只在⼀台机器上运行,如果单个任务调度⽐较耗时,耗资源的情况下,对这台机器 的消耗还是比较大的, 但是这个时候,其他机器却是空闲着的.如何合理的利用集群的其他机器且如何让任务执行得更快些呢? 这时候Elastic-Job提供了任务调度分片的功能.

5.分片概念


作业分片是指任务的分布式执行,需要将⼀个任务拆分为多个独立的任务项,然后由分布式的应用实 例分别执行某⼀个或者几个分布项。

例如:Elastic-Job快速入门中文件备份的案例,现有两台服务器,每台服务器分别跑⼀个应用实例。 为了快速执行作业,那么可以讲任务分成4片,每个应⽤实例都执行两片。作业遍历数据逻辑应为:实例 1查找text和image类型⽂件执⾏备份,实例2查找radio和vedio类型⽂件执⾏备份。如果由于服务器拓 容应⽤实例数量增加为4,则作业遍历数据的逻辑应为: 4个实例分别处理text,image,radio,video类型的 ⽂件。

例如:Elastic-Job快速入门中文件备份的案例,现有两台服务器,每台服务器分别跑⼀个应⽤实例。 为了快速执行作业,那么可以讲任务分成4片,每个应⽤实例都执行两片。作业遍历数据逻辑应为:实例 1查找text和image类型文件执行备份,实例2查找radio和vedio类型文件执行备份。如果由于服务器拓 容应⽤实例数量增加为4,则作业遍历数据的逻辑应为: 4个实例分别处理text,image,radio,video类型的 文件。

分片项与业务处理解耦

Elastic-Job并不直接提供数据处理的功能,框架只会将分⽚项分配⾄各个运⾏中的作业服务器,开发者 需要自行处理分⽚项与真实数据的对应关系

最大限度利用资源

将分片项设置大于服务器的数据,最好是⼤于服务器倍数的数量,作业将会合理利⽤分布式资源,动态 的分配分片项.

例如: 3台服务器,分成10片,则分片项结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9.如果 服务器C奔溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9.在不丢失分⽚项的情况下,最大限度利⽤现有的资源提高吞吐量.

6.案例改造成任务分片


6.1 配置类修改

在任务配置类中增加分片个数以及分片参数.

@Bean(initMethod = "init")
public SpringJobScheduler initFileCustomElasticJob(FileCustomElasticJobfileCustomElasticJob){SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileCustomElasticJob,registryCenter,createJobConfiguration(FileCustomElasticJob.class,"0 0/1 * * *?",4,"0=text,1=image,2=radio,3=vedio"));return springJobScheduler;
}

6.2 新增作业分片逻辑

package com.xiaoge.service;import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.xiaoge.domain.FileCustom;
import com.xiaoge.mapper.FileCustomMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class FileCustomElasticJob implements SimpleJob {@Autowiredprivate FileCustomMapper fileCustomMapper;@Overridepublic void execute(ShardingContext shardingContext) {long threadId = Thread.currentThread().getId();log.info("线程ID: {}, 任务的名称: {}, 任务参数: {}, 分片个数: {}, 分片索引号: {}, 分片参数: {}",threadId,shardingContext.getJobName(),shardingContext.getJobParameter(),shardingContext.getShardingTotalCount(),shardingContext.getShardingItem(),shardingContext.getShardingParameter());doWork(shardingContext.getShardingParameter());}private void doWork(String shardingParameter) {List<FileCustom> fileList = fileCustomMapper.selectFileCustomByType(shardingParameter);log.info("需要备份⽂件个数{}: {}", shardingParameter, fileList.size());for (FileCustom fileCustom : fileList) {backUpFile(fileCustom);}}private void backUpFile(FileCustom fileCustom) {try {//模拟备份动作TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("执⾏⽂件备份====>" + fileCustom);fileCustomMapper.changeState(fileCustom.getId(), 1);}
}

6.3 Mapper类修改

package com.xiaoge.mapper;import com.xiaoge.domain.FileCustom;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;import java.util.List;@Mapper
public interface FileCustomMapper {@Select("select * from t_file_custom where backedUp = 0")List<FileCustom> selectAll();@Select("select * from t_file_custom where backedUp = 0 and type = #{type}")List<FileCustom> selectFileCustomByType(@Param("type") String type);@Update("update t_file_custom set backedUp = #{state} where id = #{id}")int changeState(@Param("id") Long id, @Param("state") int state);
}

6.4 测试

  • 只有⼀台机器的情况下,任务分片是如何执行的

  • 有多台机器的情况下,任务分片是如何执行的

7.Dataflow类型调度任务


Dataflow类型的定时任务需要实现Dataflowjob接⼝,该接⼝提供2个⽅法供覆盖,分别⽤于抓取 (fetchData)和处理(processData)数据,我们继续对例⼦进⾏改造。

Dataflow类型的定时任务需要实现Dataflowjob接⼝,该接⼝提供2个⽅法供覆盖,分别⽤于抓取 (fetchData)和处理(processData)数据,我们继续对例子进行改造。

7.1 任务类

package com.xiaoge.service;import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.xiaoge.domain.FileCustom;
import com.xiaoge.mapper.FileCustomMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.concurrent.TimeUnit;/*** TODO 处理大数据量的时候用那个DataFlow这种方式** @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>* @since*/
@Component
public class FileDataFlowJob implements DataflowJob<FileCustom> {@Autowiredprivate FileCustomMapper fileCustomMapper;// 抓取数据@Overridepublic List<FileCustom> fetchData(ShardingContext shardingContext) {System.out.println("开始抓取数据...........");return fileCustomMapper.selectLimit(shardingContext.getShardingParameter(), 2);}// 处理数据@Overridepublic void processData(ShardingContext shardingContext, List<FileCustom> fileCustomList) {fileCustomList.forEach(fileCustom -> {backUpFile(fileCustom);});}private void backUpFile(FileCustom fileCustom) {System.out.println("备份的方法名: " + fileCustom.getName() + "备份的类型: " + fileCustom.getType());try {//模拟备份动作TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("执⾏⽂件备份====>" + fileCustom);fileCustomMapper.changeState(fileCustom.getId(), 1);}
}

7.2 配置类

package com.xiaoge.config;import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.xiaoge.service.FileCustomElasticJob;
import com.xiaoge.service.FileDataFlowJob;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.sql.DataSource;/*** TODO** @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>* @since*/
@Configuration
public class ElasticJobConfig {/*** todo 注意一个ElasticJob里面不管有多少实例, 只会有一个被调度, 那就是zookeeper选出来的leader* @param myElasticJob* @param regCenter* @return*/
//    @Bean(initMethod = "init")
//    public SpringJobScheduler testScheduler(ElasticJob myElasticJob, CoordinatorRegistryCenter regCenter) {
//        LiteJobConfiguration simpleJobRootConfig = createJobConfiguration(myElasticJob.getClass(), "0/10 * * * * ?", 1);
//        return new SpringJobScheduler(myElasticJob, regCenter, simpleJobRootConfig);
//    }//    @Bean(initMethod = "init")
//    public SpringJobScheduler fileScheduler(FileCustomElasticJob fileCustomElasticJob, CoordinatorRegistryCenter regCenter){
//        SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileCustomElasticJob,regCenter,createJobConfiguration(fileCustomElasticJob.getClass(),"0 0/1 * * * ?",4, "0=text,1=image,2=radio,3=vedio", false));
//        return springJobScheduler;
//    }@Bean(initMethod = "init")public SpringJobScheduler fileDataFlowScheduler(FileDataFlowJob fileDataFlowJob, CoordinatorRegistryCenter regCenter){SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileDataFlowJob,regCenter,createJobConfiguration(fileDataFlowJob.getClass(),"0 0/1 * * * ?",4, "0=text,1=image,2=radio,3=vedio", true));return springJobScheduler;}//    @Bean(initMethod = "init")
//    public SpringJobScheduler test1Scheduler(ElasticJob myElasticJob1, CoordinDataRevisionatorRegistryCenter regCenter) {
//        LiteJobConfiguration simpleJobRootConfig = createJobConfiguration(myElasticJob1.getClass(), "0/3 * * * * ?", 1);
//        return new SpringJobScheduler(myElasticJob1, regCenter, simpleJobRootConfig);
//    }@Beanpublic CoordinatorRegistryCenter registryCenter(@Value("${elasticjob.url}") String zookeeperUrl, @Value("${elasticjob.group-name}") String groupName) {// 配置zk地址,调度任务的组名ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(zookeeperUrl, groupName);// 设置节点超时时间zookeeperConfiguration.setSessionTimeoutMilliseconds(100);// ZookeeperRegistryCenter("zookeeper地址", "项目名")CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);regCenter.init();return regCenter;}/*** todo 注意这个方法不能交给 spring 管理, 你要让它是个公共的方法,*      传递不同的jobName(任务名称), cron(cron表达式), shardingTotalCount(分片数量) 生成不同的LiteJobConfiguration, 因为环境不同任务配置不同.*      也有可能别的任务需要这个方法创建* @return*/public LiteJobConfiguration createJobConfiguration(Class<?> clazz, String cron, Integer shardingTotalCount, String shardingParam, boolean isDataFlow) {// 定义作业核⼼配置 newBuilder("任务名称", "cron表达式", "分片数量")JobCoreConfiguration.Builder jobBuilder = JobCoreConfiguration.newBuilder(clazz.getSimpleName(), cron, shardingTotalCount);if (!StringUtils.isEmpty(shardingParam)) {// 分片参数jobBuilder = jobBuilder.shardingItemParameters(shardingParam);}JobTypeConfiguration jobConfiguration;if (isDataFlow) {// DataflowJob配置jobConfiguration = new DataflowJobConfiguration(jobBuilder.build(), clazz.getCanonicalName(), true);} else {// SimpleJob配置// 定义SIMPLE类型配置 MyElasticJob.class.getCanonicalName()--->获取这个类的权限定类名jobConfiguration = new SimpleJobConfiguration(jobBuilder.build(), clazz.getCanonicalName());}// 定义Lite作业根配置 (overwrite(true) 表示zookeeper里面的配置可以覆盖, 如果为false, 设置了一次cron表达式, 第二次修改表达式是不生效的)LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(jobConfiguration).overwrite(true).build();return simpleJobRootConfig;}}

7.3 测试

8.运维管理


8.1 事件追踪

Elastic-Job-Lite在配置中提供了JobEventConfiguration,⽀持数据库⽅式配置,会在数据库中⾃动创建 JOB_EXECUTION_LOG和JOB_STATUS_TRACE_LOG两张表以及若⼲索引来近路作业的相关信息。

8.1.1 修改Elastic-Job配置类

在ElasticJobConfig配置类中注⼊DataSource

@Configuration
public class ElasticJobConfig {@Autowiredprivate DataSource dataSource;......
}

在任务配置中增加事件追踪配置

@Bean(initMethod = "init")public SpringJobScheduler fileDataFlowScheduler(FileDataFlowJob fileDataFlowJob, CoordinatorRegistryCenter regCenter){// 日志监控, 它会自动在数据库生成两张表job_execution_log/job_status_trace_log// 配置会在任务执行的时间将任务执行的情况存储到数据源中JobEventConfiguration jobEventConfiguration = new JobEventRdbConfiguration(dataSource);SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileDataFlowJob,regCenter,createJobConfiguration(fileDataFlowJob.getClass(),"0 0/1 * * * ?",4, "0=text,1=image,2=radio,3=vedio", true), jobEventConfiguration);return springJobScheduler;
}
8.1.2 日志信息表

启动后会发现在elastic-job-demo数据库中新增以下两张表

job_execution_log

在这里插入图片描述

记录每次作业的执行历史,分为两个步骤:

1.作业开始执⾏时间想数据库插⼊数据.

2.作业完成执⾏时向数据库更新数据,更新is_success,complete_time和failure_cause(如果任务执行失败)

job_status_trace_log

在这里插入图片描述

记录作业状态变更痕迹表,可通过每次作业运行的task_id查询作业状态变化的⽣命轨迹和运行轨迹.

8.2 运维控制台

elastic-job中提供了⼀个elastic-job-lite-console控制台

设计理念

1.本 控制台和Elastic-Job并⽆直接关系,是通过读取Elastic-Job的注册中心数据展示作业状态,或更新注 册中心数据修改全局配置。

2.控制台只能控制任务本身是否运行,但不能控制作业进程的启停,因为控制台和作业本身服务器是完 全分布式的,控制台并不能控制作业服务器。

主要功能:

1.查看作业以及服务器状态

2.快捷的修改以及删除作业配置

3.启用和禁用作业

4.跨注册中心查看作业

5.查看作业运行轨迹和运行状态

不支持项

1.添加作业,因为作业都是在首次运行时自动添加,使用控制台添加作业并无必要.直接在作业服务器启 动包含Elasitc-Job的作业进程即可。

8.2.1 搭建步骤
  • 解压缩 elastic-job-lite-console-2.1.5.tar

  • 进⼊bin⽬录,并执⾏:

    bin\start.bat
    
  • 打开浏览器访问 http://localhost:8899 ⽤户名: root 密码: root,进⼊之后界⾯如下:

    在这里插入图片描述

    提供两种⽤户:管理员和访客,管理员拥有全部操作权限,访客仅拥有查看权限。默认管理员账号和密码是root/root,访客⽤户名和密码是guest/guest,通过conf\auth.properties可以修改管理员以及访客⽤ 户名及密码

8.2.2 配置及使用
  • 配置注册中心地址 先启动zookeeper然后再注册中心配置界面,点添加

    在这里插入图片描述

  • 点击提交后,然后点连接(zookeeper必须处于启动状态)

    在这里插入图片描述

  • 连接成功后,在作业纬度下可以显示该命名空间作业名称,分⽚数量及该作业的cron表达式等信息 在服务器纬度可以查看到服务器ip,当前运⾏的是实例数,作业总数等信息。

    在这里插入图片描述

  • 添加数据库连接之后可以查看任务的执行结果

    在这里插入图片描述

  • 然后在作业历史中就可以看到任务执行历史了。

    在这里插入图片描述
    demo下载地址: https://download.csdn.net/download/zsx1314lovezyf/88282573

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/72183.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PostMan传时间参数一次性发送多次请求

文章目录 1. Date类型的参数&#xff0c; "date": "2023-09-07 22:01:51"格式会报错2. 在Pre-request Script预置时间3. 使用postman一次性发送多次请求 1. Date类型的参数&#xff0c; “date”: "2023-09-07 22:01:51"格式会报错 2. 在Pre-req…

算法 数据结构 斐波那契数列 递归实现斐波那契数列 斐波那契递归的优化 斐波那契数列递归求解 多路递归实现 斐波那契算法系列 数据结构(十一)

1. 什么是斐波那契数列&#xff1a; 之前的例子是每个递归函数只包含一个自身的调用&#xff0c;这称之为 single recursion 如果每个递归函数例包含多个自身调用&#xff0c;称之为 multi recursion 递推关系 下面的表格列出了数列的前几项 F0F1F2F3F4F5F6F7F8F9F10F11F12…

前端 JS 经典:上传文件

重点&#xff1a;multipart/form-data 后端识别上传类型必填 1. form 表单上传 <!-- enctype"multipart/form-data" 这个必填 --> <form action"http://127.0.0.1:8080/users/avatar" method"post" enctype"multipart/form-data…

SQL语言的分类:DDL(数据库、表的增、删、改)、DML(数据的增、删、改)

数据库管理系统&#xff08;数据库软件&#xff09;功能非常多&#xff0c;不仅仅是存储数据&#xff0c;还要包含&#xff1a;数据的管理、表的管理、库的管理、账户管理、权限管理等。 操作数据库的SQL语言&#xff0c;基于功能&#xff0c;划分为4类&#xff1a; 1、数据定…

2023国赛数学建模E题思路代码 黄河水沙监测数据分析

E题最大的难度是数据处理&#xff0c;可以做一个假设&#xff0c;假设一定时间内流量跟含沙量不变&#xff0c;那么我们可以对数据进行向下填充&#xff0c;把所有的数据进行合并之后可以对其进行展开特性分析&#xff0c;在研究调水调沙的实际效果时&#xff0c;可以先通过分析…

企业微信、飞书、钉钉机器人消息发送工具类

1、实例化WebClient对象 其实你也可以使用RestTemplate&#xff0c;我这里主要是用到了webflux框架&#xff0c;所以需要实例化客户端请求对象 Bean public WebClient webClient(){HttpClient httpClient getHttpClient();return WebClient.builder().clientConnector(new R…

使用半导体材料制作霍尔元件的优点

霍尔元件是一种基于霍尔效应的传感器&#xff0c;可以测量磁场强度和电流等物理量。霍尔效应是指&#xff0c;当电流通过一块导体时&#xff0c;如果该导体置于垂直于电流方向的磁场中&#xff0c;就会在导体两侧出现一定的电势差&#xff0c;这就是霍尔效应。霍尔元件可以利用…

PHP8函数包含文件-PHP8知识详解

在php中&#xff0c;可以使用以下函数来包含其他文件&#xff1a;include()、include_once()、require()、require_once()。 1、include(): 包含并运行指定文件中的代码。如果文件不存在或包含过程中出现错误&#xff0c;将发出警告。 <?php include filename.php; ?>…

与 vmx86 驱动程序的版本不匹配: 预期为 410.0,实际为 401.0

与 vmx86 驱动程序的版本不匹配: 预期为 410.0&#xff0c;实际为 401.0。 驱动程序“vmx86.sys”的版本不正确。请尝试重新安装 VMware Workstation。 我电脑历史上装过几个版本的vmware workstation: 怀疑是不兼容版本生成的vmx.86.sys 在系统中和该软件冲突&#xff0c;又没…

Redis总结(三)

目录 什么是缓存预热、缓存雪崩、缓存击穿、缓存穿透&#xff1f; 缓存预热 缓存雪崩 解决方案 针对Redis故障宕机 针对大量key同时过期 缓存击穿 解决方案 缓存穿透 解决方案 总结 数据库和缓存如何保证一致性&#xff1f; 先更新缓存还是先更新数据库&#xff1…

【sgLazyCascader】自定义组件:基于el-cascader的懒加载级联菜单,支持异步加载子级菜单

sgLazyCascader源码 <template><div :class"$options.name"><el-cascader :props"props" v-model"model" :placeholder"placeholder || 请选择" :options"options"></el-cascader></div> &l…

【Docker】镜像的创建、管理与发布

镜像的获取 镜像可以从以下方式获得&#xff1a; 从远程镜像仓库拉取&#xff0c;可以是公有仓库&#xff0c;也可以是私有仓库从Dockerfile构建从文件导入&#xff08;离线&#xff09;从容器提交 镜像的基本操作 跟镜像相关的命令如下&#xff1a; $ docker image --help…

Android 11.0 ota升级之Systemui下拉状态栏quick_settings_tiles_default值减少时更新的功能实现

1.前言 在11.0的系统rom定制化开发中,在定制功能需求中,在进行systemui的下拉状态栏定制以后,当需要ota升级的时候,发现在systemui下拉状态栏的快捷功能键部分去掉的 一些快捷功能并没有减少,这是因为systemui有缓存造成的只有清理缓存或者恢复出厂设置后才正常,所以今天…

【算法题】2651. 计算列车到站时间

题目&#xff1a; 给你一个正整数 arrivalTime 表示列车正点到站的时间&#xff08;单位&#xff1a;小时&#xff09;&#xff0c;另给你一个正整数 delayedTime 表示列车延误的小时数。 返回列车实际到站的时间。 注意&#xff0c;该问题中的时间采用 24 小时制。 示例 1…

大数据-玩转数据-Flink状态编程(上)

一、Flink状态编程 有状态的计算是流处理框架要实现的重要功能&#xff0c;因为稍复杂的流处理场景都需要记录状态&#xff0c;然后在新流入数据的基础上不断更新状态。 SparkStreaming在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编…

macbookpro怎么删除软件没有鼠标

macbookpro怎么删除软件没有鼠标,macbookpro触摸板可以替代鼠标进行操作。左右键功能与鼠标相同&#xff0c;可用于执行删除操作。此外&#xff0c;还可以利用键盘上的Delete键来删除选中的文件。 删除软件方法 方法1、打开应用程序&#xff0c;键盘按住control&#xff0c;加点…

数据结构与算法之贪心动态规划

一&#xff1a;思考 1.某天早上公司领导找你解决一个问题&#xff0c;明天公司有N个同等级的会议需要使用同一个会议室&#xff0c;现在给你这个N个会议的开始和结束 时间&#xff0c;你怎么样安排才能使会议室最大利用&#xff1f;即安排最多场次的会议&#xff1f;电影的话 那…

高等数学教材重难点题型总结(四)不定积分

难点在于量级&#xff0c;不定积分一定要多练多见才能游刃有余~ 1.利用求导公式验证等式 2.计算不定积分

C语言——指针完全版

目录 一、指针的运算 1.1指针 - 整数 1.2指针 - 指针 二、指针遍历数组 2.1指针遍历数组 1.了解数组名称的含义&#xff08;&数组名和数组名的区别&#xff09;。 2.用指针遍历数组 三、指针数组、数组指针、函数指针 3.1指针数组 3.1.1指针数组的形式 3.1.2指针…

【自学笔记】如何在 Python 中使用 YAML 文件? 了解 YAML 格式和规范

文章目录 如何在 Python 中使用 YAML 文件YAML 的格式、规范和需要注意的点YAML 的缩进对象块其语法规范在 Python 中使用 PyYAML 模块安装 PyYAML 模块使用 PyYAML 模块读取和写入 YAML 文件读取 YAML 文件写入 YAML 文件load() 和 safe_load() 的区别总结如何在 Python 中使用…