Logstash开启定时任务增量同步mysql数据到es的时区问题

本文使用修改时间modify_date作为增量同步检测字段,可检测新增和修改,检测不到删除,检测删除请使用canal查询binlog日志同步数据

检测修改时间字段为varchar的时候可以先创建索引,并设置对应的mapping为(可以无视时区问题)

...
"time": {"type": "date","format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"}...

检测修改时间字段为datetime的时候需要注意时区问题

前提:正常存储时间,mysql是UTC+8,Logstash 和ES则都使用UTC

下面几个测试得出的结论

使用测试4,不需要考虑客户端连接es的时区问题(直观上忽略es存储的时间格式为UTC,把其当作UTC+8来用)

使用测试1,需要注意es中存储UTC,使用客户端client连接时需要转换对应的事件UTC+8转为UTC

SearchRequest searchRequest = new SearchRequest("stu_sign");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
LocalDateTime localDateTime1 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
LocalDateTime localDateTime2 = LocalDateTime.of(2023, 3, 1, 8, 0, 0);
ZonedDateTime zonedDateTime1 = localDateTime1.atZone(ZoneId.of("UTC+8"));
ZonedDateTime zonedDateTime2 = localDateTime2.atZone(ZoneId.of("UTC+8"));// 将ZonedDateTime转换为UTC时区
ZonedDateTime utcZonedDateTime1 = zonedDateTime1.withZoneSameInstant(ZoneId.of("UTC"));
ZonedDateTime utcZonedDateTime2 = zonedDateTime2.withZoneSameInstant(ZoneId.of("UTC"));searchSourceBuilder.query(QueryBuilders.rangeQuery("sing_date").gte(utcZonedDateTime1).lte(utcZonedDateTime2));
searchRequest.source(searchSourceBuilder);
try {SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);System.out.println(searchResponse.getHits().getHits());
} catch (IOException e) {e.printStackTrace();
}
测试1:

不创建mapping

设置时区都为Asia/Shanghai,ES中存储时间均为UTC,比正常少八个小时,

追踪日志记录时间为UTC:

— !ruby/object:DateTime ‘2024-12-31 02:13:30.000000000 Z’

定时任务执行正常

SELECT count(*) AS count FROM (SELECT * FROM stu_sign_2023202401 WHERE modify_date > ‘2024-12-31 10:13:30’ AND modify_date < NOW() ) AS t1 LIMIT 1

配置文件为:

input {jdbc {# 配置数据库信息jdbc_connection_string => "jdbc:mysql://188.18.66.185:3306/eschool?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_user => "root"jdbc_password => "123456"jdbc_paging_enabled => "true"jdbc_page_size => "50000"jdbc_default_timezone => "Asia/Shanghai"# mysql驱动所在位置jdbc_driver_library => "D:\environment\apache-maven-3.8.8\maven_repository\mysql\mysql-connector-java\8.0.27\mysql-connector-java-8.0.27.jar"#sql执行语句statement => "SELECT * FROM `stu_sign_2023202401` WHERE modify_date > :sql_last_value AND modify_date < NOW() "use_column_value => truetracking_column => "modify_date"tracking_column_type => "timestamp"last_run_metadata_path => "E:\software\logstash\last_run_stu_login.txt"schedule => "*/3 * * * * Asia/Shanghai"lowercase_column_names => false}
}output {elasticsearch {hosts => ["127.0.0.1:9200"]index => "stu_sign_test"# document_id => "%{id}"}stdout {codec => json_lines}
}
测试2:

使用mapping,将自动映射的date转为其他允许的时间格式

"modify_date": {"type": "date","format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"}

与测试1结果完全一致

测试3:

不使用mapping,时间日期调整

mysql中时间为"modify_date":“2024-12-31 10:13:30”

ES记录时间多8个小时:“modify_date”:“2024-12-31T18:13:30.000Z”

{"id":20120,"appeal_time":null,"teacher_name":"黄雅平","stu_id":736,"stu_name":"郑欣欣","modify_date":"2024-12-31T18:13:30.000Z","sing_date":"2024-12-31T18:13:30.000Z","teach_time":"2024-12-31T08:00:00.000Z","teach_time_str":"20241231","stu_sign_status":0,"@timestamp":"2025-04-01T01:54:25.599Z","teacher_id":731,"@version":"1","class_num_end":null,"leave_status":null,"sign_status":0,"stu_num":"20233003","course_name":"数字系统基础","appeal_msg":null,"course_sched_id":186641,"grade_id":"DE44AF38ADB74D9BBF42A6C8285B8285","appeal_status":null,"academy_id":471,"classroom_id":440428,"roll_call_status":0,"roll_call_date":"2024-12-31T18:13:30.000Z","classroom":"304","class_end_time":"2024-12-31 18:30:00","create_date":"2024-12-31T18:13:30.000Z","course_id":1373,"academy_superior_id":6,"late_status":null,"sign_type":null,"class_id":5407,"class_num_begin":null,"class_begin_time":"2024-12-31 16:35:00","semester_id":1,"leave_statusersss":null,"leave_statuser":null}

追踪日志记录为— !ruby/object:DateTime ‘2024-12-31 18:13:30.000000000 Z’

执行sql为:SELECT count(*) AS count FROM (SELECT * FROM stu_sign_2023202401 WHERE modify_date > ‘2024-12-31 18:13:30’ AND modify_date < NOW() ) AS t1 LIMIT 1

对应的配置文件为:

input {jdbc {# 配置数据库信息jdbc_connection_string => "jdbc:mysql://188.18.66.185:3306/eschool?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_user => "root"jdbc_password => "123456"jdbc_paging_enabled => "true"jdbc_page_size => "50000"jdbc_default_timezone => "UTC"# mysql驱动所在位置jdbc_driver_library => "D:\environment\apache-maven-3.8.8\maven_repository\mysql\mysql-connector-java\8.0.27\mysql-connector-java-8.0.27.jar"#sql执行语句statement => "SELECT * FROM `stu_sign_2023202401` WHERE modify_date > :sql_last_value AND modify_date < NOW() "use_column_value => truetracking_column => "modify_date"tracking_column_type => "timestamp"last_run_metadata_path => "E:\software\logstash\last_run_stu_login.txt"schedule => "*/3 * * * * Asia/Shanghai"lowercase_column_names => false}
}
测试4:

不使用mapping,时间日期调整

mysql中时间为"modify_date":“2024-12-31 10:13:30”

ES记录时间:“modify_date”:“2024-12-31T10:13:30.000Z”

{"teacher_id":731,"class_num_end":null,"stu_sign_status":0,"roll_call_status":0,"semester_id":1,"stu_id":749,"leave_statusersss":null,"classroom_id":440428,"course_sched_id":186641,"teacher_name":"黄雅平","create_date":"2024-12-31T10:13:30.000Z","appeal_msg":null,"appeal_status":null,"class_end_time":"2024-12-31 18:30:00","modify_date":"2024-12-31T10:13:30.000Z","class_begin_time":"2024-12-31 16:35:00","leave_statuser":null,"teach_time":"2024-12-31T00:00:00.000Z","appeal_time":null,"academy_superior_id":6,"sign_type":null,"roll_call_date":"2024-12-31T10:13:30.000Z","late_status":null,"classroom":"304","@timestamp":"2025-04-01T02:03:36.223Z","class_id":5407,"stu_num":"2024008","@version":"1","id":20107,"teach_time_str":"20241231","sign_status":0,"sing_date":"2024-12-31T10:13:30.000Z","course_name":"数字系统基础","course_id":1373,"stu_name":"张恒","class_num_begin":null,"academy_id":471,"leave_status":null,"grade_id":"DE44AF38ADB74D9BBF42A6C8285B8285"}

追踪日志记录为— !ruby/object:DateTime ‘2024-12-31 10:13:30.000000000 Z’

执行sql为:SELECT count(*) AS count FROM (SELECT * FROM stu_sign_2023202401 WHERE modify_date > ‘2024-12-31 10:13:30’ AND modify_date < NOW() ) AS t1 LIMIT 1

对应的配置文件为:

input {jdbc {# 配置数据库信息jdbc_connection_string => "jdbc:mysql://188.18.66.185:3306/eschool?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_user => "root"jdbc_password => "123456"jdbc_paging_enabled => "true"jdbc_page_size => "50000"jdbc_default_timezone => "UTC"# mysql驱动所在位置jdbc_driver_library => "D:\environment\apache-maven-3.8.8\maven_repository\mysql\mysql-connector-java\8.0.27\mysql-connector-java-8.0.27.jar"#sql执行语句statement => "SELECT * FROM `stu_sign_2023202401` WHERE modify_date > :sql_last_value AND modify_date < NOW() "use_column_value => truetracking_column => "modify_date"tracking_column_type => "timestamp"last_run_metadata_path => "E:\software\logstash\last_run_stu_login.txt"schedule => "*/3 * * * * Asia/Shanghai"lowercase_column_names => false}
}

时间配置正确符合预期需求

其中定时任务配置
schedule => “/3 * * * * Asia/Shanghai"
schedule => "
/3 * * * * UTC”
schedule => "*/3 * * * * "
效果一致

测试5:
不使用mapping,时间日期调整
与测试1相反,时间都调整为UTC的时候,es储存的时间均多出八个小时
如下:
mysql中时间为"modify_date":“2024-12-31 18:13:30”
ES记录时间:“modify_date”:“2024-12-31T18:13:30.000Z”
追踪日志记录为— !ruby/object:DateTime ‘2024-12-31 18:13:30.000000000 Z’
执行sql为:SELECT count(*) AS count FROM (SELECT * FROM stu_sign_2023202401 WHERE modify_date > ‘2024-12-31 18:13:30’ AND modify_date < NOW() ) AS t1 LIMIT 1

可自行得出结论:
jdbc_connection_string和jdbc_default_timezone需要配合使用,效果会叠加。具体为什么会这样?懂得兄弟可以在评论区解释一下,博主也学习学习

本文环境相关:
elasticsearch-7.16.3
kibana-7.16.3-windows-x86_64
logstash-7.16.3
mysql5.7.38

客户端:RestHighLevelClient

 <dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.16.3</version></dependency>

启动命令:

logstash.bat -f E:\software\logstash\logstash-7.16.3\conf\stu_sign.conf

最后附上配置文件解释
Logstash配置文件conf介绍:

input {jdbc {# mysql 数据库链接jdbc_connection_string => "jdbc:mysql:localhost/database?characterEncoding=utf8"# 用户名和密码jdbc_user => "xxx"jdbc_password => "xxxx"# 驱动jdbc_driver_library => "D:/xx/xx/logstash-6.2.4/config/mysql-connector-java-8.0.18.jar"# 驱动类名jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_paging_enabled => "true"jdbc_page_size => "50000"# 执行的sql 文件路径+名称#statement_filepath => ""parameters => { "sql_last_value" => "UpdateTime" }statement => "SELECT * FROM (SELECT * FROM table1 ) t WHERE t.updatetime > :sql_last_value"# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新schedule => "* * * * *"# 索引类型#type => "article"# 防止自动将大小转为小写lowercase_column_names => false# 记录上一次运行记录record_last_run => true# 使用字段值use_column_value => true# 追踪字段名tracking_column => "updatetime"# 字段类型tracking_column_type => "timestamp"# 上一次运行元数据保存路径last_run_metadata_path => "./logstash_last_id"# 是否删除记录的数据clean_run => false}
}
filter {json {source => "message"remove_field => ["message"]}
}
output {elasticsearch {hosts => "http://localhost:9200/"index => "indexname"document_type => "articles"document_id => "%{articleid}"template_overwrite => true}# 这里输出调试,正式运行时可以注释掉stdout {codec => json_lines}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/76354.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何使用 FastAPI 构建 MCP 服务器

哎呀&#xff0c;各位算法界的小伙伴们&#xff01;今天咱们要聊聊一个超酷的话题——MCP 协议&#xff01;你可能已经听说了&#xff0c;Anthropic 推出了这个新玩意儿&#xff0c;目的是让 AI 代理和你的应用程序之间的对话变得更顺畅、更清晰。不过别担心&#xff0c;为你的…

【Git】-- 处理 Git 提交到错误分支的问题

如果你不小心把本应提交到 test 分支的代码提交到了 master 分支&#xff08;但尚未 push&#xff09;&#xff0c;可以按照以下步骤解决&#xff1a; 方法一&#xff08;推荐&#xff09;&#xff1a;使用 git reset 和 git stash 首先&#xff0c;确保你在 master 分支&…

通用目标检测技术选型分析报告--截止2025年4月

前言 本文撰写了一份关于通用目标检测&#xff08;General Object Detection&#xff09;的技术选型分析报告&#xff0c;覆盖2000至2025年技术演进历程&#xff0c;重点纳入YOLO-World、RT-DETR、Grounding DINO等2024-2025年的最新模型。 报告将包括技术定义、行业现状、技…

链路追踪Skywalking

一、什么是Skywalking 分布式链路追踪的一种方式&#xff1a;Spring Cloud SleuthZipKin&#xff0c;这种方案目前也是有很多企业在用&#xff0c;但是作为程序员要的追逐一些新奇的技术&#xff0c;Skywalking作为后起之秀也是值得大家去学习的。 Skywalking是一个优秀的国产…

websocket获取客服端真实ip

在websocket建立连接时,获取访问客户端的真实ip 1. websocket建立连接过程 2. pom依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>3. 添加配置,握…

NSSCTF(MISC)—[justCTF 2020]pdf

相应的做题地址&#xff1a;https://www.nssctf.cn/problem/920 binwalk分离 解压文件2AE59A.zip mutool 得到一张图片 B5F31内容 B5FFD内容 转换成图片 justCTF{BytesAreNotRealWakeUpSheeple}

部分国产服务器CPU及内存性能测试情况

近日对部分国产服务器进行了CPU和内存的性能测试&#xff0c; 服务器包括华锟振宇、新华三和中兴三家&#xff0c;CPU包括鲲鹏、海光和Intel&#xff0c;初步测试结果如下&#xff1a; 服务器厂商四川华锟振宇新华三中兴中兴服务器HuaKun TG225 B1R4930 G5R5930 G2R5300 G4操作…

【无标题】Scala函数基础

函数和方法的区别 1&#xff09; 核心概念 &#xff08;1&#xff09; 为完成某一功能的程序语句的集合&#xff0c;称为函数。 &#xff08;2&#xff09; 类中的函数称之方法。 2&#xff09; 案例实操 &#xff08;1&#xff09; Scala 语言可以在任何的语法结构中声明…

uniapp -- 列表垂直方向拖拽drag组件

背景 需要在小程序中实现拖拽排序功能,所以就用到了m-drag拖拽组件,在开发的过程中,发现该组件在特殊的场景下会有些问题,并对其进行了拓展。 效果 组件代码 <template><!-- 创建一个垂直滚动视图,类名为m-drag --><scroll

conda安装python 遇到 pip is configured with locations that require TLS/SSL问题本质解决方案

以前写了一篇文章&#xff0c;不过不是专门为了解决这个问题的&#xff0c;但是不能访问pip install 不能安装来自https 协议的包问题几乎每次都出现&#xff0c;之前解决方案只是治标不治本 https://blog.csdn.net/wangsenling/article/details/130194456​​​​​​​https…

【初阶数据结构】队列

文章目录 目录 一、概念与结构 二、队列的实现 队列的定义 1.初始化 2.入队列 3.判断队列是否为空 4.出队列 5.取队头数据 6.取队尾数据 7.队列有效个数 8.销毁队列 三.完整源码 总结 一、概念与结构 概念&#xff1a;只允许在一端进行插入数据操作&#xff0c;在另一端进行删除…

Apache Shiro 全面指南:从入门到高级应用

一、Shiro 概述与核心架构 1.1 什么是 Shiro&#xff1f; Apache Shiro 是一个强大且易用的 Java 安全框架&#xff0c;它提供了认证&#xff08;Authentication&#xff09;、授权&#xff08;Authorization&#xff09;、加密&#xff08;Cryptography&#xff09;和会话管…

es 3期 第28节-深入掌握集群组建与集群设置

#### 1.Elasticsearch是数据库&#xff0c;不是普通的Java应用程序&#xff0c;传统数据库需要的硬件资源同样需要&#xff0c;提升性能最有效的就是升级硬件。 #### 2.Elasticsearch是文档型数据库&#xff0c;不是关系型数据库&#xff0c;不具备严格的ACID事务特性&#xff…

Android学习总结之通信篇

一、Binder跨进程通信的底层实现细节&#xff08;挂科率35%&#xff09; 高频问题&#xff1a;“Binder如何实现一次跨进程方法调用&#xff1f;”   候选人常见错误&#xff1a;   仅回答“通过Binder驱动传输数据”&#xff0c;缺乏对内存映射和线程调度的描述混淆Binde…

数据结构C语言练习(两个栈实现队列)

一、引言 在数据结构的学习中&#xff0c;我们经常会遇到一些有趣的问题&#xff0c;比如如何用一种数据结构去实现另一种数据结构的功能。本文将深入探讨 “用栈实现队列” 这一经典问题&#xff0c;详细解析解题思路、代码实现以及每个函数的作用&#xff0c;帮助读者更好地…

前端如何导入谷歌字体库

#谷歌字体库内容丰富&#xff0c;涵盖上千种多语言支持的字体&#xff0c;学习导入谷歌字体库来增加网站的阅读性&#xff0c;是必不可少的一项技能# 1&#xff0c;前往谷歌字体网站 要会魔法&#xff0c;裸连很卡 2&#xff0c; 寻找心仪字体 Googles Fonts下面的filters可…

SnapdragonCamera骁龙相机源码解析

骁龙相机是高通开发的一个测试系统摄像头的demo&#xff0c;代码完善&#xff0c;功能强大。可以配合Camera驱动进行功能联调。 很多逻辑代码在CaptureModule.java里。 CaptureModule有8000多行&#xff0c;包罗万象。 涉及到界面显示要结合CaptureUI.java 一起来实现。 Ca…

多线程猜数问题

题目&#xff1a;线程 A 生成随机数&#xff0c;另外两个线程来猜数&#xff0c;线程 A 可以告诉猜的结果是大还是小&#xff0c;两个线程都猜对后&#xff0c;游戏结束&#xff0c;编写代码完成。 一、Semaphore 多个线程可以同时操作同一信号量&#xff0c;由此实现线程同步…

seq2seq

理解 transformer 中的 encoder decoder 详细的 transformer 教程见&#xff1a;【极速版 – 大模型入门到进阶】Transformer 文章目录 &#x1f30a; Encoder: 给一排向量输出另外一排向量&#x1f30a; Encoder vs. Decoder: multi-head attention vs. masked multi-head at…

Proxmox pct 部署ubuntu

pct 前言 PCT(Proxmox Container Tool)是 PVE 中用于管理 Linux 容器(LXC)的命令行工具。通过 PCT,用户可以执行各种容器管理任务,例如创建新的容器、启动和停止容器、更新容器、安装软件包、导出和导入容器等。PCT 提供了与 Web 界面相同的功能,但通过命令行进行操作,…