使用Ruoyi的定时任务组件结合XxlCrawler进行数据增量同步实战-以中国地震台网为例

目录

前言

一、数据增量更新机制

1、全量更新机制

2、增量更新机制

二、功能时序图设计

1、原始请求分析

2、业务时序图

三、后台定时任务的设计与实现

四、Ruoyi自动任务配置

1、Ruoyi自动任务配置

2、任务调度

 总结


前言

        在之前的相关文章中,发表文章列表:在Java中使用XxlCrawler时防止被反爬的几种方式,基于Java的XxlCrawler网络信息爬取实战-以中国地震台网为例。在这两篇博客当中,我们介绍了XxlCralwer组件,以及如何进行爬虫反爬对抗。这两篇博客都是讲解的全量更新,即第一次全部抓取数据。在我们实际的信息爬取过程当中,肯定会有增量更新的问题,比如定期更新。不要爬取全部的数据,而只是抓取更新的数据,在进行多次的增量更新之后,就能实现数据源的数据一致性。这样的需求很常见,但是在以往的博客中很少进行提及。

        本文就是在这样的需求背景下诞生的,我们需要使用XxlCrawler组件对中国地震外网的地震信息进行增量同步。在第一次全量获取数据后,后面在系统运行过程当中,不需要人工干预,我们采用自动任务的方式,将信息抓取的过程完全有程序来完成。文章首先讲解了一般数据同步的方法,然后使用代码的方式介绍在本文中使用的数据同步方法,如何进行重复数据去重,最后讲解如何进行入库,结合Ruoyi中的定时任务组件,讲解如何进行定时任务计划的制定和运行,讲解如何将定时任务和爬虫关联起来。对有对信息进行增量更新的需求有一定的参考意义。

一、数据增量更新机制

        要想实现数据的动态更新,一般包含两种更新方式。即全量更新方式和增量更新方式,全量更新模式顾名思义,就是每次进行数据同步时都是全量数据同步。增量更新是基于全量更新基础之上的,每次的数据同步都采用增量的数据同步,即将变化的数据,新增的或者修改的,删除的数据同步到下游系统。下面结合中国地震台网数据,使用两种更新方式进行说明。

1、全量更新机制

        全量更新通常应用在数据的首次同步上,往往第一次需要上游系统的所有数据。因此有必要对上游系统进行全量同步。同时也是因为首次同步时,下游系统中往往还没有数据,因此不需要考虑数据重复的问题。只要将爬取的数据进行新增即可。全量更新的技术难度较低,再此不再进行赘述。

2、增量更新机制

        在首次同步好了全量数据以后,要想实现增量更新,数据增量同步ETL每次只处理增、删、改的变化数据,减少大量非变化数据同步。与数据全量同步ETL相比,数据增量同步ETL可以用最少的资源提高数据同步效率。其大致的思路有以下几种:

        1. 时间戳:最常见的方式。但是在业务系统里,不是每张表都有时间戳。

        2. 触发器:可靠性较高。但是对业务系统数据库性能损耗较大。

        3. 全量对比得出增量数据更新:对源数据库的性能损耗较小。但是大数据量对比更新时,对工具的性能开销需求较大。

        4. 全量对比MD5方式:建立一个结构类似的MD5临时表,通过MD5校验码比对。

        5. 日志解析:常见的数据库具备日志归档等功能,从日志获取变化数据,通过代码来开发和管理。

        通过之前的台网数据抓取得到的数据可以了解到,在我们的数据表格中是包含了时间字段了,因此我们可以使用基于时间戳的方案来实现台网数据的增量更新。

二、功能时序图设计

        为了实现中国地震外网数据定时同步,这里我们采用面向对象分析(OOA)的模式进行。同时为了比较清晰的说明其同步机制,我们将其大致的业务调用时序图进行设计。本节即主要描述增量同步的时序图设计。

1、原始请求分析

        为了简单介绍相关接口及正确获取增量数据,首先我们来看一下中国地震台网的更新接口。在其历史查询的接口列表中。我们可以看到其可以实现日期查询,具体的功能查询界面如下:

        然后我们来看一下实际的请求接口,打开控制台的网络监控Tab页:

https://www.ceic.ac.cn/ajax/search?page=1&&start=2024-05-01&&end=&&jingdu1=&&jingdu2=&&weidu1=&&weidu2=&&height1=&&height2=&&zhenji1=&&zhenji2=&&callback=jQuery18005659035271001251_1714803161229&_=1714807031625

        可以很明显看到,它的开始日期参数放到了start中,在start中绑定了开始查询参数。 在我们接口请求中也是利用这个时间戳字段来实现信息的增量更新。

2、业务时序图

        为了实现信息请求接口的增量更新,同时在实际请求当中,可能会遇到的数据重复判断的问题,因此我们需要进行数据的去重。我们使用cata_id和epi_id进行去重处理。具体的业务时序图如下:

        从时序图来看,大致的数据同步过程分了10步,下面针对每一步来进行简单说明:

        1.1:首先从数据库中获取已经同步数据的最大日期作为同步基础时间戳。

        1.2:如果基础时间戳不为空,在此时间戳之上,我们采用N-1,这么做是为了避免数据历史数据没有及时同步,这里可能会有数据重复,因此一定要进行数据去重的机制保证数据的一致性。

        1.3-1.5:从数据库中获取同步时间戳和拼接系统请求时间戳的方法。

        1.6:构造XxlCrawler爬取器,设置页面处理对象,进行首页信息爬取。

        1.7-1.8:定义下一页爬取规则,实现自动爬取所有其它页面的数据。

        1.9:启动信息抓取器,进行爬取。

三、后台定时任务的设计与实现

        在明确了相关的定时任务之后,这里我们在Ruoyi的框架下进行开发,需要按照时序图设计相关的时序逻辑。完整的增量更新任务类关键代码如下:

package com.yelang.project.monitor.job.task;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.xuxueli.crawler.XxlCrawler;
import com.xuxueli.crawler.parser.strategy.NonPageParser;
import com.yelang.common.utils.DateUtils;
import com.yelang.common.utils.StringUtils;
import com.yelang.project.extend.earthquake.domain.crawler.CeicDateAdapter;
import com.yelang.project.extend.earthquake.domain.crawler.CeicEarthquake;
import com.yelang.project.extend.earthquake.domain.crawler.CeicEarthquakeCrawler;
import com.yelang.project.extend.earthquake.service.ICeicEarthquakeService;
@Component("ceicEqIncrementalUpdateTask")
public class CeicEqIncrementalUpdateTask {private Logger logger = LoggerFactory.getLogger(CeicEqIncrementalUpdateTask.class);private static final String USER_AGENT = "\"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36\"";private static final String commonUrl = "https://www.ceic.ac.cn/ajax/search?end=&&jingdu1=&&jingdu2=&&weidu1=&&weidu2=&&height1=&&height2=&&zhenji1=&&zhenji2=&_=";@Autowiredprivate ICeicEarthquakeService eqService;/*** 默认的增量更新方法,从数据库中获取当前更新的最大日期,以此为更新条件*/public void defaultIncrementalUpdate() {logger.info("执行无参方法");CeicEarthquake ceicEq = eqService.getMaxOtime();if (null == ceicEq) {logger.info("增量更新时间点为空!");return;}if (null != ceicEq) {Date yesterday = DateUtils.getPrevOneDay(ceicEq.getOTime());String yesterday_str = DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD, yesterday);String firstUrl = commonUrl + System.currentTimeMillis() + "&&start=" + yesterday_str + "&&page=1";NonPageParser firstPageParse = new NonPageParser() {public void parse(String url, String pageSource) {dataParse(true, url, pageSource, yesterday_str);}};// 构造爬虫XxlCrawler crawler = buildXxlCrawler(firstPageParse, firstUrl);crawler.start(false);// 启动异步执行}}private void incrementalUpdate(String startDate, Integer pageNum) {String[] urlList = new String[pageNum - 1];for (int i = 0; i < pageNum - 1; i++) {urlList[i] = commonUrl + System.currentTimeMillis() + "&&start=" + startDate + "&&page=" + (2 + i);}NonPageParser dataParser = new NonPageParser() {public void parse(String url, String pageSource) {dataParse(false, url, pageSource, null);}};// 构造爬虫XxlCrawler crawler = this.buildXxlCrawler(dataParser, urlList);crawler.start(false);// 启动}private XxlCrawler buildXxlCrawler(NonPageParser pageParser, String... urls) {// 构造爬虫XxlCrawler crawler = new XxlCrawler.Builder().setUrls(urls)// 设置请求URL.setThreadCount(3)// 设置线程数.setPauseMillis(2000)// 设置暂停时间.setUserAgent(USER_AGENT)// 设置User-Agent.setIfPost(false).setFailRetryCount(3)// 重试三次.setPageParser(pageParser).build();return crawler;}private void dataParse(boolean isFirst, String url, String pageSource, String yesterday_str) {if (!StringUtils.isBlank(pageSource)) {pageSource = pageSource.substring(1, pageSource.length() - 1);Gson gson = new GsonBuilder().registerTypeAdapter(Date.class, new CeicDateAdapter()).create();CeicEarthquakeCrawler crawler = gson.fromJson(pageSource, CeicEarthquakeCrawler.class);Integer dataSize = crawler.getNum();// 获取数据总页数for (CeicEarthquake data : crawler.getShuju()) {String geom = "SRID=" + 4326 + ";POINT (" + data.getEpiLon() + " " + data.getEpiLat() + ")";// 拼接srid,实现动态写入data.setGeom(geom);}eqService.deduplication(crawler.getShuju());// 数据跟数据库去重后入库// 如果是首页且数据总数大于1,表示有多条,需要循环爬取if (isFirst && dataSize > 1) {logger.info("总页数:{}", crawler.getNum());incrementalUpdate(yesterday_str, dataSize);}}}
}

        自动任务的入口方法是defaultIncrementalUpdate(),在这里进行任务的调度。请注意数据转换的统一定义方法如下:

private void dataParse(boolean isFirst, String url, String pageSource, String yesterday_str) {if (!StringUtils.isBlank(pageSource)) {pageSource = pageSource.substring(1, pageSource.length() - 1);Gson gson = new GsonBuilder().registerTypeAdapter(Date.class, new CeicDateAdapter()).create();CeicEarthquakeCrawler crawler = gson.fromJson(pageSource, CeicEarthquakeCrawler.class);Integer dataSize = crawler.getNum();// 获取数据总页数for (CeicEarthquake data : crawler.getShuju()) {String geom = "SRID=" + 4326 + ";POINT (" + data.getEpiLon() + " " + data.getEpiLat() + ")";// 拼接srid,实现动态写入data.setGeom(geom);}eqService.deduplication(crawler.getShuju());// 数据跟数据库去重后入库// 如果是首页且数据总数大于1,表示有多条,需要循环爬取if (isFirst && dataSize > 1) {logger.info("总页数:{}", crawler.getNum());incrementalUpdate(yesterday_str, dataSize);}}}

        实现增量的逻辑是第一次请求的时候,通过返回的num来决定是否往下爬取,只有超过2页才需要进行剩余页面的请求。

private void incrementalUpdate(String startDate, Integer pageNum) {String[] urlList = new String[pageNum - 1];for (int i = 0; i < pageNum - 1; i++) {urlList[i] = commonUrl + System.currentTimeMillis() + "&&start=" + startDate + "&&page=" + (2 + i);}NonPageParser dataParser = new NonPageParser() {public void parse(String url, String pageSource) {dataParse(false, url, pageSource, null);}};// 构造爬虫XxlCrawler crawler = this.buildXxlCrawler(dataParser, urlList);crawler.start(false);// 启动}

        使用数据库的机制进行数据重复判断的逻辑如下:

@Override
public void deduplication(List<CeicEarthquake> eqDataList) {List<CeicEarthquake> saveList = new ArrayList<CeicEarthquake>();for (CeicEarthquake data : eqDataList) {Long size = this.getSizeByCataIdAndEpiId(data.getCataId(), data.getEpiId());if(size >= 1) {continue;//记录数大于1,表示数据中有记录}String geom = "SRID=" + 4326 +";POINT (" + data.getEpiLon()+ " "+data.getEpiLat()+")";//拼接srid,实现动态写入data.setGeom(geom);saveList.add(data);}if(saveList.size() > 0) {this.saveBatch(saveList, 300);}
}

四、Ruoyi自动任务配置

        这里采用Ruoyi进行自动任务配置,在自动任务框架驱动下进行数据的增量更新。因此需要我们进行任务的配置。本节将重点介绍Ruoyi的定时任务配置,以及如何关联到增量同步组件。

1、Ruoyi自动任务配置

        应用程序启动后,在系统监控中打开定时任务子菜单,可以看到系统中定义的所有定时任务列表。

        这里我已经定义了一个台网信息同步的定时任务,默认的是每天1点执行(时间频率请结合实际业务来进行配置),请不要给目标系统造成太大的异常流量。不要太频繁的发起访问。

        这里调用的ceicEqIncrementalUpdateTask.defaultIncrementalUpdate()。具体的调用参考如下:

        Bean调用示例:ryTask.ryParams('ry');

        Class类调用示例:com.yelang.quartz.task.RyTask.ryParams('ry')
 参数说明:支持字符串,布尔类型,长整型,浮点型,整型。

2、任务调度

        在任务创建好之后,我们可以进行任务开启,开启后,任务会自动在后台运行,在制定时间进行触发。由于我们配置的每天1点进行任务创建及运行,因此这里我们选择人工运行的模式,使用手动运行的模式。点击操作按钮中的执行一次。如下图所示:

        执行任务之前,我们来数据库中看一下数据的总条数是12459条:


select count(1)  from biz_ceic_earthquake;

        然后我们来执行任务调度,可以看到控制台进行了信息输出:

        在数据库中可以看到数据总数发生了变化,变成了12460(+1),成功将最新的数据同步到了数据库中。

 总结

        以上就是本文的主要内容, 本文需要使用XxlCrawler组件对中国地震外网的地震信息进行增量同步。在第一次全量获取数据后,后面在系统运行过程当中,不需要人工干预,我们采用自动任务的方式,将信息抓取的过程完全有程序来完成。文章首先讲解了一般数据同步的方法,然后使用代码的方式介绍在本文中使用的数据同步方法,如何进行重复数据去重,最后讲解如何进行入库,结合Ruoyi中的定时任务组件,讲解如何进行定时任务计划的制定和运行,讲解如何将定时任务和爬虫关联起来。对有对信息进行增量更新的需求有一定的参考意义。行文仓促,难免有不足之处,如有不足之处,欢迎各位专家朋友不吝赐教,万分感谢。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/6471.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024年 Java 面试八股文——SpringBoot篇

目录 1. 什么是 Spring Boot&#xff1f; 2. 为什么要用SpringBoot 3. SpringBoot与SpringCloud 区别 4. Spring Boot 有哪些优点&#xff1f; 5. Spring Boot 的核心注解是哪个&#xff1f;它主要由哪几个注解组成的&#xff1f; 6. Spring Boot 支持哪些日志框架&#…

数据结构习题--Fizz Buzz

数据结构习题–Fizz Buzz 给你一个整数 n &#xff0c;找出从 1 到 n 各个整数的 Fizz Buzz 表示&#xff0c;并用字符串数组 answer&#xff08;下标从 1 开始&#xff09;返回结果&#xff0c;其中&#xff1a; answer[i] “FizzBuzz” 如果 i 同时是 3 和 5 的倍数。 ans…

【Java 算法实现】合并两个有序数组(逆向双指针)

【Java 算法实现】合并两个有序数组 题目描述 给定两个按非递减顺序排列的整数数组 nums1 和 nums2&#xff0c;以及两个整数 m 和 n&#xff0c;分别表示 nums1 和 nums2 中的元素数目。 你需要将 nums2 合并到 nums1 中&#xff0c;使合并后的数组同样保持非递减顺序排列。…

应用分层和企业规范

目录 一、应用分层 1、介绍 &#xff08;1&#xff09;为什么需要应用分层&#xff1f; &#xff08;2&#xff09;如何分层&#xff1f;&#xff08;三层架构&#xff09; MVC 和 三层架构的区别和联系 高内聚&#xff1a; 低耦合&#xff1a; 2、代码重构 controlle…

2024网络安全面试问题宝典(4万字)

2024网络安全厂商面试问题宝典(4万字) 目录 评分标准网络基础问题 TCP建立连接要进行3次握手&#xff08;syn-syn&#xff0c;ack-ack&#xff09;&#xff0c;而断开连接要进行4次&#xff08;fin-ack-fin-ack&#xff09;TCP&#xff0c;UDP区别&#xff1a;安全常用的协议…

Cloudera最新认证体系-2024Hadoop认证

这里写自定义目录标题 欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants 创建一个自定义列表如何创建一个…

K8S哲学 - 资源调度 HPA (horizontal pod autoScaler-sync-period)

kubectl exec&#xff1a; kubectl exec -it pod-name -c container-name -- /bin/sh kubectl run 通过一个 deployment来 演示 apiVersion: apps/v1 kind: Deployment metadata:name: deploylabels: app: deploy spec: replicas: 1selector: matchLabels:app: deploy-podt…

Universal Thresholdizer:将多种密码学原语门限化

参考文献&#xff1a; [LS90] Lapidot D, Shamir A. Publicly verifiable non-interactive zero-knowledge proofs[C]//Advances in Cryptology-CRYPTO’90: Proceedings 10. Springer Berlin Heidelberg, 1991: 353-365.[Shoup00] Shoup V. Practical threshold signatures[C…

你不知道的TCP协议:四次握手!

你不知道的TCP协议&#xff1a;四次握手&#xff01; 前言&#xff1a;我们都知道建立一个tcp连接需要进行三次握手&#xff0c;甚至被问到为什么不是四次握手、两次握手 本文将要介绍tcp协议中的四次握手 正文&#xff1a; 当一个客户端向服务端发起tcp连接请求时&#xf…

YUM源仓库部署

一、YUM仓库服务 1、概述 2、准备安装源 软件仓库的提供方式 YUM软件仓库类型 仓库类型安装路径本地源baseurlfile://…ftp源baseurlftp://…在线源baseurlhttp://… baseurlhttps://… RPM软件包的来源 CentOS发布的RPM包集合第三方组织发布的RPM包集合用户自定义的RPM包…

mac nvm install node<version> error 404

mac m2芯片遇到的问题&#xff0c;估计m系列的应该也有这个问题&#xff0c;在这里记录一下 解决方案&#xff1a; ## 需要先处理一下兼容就OK了arch -x86_64 zsh nvm install returns curl: (22) The requested URL returned error: 404 Issue #2667 nvm-sh/nvm GitHub

【信息系统项目管理师知识点速记】成本管理:制定预算

11.5 制定预算 目的: 制定预算的目的是建立一个经批准的成本基准,汇总项目的所有活动或工作包的成本估算,以便监督和控制项目的绩效。 输入: 项目管理计划: 成本管理计划: 描述如何将项目成本纳入项目预算中。资源管理计划: 提供资源费率、差旅成本估算等信息。范围基…

ue引擎游戏开发笔记(29)——实现第三人称角色随手柄力度进行移动

1.需求分析 角色可以随手柄力量大小进行走路和跑步&#xff0c;不动时保持角色停顿。 2.操作实现 1.思路&#xff1a;通过动画蓝图和动画混合实现角色移动和输入的联系。 2.建立动画蓝图和混合空间&#xff1a; 3.在混合空间中对角色移动进行编辑&#xff1a; 4.在蓝图中设定变…

Nginx(搭建高可用集群)

文章目录 1.基本介绍1.在微服务架构中的位置2.配置前提3.主从模式架构图 2.启动主Nginx和两个Tomcat1.启动linux的tomcat2.启动win的tomcat3.启动主Nginx&#xff0c;进入安装目录 ./sbin/nginx -c nginx.conf4.windows访问 http://look.sunxiansheng.cn:7777/search/cal.jsp 3…

python邮件发送

第一种方式 一&#xff1a;发送的邮件要设置授权码&#xff0c;通过邮箱邮箱授权码去验证&#xff0c;让邮件服务器帮我们去转发邮件到要接收的邮件&#xff0c;代码中的授权码&#xff0c;是需要登录126邮箱&#xff08;我这里是以126邮件发送的&#xff0c;具体的以自己为准…

Mybatis入门2

本文章是下面文章的扩充 Mybatis入门-CSDN博客文章浏览阅读432次&#xff0c;点赞6次&#xff0c;收藏10次。Mapper接口创建在java代码块中//dao层/*** 功能&#xff1a;查询所有用户数据* return*/https://blog.csdn.net/luosuss/article/details/138420052 映射配置文件 i…

【Python可视化】pyecharts

Echarts 是一个由百度开源的数据可视化&#xff0c;凭借着良好的交互性&#xff0c;精巧的图表设计&#xff0c;得到了众多开发者的认可。而 Python 是一门富有表达力的语言&#xff0c;很适合用于数据处理。当数据分析遇上数据可视化时&#xff0c;pyecharts 诞生了。 需要安…

opencv4.8 系列一Trackbar图像管理

创建亮度 static void on_lightness(int b, void* userdata) {Mat image *((Mat*)userdata);dst Mat::zeros(image.size(), image.type());;m Mat::zeros(image.size(), image.type());addWeighted(image,1.0,m,0,b,dst);imshow("亮度与对比度调整",dst); }创建对…

OceanBase 分布式数据库【信创/国产化】- OceanBase 资源单元的均衡

本心、输入输出、结果 文章目录 OceanBase 分布式数据库【信创/国产化】- OceanBase 资源单元的均衡前言OceanBase 数据更新架构资源单元的均衡多种资源CPU 单一资源的均衡示例多种资源占用率的计算资源单元的分配资源单元的均衡资源单元均衡的控制手动迁移资源单元OceanBase 分…

408数据结构-线索二叉树 自学知识点整理

前置知识&#xff1a;二叉树的概念、性质与存储结构 线索二叉树的基本概念 遍历二叉树是以一定的规则将二叉树中的结点排列成一个线性序列&#xff08;如中序遍历序列&#xff09;&#xff0c;从而得到几种遍历序列&#xff0c;使得该序列中的每个结点&#xff08;除了首尾两个…