工具篇--分布式定时任务springBoot 整合 elasticjob使用(3)

文章目录

  • 前言
  • 一、Springboot 整合:
    • 1.1 引入jar:
    • 1.2 配置zookeeper 注册中心:
    • 1.3 定义job 业务类:
    • 1.4 job 注册到zookeeper:
    • 1.5 项目启动:
      • 1.5.1 zookeeper 注册中心实例:
      • 1.5.2 任务执行日志输出:
  • 二、扩展:
    • 2.1 任务监听器:
    • 2. 2 DataflowJob 流工作:
    • 2.2.1 新建 DataflowJob:
    • 2.2.2 streaming.process 属性配置:
    • 2.2.3 执行效果:
  • 总结


前言

本文对springBoot 整合 elasticjob 进行介绍。 本文环境 jdk:1.8 ; zookeeper : 3.7


一、Springboot 整合:

1.1 引入jar:

<!-- https://mvnrepository.com/artifact/org.apache.shardingsphere.elasticjob/elasticjob-lite-core --><dependency><groupId>org.apache.shardingsphere.elasticjob</groupId><artifactId>elasticjob-lite-core</artifactId><!-- <version>3.0.4</version>--><version>3.0.1</version></dependency><dependency><groupId>org.yaml</groupId><artifactId>snakeyaml</artifactId><version>1.27</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.12</version></dependency>

1.2 配置zookeeper 注册中心:

ElasticJobZookeeper:

import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ElasticJobZookeeper {@Bean(initMethod = "init")public  CoordinatorRegistryCenter createRegistryCenter() {ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181", "elastic-job");zookeeperConfiguration.setConnectionTimeoutMilliseconds(10000);zookeeperConfiguration.setSessionTimeoutMilliseconds(10000);zookeeperConfiguration.setMaxSleepTimeMilliseconds(10000);CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);return regCenter;}
}

1.3 定义job 业务类:

MyJob:

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class MyJob implements SimpleJob {@Overridepublic void execute(ShardingContext shardingContext) {// 分片参数 0=text,1=image,2=radio,3=vedioString  shardingParameter= shardingContext.getShardingParameter();String  jobParameter= shardingContext.getJobParameter();log.debug("job 执行 error,job名称:{},分片数量:{},分片:{},分片参数:{},jobParamer:{}", shardingContext.getJobName(), shardingContext.getShardingTotalCount(),shardingContext.getShardingItem(), shardingParameter,jobParameter);if ("text".equals(jobParameter)) {// do something by sharding}switch (shardingContext.getShardingItem()) {case 0:// do something by sharding item 0break;case 1:// do something by sharding item 1break;case 2:// do something by sharding item 2break;// case n: ...}}
}

MyJob1:

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class MyJob1 implements SimpleJob {@Overridepublic void execute(ShardingContext shardingContext) {log.debug("job 执行 error,job名称:{},分片数量:{}",shardingContext.getJobName(),shardingContext.getShardingTotalCount());switch (shardingContext.getShardingItem()) {case 0:// do something by sharding item 0break;case 1:// do something by sharding item 1break;case 2:// do something by sharding item 2break;// case n: ...}}
}

1.4 job 注册到zookeeper:

ElasticJobConfigure

import com.example.springelasticjob.quickstart.MyJob;
import com.example.springelasticjob.quickstart.MyJob1;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;@Configuration
public class ElasticJobConfigure implements InitializingBean {@Autowiredprivate MyJob myJob;@Autowiredprivate MyJob1 myJob1;@Autowiredprivate CoordinatorRegistryCenter coordinatorRegistryCenter;public JobConfiguration createJobConfiguration(Class<? extends SimpleJob> JobClass, int shardingTotalCount, String cron, String shardingItemParameters) {// 创建作业配置JobConfiguration jobConfiguration = JobConfiguration.newBuilder(JobClass.getName(), shardingTotalCount).cron(cron).overwrite(true).shardingItemParameters(shardingItemParameters).jobListenerTypes().build();return jobConfiguration;}@Overridepublic void afterPropertiesSet() throws Exception {JobConfiguration jobConfiguration = createJobConfiguration(myJob.getClass(), 1, "0/10 * * * * ?", null);new ScheduleJobBootstrap(coordinatorRegistryCenter, myJob, jobConfiguration).schedule();JobConfiguration jobConfiguration1 = createJobConfiguration(myJob1.getClass(), 1, "0/1 * * * * ?", null);new ScheduleJobBootstrap(coordinatorRegistryCenter, myJob1, jobConfiguration1).schedule();}
}

1.5 项目启动:

1.5.1 zookeeper 注册中心实例:

在这里插入图片描述

1.5.2 任务执行日志输出:

在这里插入图片描述

二、扩展:

2.1 任务监听器:

  1. 定义监听器:
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.time.DateFormatUtils;
import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;import java.util.Date;@Slf4j
public class MyElasticJobListener implements ElasticJobListener {private long beginTime = 0;@Overridepublic void beforeJobExecuted(ShardingContexts shardingContexts) {beginTime = System.currentTimeMillis();log.info("===>{} MyElasticJobListener BEGIN TIME: {} <===",shardingContexts.getJobName(),  DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"));}@Overridepublic void afterJobExecuted(ShardingContexts shardingContexts) {long endTime = System.currentTimeMillis();log.info("===>{} MyElasticJobListener END TIME: {},TOTAL CAST: {} <===",shardingContexts.getJobName(), DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"), endTime - beginTime);}@Overridepublic String getType() {return "myElasticJobListener";}}

2) 在项目resources 新建文件夹: META-INF\services
在这里插入图片描述
3)新建文件,名称为:org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener
文集内容:

# 监听器实现类的 类全路径
com.example.springelasticjob.config.MyElasticJobListener

4)job 配置增加监听器:

// 创建作业配置JobConfiguration jobConfiguration = JobConfiguration.newBuilder("myjob-param", 1).cron("0/5 * * * * ?").overwrite(true).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").jobParameter("0=a,1=b,2=c").jobListenerTypes("myElasticJobListener").build();

jobListenerTypes(“myElasticJobListener”) 中 “myElasticJobListener” 要和 MyElasticJobListener getType() 返回的保持一致,否则启动无法找到 监听器:
在这里插入图片描述

2. 2 DataflowJob 流工作:

2.2.1 新建 DataflowJob:


import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;import java.util.ArrayList;
import java.util.List;/*** 流任务*/
@Slf4j
public class MyDataFlowJob implements DataflowJob {@Overridepublic List fetchData(ShardingContext shardingContext) {// 抓取数据// 分片参数 0=text,1=image,2=radio,3=vedioString jobParameter = shardingContext.getJobParameter();log.debug("job 执行 error,job名称:{},分片数量:{},分片:{},分片参数:{}", shardingContext.getJobName(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), jobParameter);List list = new ArrayList(1);list.add("lgx");return list;}@Overridepublic void processData(ShardingContext shardingContext, List list) {// 数据处理System.out.println("list.toString() = " + list.toString());}
}

2.2.2 streaming.process 属性配置:

 private static JobConfiguration createJobConfiguration() {JobConfiguration jobConfiguration = JobConfiguration.newBuilder("myjob-dataflow-param", 1).cron("0/30 * * * * ?").overwrite(true).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").jobParameter("0=a,1=b,2=c")//  streaming.process 流处理设置为true.setProperty("streaming.process","true").build();return jobConfiguration;}

2.2.3 执行效果:

虽然任务是每隔30s 执行一次,但是因为 fetchData 可以一直获取到数据,使的 processData 方法可以一直被调用:
在这里插入图片描述


总结

本文对 springBoot 整合 elasticjob 整合,监听器的使用,任务的流式处理做介绍。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/740470.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu 14.04:PaddleOCR基于PaddleServing的在线服务化部署(失败)

一、 二、安装 注&#xff1a; 安装 PaddleOCR 2.3 。 因为 PaddleOCR 2.4 的 推荐环境 PaddlePaddle > 2.1.2。 https://github.com/PaddlePaddle/PaddleOCR/blob/release/2.4/doc/doc_ch/environment.md 安装前的环境准备 在使用Paddle Serving之前&#xff0c;需要完…

flink重温笔记(十五): flinkSQL 顶层 API ——实时数据流转化为SQL表的操作

Flink学习笔记 前言&#xff1a;今天是学习 flink 的第 15 天啦&#xff01;学习了 flinkSQL 基础入门&#xff0c;主要是解决大数据领域数据处理采用表的方式&#xff0c;而不是写复杂代码逻辑&#xff0c;学会了如何初始化环境&#xff0c;鹅湖将流数据转化为表数据&#xff…

数据结构和算法:复杂度分析

算法效率评估 时间效率&#xff1a;算法运行速度的快慢。 空间效率&#xff1a;算法占用内存空间的大小。 效率评估方法主要分为两种&#xff1a;实际测试、理论估算 实际测试问题&#xff1a; 1.难以排除测试环境的干扰因素。 硬件配置会影响算法的性能。需要在各种机器上进…

Web 常用的 扩展开发框架

当谈到提升浏览器功能和用户体验时&#xff0c;浏览器扩展成了一股强大的力量&#xff0c;备受用户青睐。在众多的Web扩展开发框架中&#xff0c;WXT和Plasmo凭借其丰富的工具和特性&#xff0c;以及简化的开发流程&#xff0c;成为开发者们的首选。在本文中&#xff0c;我们将…

【嵌入式DIY实例】-DIY锂电池电压检测表

DIY锂电池电压检测表 文章目录 DIY锂电池电压检测表1、直流电压检测传感器介绍2、硬件准备3、代码实现4、OLED显示在电子应用中,通常需要使用到电池,电源管理是必不可少的部分。本文将详细介绍如何使用一个0-25V的直流电压传感器来检测锂电池的电压。 1、直流电压检测传感器介…

数据分析-Pandas如何画自相关图

数据分析-Pandas如何画自相关图 数据分析和处理中&#xff0c;难免会遇到各种数据&#xff0c;那么数据呈现怎样的规律呢&#xff1f;不管金融数据&#xff0c;风控数据&#xff0c;营销数据等等&#xff0c;莫不如此。如何通过图示展示数据的规律&#xff1f; 数据表&#x…

C#,文字排版的折行问题(Word-wrap problem)的算法与源代码

1、英文的折行问题 给定一个单词序列&#xff0c;以及一行中可以输入的字符数限制&#xff08;线宽&#xff09;。 在给定的顺序中放置换行符&#xff0c;以便打印整齐。 假设每个单词的长度小于线宽。 像MS word这样的文字处理程序负责放置换行符。 这个想法是要有平衡的线条。…

Android kotlin开启协程的几种方式

在Android开发中&#xff0c;使用Kotlin协程&#xff08;coroutines&#xff09;可以极大地简化异步编程的复杂性&#xff0c;提高代码的可读性和可维护性。以下是几种在Android Kotlin项目中开启协程的常用方式&#xff1a; 1. 使用GlobalScope.launch 这是最简单直接的开启…

2024.3.12-408学习笔记-C-C++

1、引用& #include <stdio.h>void modify_pointer(int* &p1, int* q1) {p1 q1; }int main() {int* p NULL;int i 10;int* q &i;modify_pointer(p, q);printf("after modify_pointer *p %d\n", *p);//after modify_pointer *p 10return 0; }…

专业140+总分430+西南交通大学924信号与系统考研经验电子信息与通信工程,真题,大纲,参考书

今年报考西南交通大学&#xff0c;考研分数专业课924信号与系统140&#xff0c;总分430&#xff0c;各门分数都还是比较均衡&#xff0c;经过一年的复习&#xff0c;有得有失&#xff0c;总结一下自己的复习经历&#xff0c;希望给大家有点帮助&#xff0c;在复习中做的更好&am…

Android 使用AIDL HAL

生成的目录结构 以audioControl 为例: 首先编写的是aidl文件。 其文件目录结构是:── android │ └── hardware │ └── automotive │ └── audiocontrol │ ├── AudioFocusChange.aidl │ ├── AudioGainConf…

LeetCode 1409.查询带键的排列

给定一个正整数数组 queries &#xff0c;其取值范围在 1 到 m 之间。 请你根据以下规则按顺序处理所有 queries[i]&#xff08;从 i0 到 iqueries.length-1&#xff09;&#xff1a; 首先&#xff0c;你有一个排列 P[1,2,3,…,m]。 对于当前的 i &#xff0c;找到 queries[i]…

Sklearn基本算法

sklearn&#xff08;Scikit-learn&#xff09;是一个非常流行的Python机器学习库&#xff0c;它提供了一系列简单高效的算法和工具&#xff0c;适用于各种机器学习任务。下面是一些基本的机器学习算法类别和对应的常用算法&#xff1a; 分类算法 逻辑回归&#xff08;Logisti…

请列出60个Python热点面试题目

以下是60个Python热点面试题目&#xff0c;涵盖了Python基础知识、数据类型、面向对象编程、函数和模块、文件操作、错误处理、并发编程、数据库操作、网络编程、框架和库等多个方面&#xff1a; 谈谈你对Python语言的理解&#xff0c;它有哪些主要特点&#xff1f;Python有哪…

钉钉平台“智”领宠物界,开启萌宠智能新时代!

在当前数字化转型的浪潮中&#xff0c;钉钉用便捷的数字化解决方案推动了宠物业界的智能升级。一家宠物用品公司采用无雀科技数字化管理系统&#xff0c;与钉钉平台结合&#xff0c;解决了小型企业普遍存在的财务管理不清晰、业务流程不规范、客户信息核对繁琐等痛点问题。 针对…

html5cssjs代码 012 我的像册

html5&css&js代码 012 我的像册 一、代码二、解释 这段HTML代码定义了一个简单的网页&#xff0c;实现了一个简洁、响应式的图片相册页面。 一、代码 <!DOCTYPE html> <html lang"zh-cn"> <head><title>编程笔记 html5&css&…

AHU 汇编 实验六

一、实验名称&#xff1a;实验6 输入一个16进制数&#xff0c;把它转换为10进制数输出 实验目的&#xff1a; 培养汇编中设计子程序的能力 实验过程&#xff1a; 源代码&#xff1a; data segmentbuff1 db Please input a number(H):$buff2 db 30,?,30 dup(?),13,10buff3 …

QT进阶---------pro项目文件中的常用命令 (第三天)

1、命令一 决定exe可执行程序的生成路径CONFIG 作用&#xff1a;不使用默认路径&#xff0c;方便移植 CONFIG(debug, debug|release) {DESTDIR $$_PRO_FILE_PWD_/../../../debugXXXsystem } else {DESTDIR $$_PRO_FILE_PWD_/../../../realeaseXXXsystem } 是用于 Qt 项目…

多维时序 | Matlab实现VMD-CNN-LSTM变分模态分解结合卷积神经网络结合长短期记忆神经网络多变量时间序列预测

多维时序 | Matlab实现VMD-CNN-LSTM变分模态分解结合卷积神经网络结合长短期记忆神经网络多变量时间序列预测 目录 多维时序 | Matlab实现VMD-CNN-LSTM变分模态分解结合卷积神经网络结合长短期记忆神经网络多变量时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介…

大数据开发(HBase面试真题-卷二)

大数据开发&#xff08;HBase面试真题&#xff09; 1、HBase读写数据流程&#xff1f;2、HBase的读写缓存&#xff1f;3、在删除HBase中的一个数据的时候&#xff0c;它什么时候真正的进行删除呢&#xff1f;4、HBase的一个region由哪些东西组成&#xff1f;5、HBase的rowkey为…