分布式调度Elastic-job

分布式调度Elastic-job

1. 概述


1.1什么是任务调度

我们可以思考⼀下下⾯业务场景的解决⽅案:

  • 某电商平台需要每天上午10点,下午3点,晚上8点发放⼀批优惠券
  • 某银⾏系统需要在信⽤卡到期还款⽇的前三天进⾏短信提醒
  • 某财务系统需要在每天凌晨0:10分结算前⼀天的财务数据,统计汇总

以上场景就是任务调度所需要解决的问题

任务调度是为了自动完成特定任务,在约定的特定时刻去执行任务的过程

我们经常使用Spring中提供的定时任务注解@Scheduled 在业务类中⽅法中贴上这个注解

@Scheduled(cron = "0/20 * * * * ? ")
public void doWork(){
//doSomething 
}

然后在启动类上贴上 @EnableScheduling 注解

1.2 为什么需要分布式调度

感觉Spring给我们提供的这个注解可以完成任务调度的功能,好像已经完美解决问题了,为什么还需要 分布式呢?

主要有如下这⼏点原因:

1.单机处理极限:原本1分钟内需要处理1万个订单,但是现在需要1分钟内处理10万个订单;原来⼀个 统计需要1⼩时,现在业务⽅需要10分钟就统计出来。你也许会说,你也可以多线程、单机多进程处 理。的确,多线程并⾏处理可以提⾼单位时间的处理效率,但是单机能⼒毕竟有限(主要是CPU、内存 和磁盘),始终会有单机处理不过来的情况。

2.高可用:单机版的定式任务调度只能在⼀台机器上运⾏,如果程序或者系统出现异常就会导致功能不 可⽤。虽然可以在单机程序实现的⾜够稳定,但始终有机会遇到⾮程序引起的故障,⽽这个对于⼀个系 统的核⼼功能来说是不可接受的。

3.防止重复执行: 在单机模式下,定时任务是没什么问题的。但当我们部署了多台服务,同时⼜每台服务 ⼜有定时任务时,若不进⾏合理的控制在同⼀时间,只有⼀个定时任务启动执⾏,这时,定时执⾏的结 果就可能存在混乱和错误了

这个时候就需要分布式的任务调度来实现了。

1.3 Elastic-Job介绍

Elastic-Job是⼀个分布式调度的解决⽅案,由当当⽹开源,它由两个相互独⽴的⼦项⽬Elastic-job-Lite和 Elastic-Job-Cloud组成,使⽤Elastic-Job可以快速实现分布式任务调度。

Elastic-Job的地址: https://shardingsphere.apache.org/elasticjob/

功能列表:

  • 分布式调度协调

    • 在分布式环境中,任务能够按照指定的调度策略执⾏,并且能够避免同⼀任务多实例重复执⾏。
  • 丰富的调度策略

    • 基于成熟的定时任务作业框架Quartz cron表达式执⾏定时任务。
  • 弹性拓容缩容

    • 当集群中增加⼀个实例,它应当能够被选举被执⾏任务;当集群减少⼀个实例时,他所执⾏的任务 能被转移到别的示例中执⾏。
  • 失效转移

    • 某示例在任务执⾏失败后,会被转移到其他实例执⾏。
  • 错过执行任务重触发

    • 若因某种原因导致作业错过执⾏,⾃动记录错误执⾏的作业,并在下次次作业完成后⾃动触发。
  • ⽀持并行调度

    • ⽀持任务分⽚,任务分⽚是指将⼀个任务分成多个⼩任务在多个实例同时执⾏。
  • 作业分片一致性

    • 当任务被分⽚后,保证同⼀分⽚在分布式环境中仅⼀个执⾏实例。
  • 支持作业生命周期操作

  • 可以动态对任务进⾏开启及停⽌操作。

  • 丰富的作业类型

    • ⽀持Simple、DataFlow、Script三种作业类型

      在这里插入图片描述

    系统架构图

    在这里插入图片描述

2.Elastic-Job快速入门


2.1 环境搭建

2.1.1 版本02.要求
  • JDK 要求1.7以上版本

  • Maven 要求3.0.4及以上版本

  • Zookeeper 要求采取3.4.6以上版本

2.1.2 Zookeeper安装&运行
1. 解压zookeeper-3.4.11.tar.gz, 进入conf目录, 复制zoo_sample.cfg文件, 命名为:zoo.cfg
2. 进入bin目录, 运行zkServer.cmd就可以了.
3. 解压ZooInspector.zip, 运行jar文件

zookeeper客户端可视化工具

在这里插入图片描述

2.1.3 创建Maven项目

添加如下依赖

<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-core</artifactId><version>2.1.5</version>
</dependency>

2.2 代码实现

2.2.1 任务类
package com.xiaoge;import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;import java.util.Date;public class MyElasticJob implements SimpleJob {public void execute(ShardingContext shardingContext) {System.out.println("定时任务开始====>" + new Date());}
}
2.2.2 配置类
package com.xiaoge;import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;public class JobDemo {public static void main(String[] args) {// JobScheduler(注册中心对象, 任务配置对象)new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();}// 注册中心private static CoordinatorRegistryCenter createRegistryCenter() {// 配置zk地址,调度任务的组名ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181", "elastic-job-demo");// 设置节点超时时间zookeeperConfiguration.setSessionTimeoutMilliseconds(100);// ZookeeperRegistryCenter("zookeeper地址", "项目名")CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);regCenter.init();return regCenter;}// 定时任务配置private static LiteJobConfiguration createJobConfiguration() {// 定义作业核⼼配置 newBuilder("任务名称", "cron表达式", "分片数量")JobCoreConfiguration simpleCoreConfig =JobCoreConfiguration.newBuilder("myElasticJob", "0/10 * * * * ?", 1).build();// 定义SIMPLE类型配置 MyElasticJob.class.getCanonicalName()--->获取这个类的权限定类名SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());// 定义Lite作业根配置 (overwrite(true) 表示zookeeper里面的配置可以覆盖, 如果为false, 设置了一次cron表达式, 第二次修改表达式是不生效的)LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();return simpleJobRootConfig;}
}
2.2.3 测试
  • 运行单个程序,查看是否按照cron表达式的内容进⾏任务的调度

  • 运行多个程序,查看是否只会有⼀个实例进⾏任务调度

  • 运行多个程序后,把正在进行任务调度的进程关掉,查看其它进程是否能继续进⾏任务调度

3.SpringBoot集成Elastic-Job


3.1 添加Maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xiaoge</groupId><artifactId>elastic-job-boot</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.3.RELEASE</version></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>2.1.5</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies></project>

3.2 相关配置

因为配置中心的地址并不是固定的,所以我们应该把这个地址信息配置在配置文件中,所以在配置⽂件 application.yml中添加配置如下:

elasticjob:url: localhost:2181group-name: elastic-job-boot

zk注册中心配置类:

@Bean
public CoordinatorRegistryCenter registryCenter(@Value("${elasticjob.url}") String zookeeperUrl, @Value("${elasticjob.group-name}") String groupName) {// 配置zk地址,调度任务的组名ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(zookeeperUrl, groupName);// 设置节点超时时间zookeeperConfiguration.setSessionTimeoutMilliseconds(100);// ZookeeperRegistryCenter("zookeeper地址", "项目名")CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);regCenter.init();return regCenter;
}

任务调度配置类:

/*** todo 注意这个方法不能交给 spring 管理, 你要让它是个公共的方法,*      传递不同的jobName(任务名称), cron(cron表达式), shardingTotalCount(分片数量) 生成不同的LiteJobConfiguration, 因为环境不同任务配置不同.*      也有可能别的任务需要这个方法创建* @return*/
public LiteJobConfiguration createJobConfiguration(Class<?> clazz, String cron, Integer shardingTotalCount, String shardingParam) {// 定义作业核⼼配置 newBuilder("任务名称", "cron表达式", "分片数量")JobCoreConfiguration.Builder jobBuilder = JobCoreConfiguration.newBuilder(clazz.getSimpleName(), cron, shardingTotalCount);if (!StringUtils.isEmpty(shardingParam)) {// 分片参数jobBuilder = jobBuilder.shardingItemParameters(shardingParam);}// SimpleJob配置// 定义SIMPLE类型配置 MyElasticJob.class.getCanonicalName()--->获取这个类的权限定类名SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobBuilder.build(), clazz.getCanonicalName());// 定义Lite作业根配置 (overwrite(true) 表示zookeeper里面的配置可以覆盖, 如果为false, 设置了一次cron表达式, 第二次修改表达式是不生效的)LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();return simpleJobRootConfig;
}

4.案例需求


需求:数据库中有⼀些列的数据,需要对这些数据进行备份操作,备份完之后,修改数据的状态,标记已 经备份了.

4.1 初始化数据

在数据库中导⼊ elastic-job-demo.sql 数据

4.2 集成Druid&MyBatis

4.2.1 添加依赖
<dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.10</version>
</dependency>
<dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>1.2.0</version>
</dependency>
<!--mysql驱动-->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId>
</dependency>
4.2.2 添加配置
spring:datasource:url: jdbc:mysql://localhost:3306/elastic-job-demo?serverTimezone=GMT%2B8driverClassName: com.mysql.jdbc.Drivertype: com.alibaba.druid.pool.DruidDataSourceusername: rootpassword: root
4.2.3 添加实体类

package com.xiaoge.domain;import lombok.Data;@Data
public class FileCustom {//唯⼀标识private Long id;//⽂件名private String name;//⽂件类型private String type;//⽂件内容private String content;//是否已备份private Boolean backedUp = false;public FileCustom() {}public FileCustom(Long id, String name, String type, String content) {this.id = id;this.name = name;this.type = type;this.content = content;}
}
4.2.4 添加Mapper处理类
package com.xiaoge.mapper;import com.xiaoge.domain.FileCustom;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;import java.util.List;@Mapper
public interface FileCustomMapper {@Select("select * from t_file_custom where backedUp = 0")List<FileCustom> selectAll();@Update("update t_file_custom set backedUp = #{state} where id = #{id}")int changeState(@Param("id") Long id, @Param("state") int state);
}

4.3 业务功能实现

4.3.1 添加任务类
package com.xiaoge.service;import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.xiaoge.domain.FileCustom;
import com.xiaoge.mapper.FileCustomMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class FileCustomElasticJob implements SimpleJob {@Autowiredprivate FileCustomMapper fileCustomMapper;@Overridepublic void execute(ShardingContext shardingContext) {doWork();}private void doWork(){List<FileCustom> fileList = fileCustomMapper.selectAll();System.out.println("需要备份⽂件个数:"+fileList.size());for(FileCustom fileCustom:fileList){backUpFile(fileCustom);}}private void backUpFile(FileCustom fileCustom){try {//模拟备份动作TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("执⾏⽂件备份====>"+fileCustom);fileCustomMapper.changeState(fileCustom.getId(),1);}
}
4.3.2 添加任务调度配置

在配置类中新增这个Bean

/*** todo 注意一个ElasticJob里面不管有多少实例, 只会有一个被调度, 那就是zookeeper选出来的leader* @param myElasticJob* @param regCenter* @return*/
@Bean(initMethod = "init")
public SpringJobScheduler initSpringScheduler(ElasticJob myElasticJob, CoordinatorRegistryCenter regCenter) {LiteJobConfiguration simpleJobRootConfig = createJobConfiguration(myElasticJob.getClass(), "0/10 * * * * ?", 1);return new SpringJobScheduler(myElasticJob, regCenter, simpleJobRootConfig);
}

4.4 测试&问题

为了高可用,我们会对这个项⽬做集群的操作,可以保证其中⼀台挂了,另外⼀台可以继续⼯作.但是在集 群的情况下,调度任务只在⼀台机器上运行,如果单个任务调度⽐较耗时,耗资源的情况下,对这台机器 的消耗还是比较大的, 但是这个时候,其他机器却是空闲着的.如何合理的利用集群的其他机器且如何让任务执行得更快些呢? 这时候Elastic-Job提供了任务调度分片的功能.

5.分片概念


作业分片是指任务的分布式执行,需要将⼀个任务拆分为多个独立的任务项,然后由分布式的应用实 例分别执行某⼀个或者几个分布项。

例如:Elastic-Job快速入门中文件备份的案例,现有两台服务器,每台服务器分别跑⼀个应用实例。 为了快速执行作业,那么可以讲任务分成4片,每个应⽤实例都执行两片。作业遍历数据逻辑应为:实例 1查找text和image类型⽂件执⾏备份,实例2查找radio和vedio类型⽂件执⾏备份。如果由于服务器拓 容应⽤实例数量增加为4,则作业遍历数据的逻辑应为: 4个实例分别处理text,image,radio,video类型的 ⽂件。

例如:Elastic-Job快速入门中文件备份的案例,现有两台服务器,每台服务器分别跑⼀个应⽤实例。 为了快速执行作业,那么可以讲任务分成4片,每个应⽤实例都执行两片。作业遍历数据逻辑应为:实例 1查找text和image类型文件执行备份,实例2查找radio和vedio类型文件执行备份。如果由于服务器拓 容应⽤实例数量增加为4,则作业遍历数据的逻辑应为: 4个实例分别处理text,image,radio,video类型的 文件。

分片项与业务处理解耦

Elastic-Job并不直接提供数据处理的功能,框架只会将分⽚项分配⾄各个运⾏中的作业服务器,开发者 需要自行处理分⽚项与真实数据的对应关系

最大限度利用资源

将分片项设置大于服务器的数据,最好是⼤于服务器倍数的数量,作业将会合理利⽤分布式资源,动态 的分配分片项.

例如: 3台服务器,分成10片,则分片项结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9.如果 服务器C奔溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9.在不丢失分⽚项的情况下,最大限度利⽤现有的资源提高吞吐量.

6.案例改造成任务分片


6.1 配置类修改

在任务配置类中增加分片个数以及分片参数.

@Bean(initMethod = "init")
public SpringJobScheduler initFileCustomElasticJob(FileCustomElasticJobfileCustomElasticJob){SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileCustomElasticJob,registryCenter,createJobConfiguration(FileCustomElasticJob.class,"0 0/1 * * *?",4,"0=text,1=image,2=radio,3=vedio"));return springJobScheduler;
}

6.2 新增作业分片逻辑

package com.xiaoge.service;import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.xiaoge.domain.FileCustom;
import com.xiaoge.mapper.FileCustomMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class FileCustomElasticJob implements SimpleJob {@Autowiredprivate FileCustomMapper fileCustomMapper;@Overridepublic void execute(ShardingContext shardingContext) {long threadId = Thread.currentThread().getId();log.info("线程ID: {}, 任务的名称: {}, 任务参数: {}, 分片个数: {}, 分片索引号: {}, 分片参数: {}",threadId,shardingContext.getJobName(),shardingContext.getJobParameter(),shardingContext.getShardingTotalCount(),shardingContext.getShardingItem(),shardingContext.getShardingParameter());doWork(shardingContext.getShardingParameter());}private void doWork(String shardingParameter) {List<FileCustom> fileList = fileCustomMapper.selectFileCustomByType(shardingParameter);log.info("需要备份⽂件个数{}: {}", shardingParameter, fileList.size());for (FileCustom fileCustom : fileList) {backUpFile(fileCustom);}}private void backUpFile(FileCustom fileCustom) {try {//模拟备份动作TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("执⾏⽂件备份====>" + fileCustom);fileCustomMapper.changeState(fileCustom.getId(), 1);}
}

6.3 Mapper类修改

package com.xiaoge.mapper;import com.xiaoge.domain.FileCustom;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;import java.util.List;@Mapper
public interface FileCustomMapper {@Select("select * from t_file_custom where backedUp = 0")List<FileCustom> selectAll();@Select("select * from t_file_custom where backedUp = 0 and type = #{type}")List<FileCustom> selectFileCustomByType(@Param("type") String type);@Update("update t_file_custom set backedUp = #{state} where id = #{id}")int changeState(@Param("id") Long id, @Param("state") int state);
}

6.4 测试

  • 只有⼀台机器的情况下,任务分片是如何执行的

  • 有多台机器的情况下,任务分片是如何执行的

7.Dataflow类型调度任务


Dataflow类型的定时任务需要实现Dataflowjob接⼝,该接⼝提供2个⽅法供覆盖,分别⽤于抓取 (fetchData)和处理(processData)数据,我们继续对例⼦进⾏改造。

Dataflow类型的定时任务需要实现Dataflowjob接⼝,该接⼝提供2个⽅法供覆盖,分别⽤于抓取 (fetchData)和处理(processData)数据,我们继续对例子进行改造。

7.1 任务类

package com.xiaoge.service;import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.xiaoge.domain.FileCustom;
import com.xiaoge.mapper.FileCustomMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.concurrent.TimeUnit;/*** TODO 处理大数据量的时候用那个DataFlow这种方式** @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>* @since*/
@Component
public class FileDataFlowJob implements DataflowJob<FileCustom> {@Autowiredprivate FileCustomMapper fileCustomMapper;// 抓取数据@Overridepublic List<FileCustom> fetchData(ShardingContext shardingContext) {System.out.println("开始抓取数据...........");return fileCustomMapper.selectLimit(shardingContext.getShardingParameter(), 2);}// 处理数据@Overridepublic void processData(ShardingContext shardingContext, List<FileCustom> fileCustomList) {fileCustomList.forEach(fileCustom -> {backUpFile(fileCustom);});}private void backUpFile(FileCustom fileCustom) {System.out.println("备份的方法名: " + fileCustom.getName() + "备份的类型: " + fileCustom.getType());try {//模拟备份动作TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("执⾏⽂件备份====>" + fileCustom);fileCustomMapper.changeState(fileCustom.getId(), 1);}
}

7.2 配置类

package com.xiaoge.config;import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.xiaoge.service.FileCustomElasticJob;
import com.xiaoge.service.FileDataFlowJob;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.sql.DataSource;/*** TODO** @author <a href="mailto:1330137071@qq.com">Zhang Xiao</a>* @since*/
@Configuration
public class ElasticJobConfig {/*** todo 注意一个ElasticJob里面不管有多少实例, 只会有一个被调度, 那就是zookeeper选出来的leader* @param myElasticJob* @param regCenter* @return*/
//    @Bean(initMethod = "init")
//    public SpringJobScheduler testScheduler(ElasticJob myElasticJob, CoordinatorRegistryCenter regCenter) {
//        LiteJobConfiguration simpleJobRootConfig = createJobConfiguration(myElasticJob.getClass(), "0/10 * * * * ?", 1);
//        return new SpringJobScheduler(myElasticJob, regCenter, simpleJobRootConfig);
//    }//    @Bean(initMethod = "init")
//    public SpringJobScheduler fileScheduler(FileCustomElasticJob fileCustomElasticJob, CoordinatorRegistryCenter regCenter){
//        SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileCustomElasticJob,regCenter,createJobConfiguration(fileCustomElasticJob.getClass(),"0 0/1 * * * ?",4, "0=text,1=image,2=radio,3=vedio", false));
//        return springJobScheduler;
//    }@Bean(initMethod = "init")public SpringJobScheduler fileDataFlowScheduler(FileDataFlowJob fileDataFlowJob, CoordinatorRegistryCenter regCenter){SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileDataFlowJob,regCenter,createJobConfiguration(fileDataFlowJob.getClass(),"0 0/1 * * * ?",4, "0=text,1=image,2=radio,3=vedio", true));return springJobScheduler;}//    @Bean(initMethod = "init")
//    public SpringJobScheduler test1Scheduler(ElasticJob myElasticJob1, CoordinDataRevisionatorRegistryCenter regCenter) {
//        LiteJobConfiguration simpleJobRootConfig = createJobConfiguration(myElasticJob1.getClass(), "0/3 * * * * ?", 1);
//        return new SpringJobScheduler(myElasticJob1, regCenter, simpleJobRootConfig);
//    }@Beanpublic CoordinatorRegistryCenter registryCenter(@Value("${elasticjob.url}") String zookeeperUrl, @Value("${elasticjob.group-name}") String groupName) {// 配置zk地址,调度任务的组名ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(zookeeperUrl, groupName);// 设置节点超时时间zookeeperConfiguration.setSessionTimeoutMilliseconds(100);// ZookeeperRegistryCenter("zookeeper地址", "项目名")CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);regCenter.init();return regCenter;}/*** todo 注意这个方法不能交给 spring 管理, 你要让它是个公共的方法,*      传递不同的jobName(任务名称), cron(cron表达式), shardingTotalCount(分片数量) 生成不同的LiteJobConfiguration, 因为环境不同任务配置不同.*      也有可能别的任务需要这个方法创建* @return*/public LiteJobConfiguration createJobConfiguration(Class<?> clazz, String cron, Integer shardingTotalCount, String shardingParam, boolean isDataFlow) {// 定义作业核⼼配置 newBuilder("任务名称", "cron表达式", "分片数量")JobCoreConfiguration.Builder jobBuilder = JobCoreConfiguration.newBuilder(clazz.getSimpleName(), cron, shardingTotalCount);if (!StringUtils.isEmpty(shardingParam)) {// 分片参数jobBuilder = jobBuilder.shardingItemParameters(shardingParam);}JobTypeConfiguration jobConfiguration;if (isDataFlow) {// DataflowJob配置jobConfiguration = new DataflowJobConfiguration(jobBuilder.build(), clazz.getCanonicalName(), true);} else {// SimpleJob配置// 定义SIMPLE类型配置 MyElasticJob.class.getCanonicalName()--->获取这个类的权限定类名jobConfiguration = new SimpleJobConfiguration(jobBuilder.build(), clazz.getCanonicalName());}// 定义Lite作业根配置 (overwrite(true) 表示zookeeper里面的配置可以覆盖, 如果为false, 设置了一次cron表达式, 第二次修改表达式是不生效的)LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(jobConfiguration).overwrite(true).build();return simpleJobRootConfig;}}

7.3 测试

8.运维管理


8.1 事件追踪

Elastic-Job-Lite在配置中提供了JobEventConfiguration,⽀持数据库⽅式配置,会在数据库中⾃动创建 JOB_EXECUTION_LOG和JOB_STATUS_TRACE_LOG两张表以及若⼲索引来近路作业的相关信息。

8.1.1 修改Elastic-Job配置类

在ElasticJobConfig配置类中注⼊DataSource

@Configuration
public class ElasticJobConfig {@Autowiredprivate DataSource dataSource;......
}

在任务配置中增加事件追踪配置

@Bean(initMethod = "init")public SpringJobScheduler fileDataFlowScheduler(FileDataFlowJob fileDataFlowJob, CoordinatorRegistryCenter regCenter){// 日志监控, 它会自动在数据库生成两张表job_execution_log/job_status_trace_log// 配置会在任务执行的时间将任务执行的情况存储到数据源中JobEventConfiguration jobEventConfiguration = new JobEventRdbConfiguration(dataSource);SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileDataFlowJob,regCenter,createJobConfiguration(fileDataFlowJob.getClass(),"0 0/1 * * * ?",4, "0=text,1=image,2=radio,3=vedio", true), jobEventConfiguration);return springJobScheduler;
}
8.1.2 日志信息表

启动后会发现在elastic-job-demo数据库中新增以下两张表

job_execution_log

在这里插入图片描述

记录每次作业的执行历史,分为两个步骤:

1.作业开始执⾏时间想数据库插⼊数据.

2.作业完成执⾏时向数据库更新数据,更新is_success,complete_time和failure_cause(如果任务执行失败)

job_status_trace_log

在这里插入图片描述

记录作业状态变更痕迹表,可通过每次作业运行的task_id查询作业状态变化的⽣命轨迹和运行轨迹.

8.2 运维控制台

elastic-job中提供了⼀个elastic-job-lite-console控制台

设计理念

1.本 控制台和Elastic-Job并⽆直接关系,是通过读取Elastic-Job的注册中心数据展示作业状态,或更新注 册中心数据修改全局配置。

2.控制台只能控制任务本身是否运行,但不能控制作业进程的启停,因为控制台和作业本身服务器是完 全分布式的,控制台并不能控制作业服务器。

主要功能:

1.查看作业以及服务器状态

2.快捷的修改以及删除作业配置

3.启用和禁用作业

4.跨注册中心查看作业

5.查看作业运行轨迹和运行状态

不支持项

1.添加作业,因为作业都是在首次运行时自动添加,使用控制台添加作业并无必要.直接在作业服务器启 动包含Elasitc-Job的作业进程即可。

8.2.1 搭建步骤
  • 解压缩 elastic-job-lite-console-2.1.5.tar

  • 进⼊bin⽬录,并执⾏:

    bin\start.bat
    
  • 打开浏览器访问 http://localhost:8899 ⽤户名: root 密码: root,进⼊之后界⾯如下:

    在这里插入图片描述

    提供两种⽤户:管理员和访客,管理员拥有全部操作权限,访客仅拥有查看权限。默认管理员账号和密码是root/root,访客⽤户名和密码是guest/guest,通过conf\auth.properties可以修改管理员以及访客⽤ 户名及密码

8.2.2 配置及使用
  • 配置注册中心地址 先启动zookeeper然后再注册中心配置界面,点添加

    在这里插入图片描述

  • 点击提交后,然后点连接(zookeeper必须处于启动状态)

    在这里插入图片描述

  • 连接成功后,在作业纬度下可以显示该命名空间作业名称,分⽚数量及该作业的cron表达式等信息 在服务器纬度可以查看到服务器ip,当前运⾏的是实例数,作业总数等信息。

    在这里插入图片描述

  • 添加数据库连接之后可以查看任务的执行结果

    在这里插入图片描述

  • 然后在作业历史中就可以看到任务执行历史了。

    在这里插入图片描述
    demo下载地址: https://download.csdn.net/download/zsx1314lovezyf/88282573

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/72183.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PostMan传时间参数一次性发送多次请求

文章目录 1. Date类型的参数&#xff0c; "date": "2023-09-07 22:01:51"格式会报错2. 在Pre-request Script预置时间3. 使用postman一次性发送多次请求 1. Date类型的参数&#xff0c; “date”: "2023-09-07 22:01:51"格式会报错 2. 在Pre-req…

算法 数据结构 斐波那契数列 递归实现斐波那契数列 斐波那契递归的优化 斐波那契数列递归求解 多路递归实现 斐波那契算法系列 数据结构(十一)

1. 什么是斐波那契数列&#xff1a; 之前的例子是每个递归函数只包含一个自身的调用&#xff0c;这称之为 single recursion 如果每个递归函数例包含多个自身调用&#xff0c;称之为 multi recursion 递推关系 下面的表格列出了数列的前几项 F0F1F2F3F4F5F6F7F8F9F10F11F12…

前端 JS 经典:上传文件

重点&#xff1a;multipart/form-data 后端识别上传类型必填 1. form 表单上传 <!-- enctype"multipart/form-data" 这个必填 --> <form action"http://127.0.0.1:8080/users/avatar" method"post" enctype"multipart/form-data…

SQL语言的分类:DDL(数据库、表的增、删、改)、DML(数据的增、删、改)

数据库管理系统&#xff08;数据库软件&#xff09;功能非常多&#xff0c;不仅仅是存储数据&#xff0c;还要包含&#xff1a;数据的管理、表的管理、库的管理、账户管理、权限管理等。 操作数据库的SQL语言&#xff0c;基于功能&#xff0c;划分为4类&#xff1a; 1、数据定…

使用半导体材料制作霍尔元件的优点

霍尔元件是一种基于霍尔效应的传感器&#xff0c;可以测量磁场强度和电流等物理量。霍尔效应是指&#xff0c;当电流通过一块导体时&#xff0c;如果该导体置于垂直于电流方向的磁场中&#xff0c;就会在导体两侧出现一定的电势差&#xff0c;这就是霍尔效应。霍尔元件可以利用…

PHP8函数包含文件-PHP8知识详解

在php中&#xff0c;可以使用以下函数来包含其他文件&#xff1a;include()、include_once()、require()、require_once()。 1、include(): 包含并运行指定文件中的代码。如果文件不存在或包含过程中出现错误&#xff0c;将发出警告。 <?php include filename.php; ?>…

与 vmx86 驱动程序的版本不匹配: 预期为 410.0,实际为 401.0

与 vmx86 驱动程序的版本不匹配: 预期为 410.0&#xff0c;实际为 401.0。 驱动程序“vmx86.sys”的版本不正确。请尝试重新安装 VMware Workstation。 我电脑历史上装过几个版本的vmware workstation: 怀疑是不兼容版本生成的vmx.86.sys 在系统中和该软件冲突&#xff0c;又没…

Redis总结(三)

目录 什么是缓存预热、缓存雪崩、缓存击穿、缓存穿透&#xff1f; 缓存预热 缓存雪崩 解决方案 针对Redis故障宕机 针对大量key同时过期 缓存击穿 解决方案 缓存穿透 解决方案 总结 数据库和缓存如何保证一致性&#xff1f; 先更新缓存还是先更新数据库&#xff1…

【sgLazyCascader】自定义组件:基于el-cascader的懒加载级联菜单,支持异步加载子级菜单

sgLazyCascader源码 <template><div :class"$options.name"><el-cascader :props"props" v-model"model" :placeholder"placeholder || 请选择" :options"options"></el-cascader></div> &l…

【Docker】镜像的创建、管理与发布

镜像的获取 镜像可以从以下方式获得&#xff1a; 从远程镜像仓库拉取&#xff0c;可以是公有仓库&#xff0c;也可以是私有仓库从Dockerfile构建从文件导入&#xff08;离线&#xff09;从容器提交 镜像的基本操作 跟镜像相关的命令如下&#xff1a; $ docker image --help…

大数据-玩转数据-Flink状态编程(上)

一、Flink状态编程 有状态的计算是流处理框架要实现的重要功能&#xff0c;因为稍复杂的流处理场景都需要记录状态&#xff0c;然后在新流入数据的基础上不断更新状态。 SparkStreaming在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编…

macbookpro怎么删除软件没有鼠标

macbookpro怎么删除软件没有鼠标,macbookpro触摸板可以替代鼠标进行操作。左右键功能与鼠标相同&#xff0c;可用于执行删除操作。此外&#xff0c;还可以利用键盘上的Delete键来删除选中的文件。 删除软件方法 方法1、打开应用程序&#xff0c;键盘按住control&#xff0c;加点…

数据结构与算法之贪心动态规划

一&#xff1a;思考 1.某天早上公司领导找你解决一个问题&#xff0c;明天公司有N个同等级的会议需要使用同一个会议室&#xff0c;现在给你这个N个会议的开始和结束 时间&#xff0c;你怎么样安排才能使会议室最大利用&#xff1f;即安排最多场次的会议&#xff1f;电影的话 那…

高等数学教材重难点题型总结(四)不定积分

难点在于量级&#xff0c;不定积分一定要多练多见才能游刃有余~ 1.利用求导公式验证等式 2.计算不定积分

C语言——指针完全版

目录 一、指针的运算 1.1指针 - 整数 1.2指针 - 指针 二、指针遍历数组 2.1指针遍历数组 1.了解数组名称的含义&#xff08;&数组名和数组名的区别&#xff09;。 2.用指针遍历数组 三、指针数组、数组指针、函数指针 3.1指针数组 3.1.1指针数组的形式 3.1.2指针…

【力扣每日一题】2023.9.7 修车的最少时间

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 题目给我们一个数值&#xff0c;数组里每个元素表示一个老师傅&#xff0c;老师傅修车花费的时间等于数值乘上车辆数的平方。 问我们修理…

编程语言排行榜

以下是2023年的编程语言排行榜&#xff08;按照流行度排序&#xff09;&#xff1a; Python&#xff1a;Python一直以来都是非常受欢迎的编程语言&#xff0c;它简洁、易读且功能强大。在数据科学、机器学习、人工智能等领域有广泛应用。 JavaScript&#xff1a;作为前端开发…

idea:java: Compilation failed: internal java compiler error

java: Compilation failed: internal java compiler error错误 检查下面2个即可&#xff1a;

docker 生成镜像的几个问题

docker 生成镜像的几个问题 根据jdk8.tar.gz 打包Jdk8 镜像失败运行镜像报错差不多是网络ip错误,在网上说重启docker即可解决运行mysql5.7.25 镜像失败向daemon.json文件添加内容导致docker重启失败docker run 命令常用参数根据jdk8.tar.gz 打包Jdk8 镜像失败 首选做准备工作…

卡牌类游戏推荐,卡牌类三国手游排行榜

以下是小编要推荐给大家的关于卡牌类三国手游排行榜的内容。这里有来自各个历史阶段的名将和美女&#xff0c;让你体验最真实的三国战役。你可以将各种战略思维运用到其中&#xff0c;感受步步为营的喜悦&#xff0c;最终赢得战火纷飞的三国&#xff0c;如果想了解每个游戏的具…