日志文件和mysql同步到kafka_logstash_output_kafka:Mysql 同步 Kafka 深入详解

0、题记

实际业务场景中,会遇到基础数据存在 Mysql 中,实时写入数据量比较大的情景。迁移至kafka是一种比较好的业务选型方案。

997a544ad2d39b39865ea9e5b858e12f.png

而mysql写入kafka的选型方案有:

方案一:logstash_output_kafka 插件。

方案二:kafka_connector。

方案三:debezium 插件。

方案四:flume。

方案五:其他类似方案。

其中:debezium和flume是基于 mysql binlog 实现的。

如果需要同步历史全量数据+实时更新数据,建议使用logstash。

1、logstash同步原理

常用的logstash的插件是:logstash_input_jdbc实现关系型 数据库 到Elasticsearch等的同步。

实际上, 核心logstash的同步原理的掌握 ,有助于大家理解类似的各种库之间的同步。

logstash 核心原理 :输入生成事件,过滤器修改它们,输出将它们发送到其他地方。

logstash核心三部分组成:input、filter、output。

21b43936dca2e5f15cd03c7e56ed325b.png

input { }

filter { }

output { }

1.1 input输入

包含但远不限于:

jdbc:关系型数据库:mysql、 oracle 等。

file:从文件系统上的文件读取。

syslog:在已知端口514上侦听syslog消息。

redis:redis消息。beats:处理 Beats发送的事件。

kafka:kafka实时数据流。

1.2 filter过滤器

过滤器是Logstash管道中的中间处理设备。您可以将过滤器与条件组合,以便在事件满足特定条件时对其执行操作。

可以把它比作数据处理的 ETL 环节。

一些有用的过滤包括:

grok:解析并构造任意文本。 Grok是目前Logstash中将非结构化日志数据解析为结构化和可查询内容的最佳方式 。有了内置于Logstash的120种模式,您很可能会找到满足您需求的模式!

mutate:对事件字段执行常规转换。您可以重命名,删除,替换和修改事件中的字段。

drop:完全删除事件,例如调试事件。

clone:制作事件的副本,可能添加或删除字段。

geoip:添加有关IP地址的地理位置的信息。

1.3 output输出

输出是Logstash管道的最后阶段。一些常用的输出包括:

elasticsearch:将事件数据发送到Elasticsearch。

file:将事件数据写入磁盘上的文件。

kafka:将事件写入Kafka。

详细的filter demo参考:http://t.cn/EaAt4zP

2、同步Mysql到kafka配置参考

input {

jdbc {

jdbc_connection_string => "jdbc:mysql://192.168.1.12:3306/news_base"

jdbc_user => "root"

jdbc_password => "xxxxxxx"

jdbc_driver_library => "/home/logstash-6.4.0/lib/mysql-connector-java-5.1.47.jar"

jdbc_driver_class => "com.mysql.jdbc.Driver"

#schedule => "* * * * *"

statement => "SELECT * from news_info WHERE id > :sql_last_value  order by id"

use_column_value => true

tracking_column => "id"               tracking_column_type => "numeric"

record_last_run => true

last_run_metadata_path => "/home/logstash-6.4.0/sync_data/news_last_run"         }

}

filter {

ruby{

code => "event.set('gather_time_unix',event.get('gather_time').to_i*1000)"

}

ruby{

code => "event.set('publish_time_unix',event.get('publish_time').to_i*1000)"

}

mutate {

remove_field => [ "@version" ]

remove_field => [ "@timestamp" ]

remove_field => [ "gather_time" ]

remove_field => [ "publish_time" ]

}

}

output {

kafka {

bootstrap_servers => "192.168.1.13:9092"

codec => json_lines

topic_id => "mytopic"

}

file {

codec => json_lines

path => "/tmp/output_a.log"

}

}

以上内容不复杂,不做细讲。

注意:

Mysql借助logstash同步后,日期类型格式:“2019-04-20 13:55:53”已经被识别为日期格式。

code => "event.set('gather_time_unix',event.get('gather_time').to_i*1000)",

是将Mysql中的时间格式转化为时间戳格式。

3、坑总结

3.1 坑1字段大小写问题

from星友:使用logstash同步mysql数据的,因为在jdbc.conf里面没有添加 lowercase_column_names

=> "false"  这个属性,所以logstash默认把查询结果的列明改为了小写,同步进了es,所以就导致es里面看到的字段名称全是小写。

最后总结:es是支持大写字段名称的,问题出在logstash没用好,需要在同步配置中加上 lowercase_column_names => "false"  。记录下来希望可以帮到更多人。

3.2 同步到ES中的数据会不会重复?

想将关系数据库的数据同步至ES中,如果在集群的多台 服务器 上同时启动logstash。

解读:实际项目中就是没用随机id  使用指定id作为es的_id ,指定id可以是url的md5.这样相同数据就会走更新覆盖以前数据

3.3 相同配置logstash,升级6.3之后不能同步数据。

解读:高版本基于时间增量有优化。

tracking_column_type => "timestamp" 应该是需要指定标识为时间类型,默认为数字类型numeric

3.4 ETL字段统一在哪处理?

解读:可以logstash同步mysql的时候sql查询阶段处理,如: select a_value as avalue*** 。

或者filter阶段处理, mutate rename 处理。

mutate {

rename => ["shortHostname", "hostname" ]

}

或者kafka阶段借助kafka stream处理。

4、小结

相关配置和同步都不复杂,复杂点往往在于filter阶段的解析还有logstash性能问题。

需要结合实际业务场景做深入的研究和性能分析。

有问题,欢迎留言讨论。

推荐阅读:

3、 一张图理清楚关系型数据库与Elasticsearch同步 http://t.cn/EaAceD3

4、新的实现:http://t.cn/EaAt60O

5、mysql2mysql: http://t.cn/EaAtK7r 6、推荐开源实现: http://t.cn/EaAtjqN

fcea25ecad19e379785c92fcd55b9d47.png

加入星球,更短时间更快习得更多干货!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/552553.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java 命令行参数 _java命令行参数

原标题:java命令行参数命令行参数就是main方法里面的参数String[] args他就是一个数组,args只是数据类型的一个名称,就是一个数组的变量,名称无所谓,类型没变就行了。这个就是程序的入口点。如图7.4所示:图…

java 最小生成树_图的最小生成树(java实现)

1.图的最小生成树(贪心算法)我两个算法的输出都是数组表示的,当前的索引值和当前索引对应的数据就是通路,比如parent[2] 5;即2和5之间有一个通路,第二个可能比较好理解,第一个有点混乱是什么?将一个有权图中的 所有顶…

java 自定义注解 生成json_用自定义注解实现fastjson序列化的扩展

这篇文章起源于项目中一个特殊的需求。由于目前的开发方式是前后端分离的,基本上是通过接口提供各个服务。而前两天前端fe在开发中遇到了一些问题:他们在处理字符串类型的时间时会出现精度丢失的情况,所以希望后台是以时间戳的形式返回给前端…

工厂模式 java场景_研磨设计模式之简单工厂模式(场景问题)

简单工厂不是一个标准的设计模式,但是它实在是太常用了,简单而又神奇,所以还是需要好好掌握的,就当是对学习设计模式的热身运动吧。为了保持一致性,我们尽量按照学习其它模式的步骤来进行学习。1 场景问题大家都知道&…

java asm jndi_GitHub - Q1ngShan/JNDI: JNDI 注入利用工具

JNDI 注入利用工具介绍本项目为 JNDI 注入利用工具,生成 JNDI 连接并启动后端相关服务,可用于 Fastjson、Jackson 等相关漏洞的验证。本项目是基于 welk1n 的 JNDI-Injection-Exploit,在此项目的基础服务框架上,重新编写了攻击利用…

java中fis和fos_java中-的流-与操作

/*字节输出流 OutputStrema:* OutputStream抽象类* write(int b); 将指定的字节写入此流中* write(byte[] b); 将指定的数组 输入此流中* write(byte[] b , int a , int c); 将指定的数组输入此流中 从a索引开始 获取c 个* close(); 将此流关闭 并释放资源* fl…

yolov4用1050ti_简单粗暴的多目标跟踪神器 – DeepSort

目标跟踪问题一直是计算机视觉的热点任务之一,简单的可以分为单目标跟踪与多目标跟踪,最常见的目标跟踪算法都是基于检测的跟踪算法,首先发现然后标记,好的跟踪算法必须具备REID的能力。今天小编斗胆给大家推荐一个结合传统算法跟…

类java的步骤_java类加载的过程

类加载就是三个过程:加载、链接、初始化链接又可以分为验证、准备、解析1.加载将class字节码文件通过类加载器装入内存中2.验证确保当前class文件的字节流所包含的内容符合当前JVM的规范要求,并且不会出现危害JVM自身安全的代码,当前字节流不…

java拓展接口_Java拓展接口-default关键词

Java接口在使用过程中有两点规定:1、接口中只能有定义方法名、方法返回类型,不能有方法的实现。2、实现接口的类,必须实现接口中所有的方法。例如下面的例子://定义接口public interface Action {//接口中的方法定义,只…

java编写通信录管理系统_Java 实现通讯录管理系统教程

本文实例为大家分享了java实现通讯录管理系统的具体代码,供大家参考,具体内容如下完成项目的流程:1.根据需求,确定大体方向2.功能模块分析3.界面实现4.功能模块设计5.coding6.代码测试下面是源代码:import java.awt.Co…

java姑娘_初识java这个小姑娘(二)

正版疯狂java讲义第5版编程教材76.5元(需用券)去购买 >妙解垃圾回收机制周一,早高峰!!!五个字,说尽心中的绝望!!!一段考验一个人耐力、智力、开车技术以及脾气的路。 我把车开进了…

JAVA script 循环 图片_深入分析JavaScript 事件循环(Event Loop)

事件循环(Event Loop),是每个JS开发者都会接触到的概念,但是刚接触时可能会存在各种疑惑。众所周知,JS是单线程的,即同一时间只能运行一个任务。一般情况下这不会引发问题,但是如果我们有一个耗时较多的任务&#xff0…

java 方法重载的作业_java第六章 方法及方法重载 课堂笔记、作业

当参数传递为基本数据类型时,参数变化不保留,基本数据类型参数传值当参数传递为引用数据类型时,参数变化会保留,引用数据类型参数传址//基本数据类型在别处被重新赋值,则本体不受影响,其值不变//引用型数据…

java旋转图片后边上变黑_Java旋转图像将背景的一部分变成黑色

因此,我下载了“原始”图像(不是正方形),对其进行了修改,使其变为正方形,运行您的代码,得到了java.awt.image.ImagingOpException:无法转换src图像异常,将BufferedImage.TYPE_INT_RGB更改为BufferedImage .TYPE_INT_ARGB并得到…import java.awt.geom.AffineTransfo…

python 数据库连接池_【转】Python 数据库连接池

python编程中可以使用pymysql进行数据库连接及增删改查操作,但每次连接mysql请求时,都是独立的去请求访问,比较浪费资源,而且访问数量达到一定数量时,对mysql的性能会产生较大的影响。因此实际使用中,通常会…

java 获取oracle表结构_获取Oracle中所有表的列表?

回答(19)2 years ago我们可以从以下查询获取所有表格,包括列详细信息:SELECT * FROM user_tab_columns;2 years ago使用sqlplus更好地查看如果您正在使用 sqlplus ,您可能需要首先设置一些参数以便在您的列被破坏时更好地查看(退出 sqlplus 会…

java后台处理excel_java后台利用Apache poi 生成excel文档提供前台下载示例

之前在项目中会用到在java在后台把数据填入Word文档的模板来提供前台下载,为了自己能随时查看当时的实现方案及方便他人学习我写了这篇博客,访问量已经是我写的博客里第一了。于是乎我在学会用Java在后台利用Apache poi 生成excel文档提供前台下载之后就…

宝塔php漏洞,[安全预警]关于最近宝塔闹得很厉害的PMA漏洞BUG

文章前言在2020年8月23日的下午有个憨憨管理在我群艾特全员 说宝塔爆出漏洞了赶快更新吧!影响机器需同时满足以下所有条件1、软件版本为Linux面板7.4.2 或者Windows面板6.8.02、开放888且未配置http认证,3、安装了phpmyadmin,mysql数据库不受…

求十个学生的平均成绩java,JAVA 声明一个数组,存一个学生的五门成绩。求该学生的总成绩、平均成绩。...

JAVA 声明一个数组,存一个学生的五门成绩。求该学生的总成绩、平均成绩。mip版 关注:116 答案:3 悬赏:30解决时间 2021-01-26 06:39已解决2021-01-25 17:54声明一个数组,存一个学生的五门成绩。求该学生的总成绩、平均成绩。JAVA知识最佳答案2021-01-25 18:12public class S…

php伪静态限制网页播放视频,学习猿地-php伪静态后html不能访问怎么办

php伪静态后html不能访问的解决办法:首先判断文件是否存在;然后设置存在则不rewirte,不存在且符合规则才rewrite;最后修改htaccess文件即可。具体问题:PHP伪静态后不能访问纯html文件.htaccess文件RewriteEngine onRew…