日志文件和mysql同步到kafka_logstash_output_kafka:Mysql 同步 Kafka 深入详解

0、题记

实际业务场景中,会遇到基础数据存在 Mysql 中,实时写入数据量比较大的情景。迁移至kafka是一种比较好的业务选型方案。

997a544ad2d39b39865ea9e5b858e12f.png

而mysql写入kafka的选型方案有:

方案一:logstash_output_kafka 插件。

方案二:kafka_connector。

方案三:debezium 插件。

方案四:flume。

方案五:其他类似方案。

其中:debezium和flume是基于 mysql binlog 实现的。

如果需要同步历史全量数据+实时更新数据,建议使用logstash。

1、logstash同步原理

常用的logstash的插件是:logstash_input_jdbc实现关系型 数据库 到Elasticsearch等的同步。

实际上, 核心logstash的同步原理的掌握 ,有助于大家理解类似的各种库之间的同步。

logstash 核心原理 :输入生成事件,过滤器修改它们,输出将它们发送到其他地方。

logstash核心三部分组成:input、filter、output。

21b43936dca2e5f15cd03c7e56ed325b.png

input { }

filter { }

output { }

1.1 input输入

包含但远不限于:

jdbc:关系型数据库:mysql、 oracle 等。

file:从文件系统上的文件读取。

syslog:在已知端口514上侦听syslog消息。

redis:redis消息。beats:处理 Beats发送的事件。

kafka:kafka实时数据流。

1.2 filter过滤器

过滤器是Logstash管道中的中间处理设备。您可以将过滤器与条件组合,以便在事件满足特定条件时对其执行操作。

可以把它比作数据处理的 ETL 环节。

一些有用的过滤包括:

grok:解析并构造任意文本。 Grok是目前Logstash中将非结构化日志数据解析为结构化和可查询内容的最佳方式 。有了内置于Logstash的120种模式,您很可能会找到满足您需求的模式!

mutate:对事件字段执行常规转换。您可以重命名,删除,替换和修改事件中的字段。

drop:完全删除事件,例如调试事件。

clone:制作事件的副本,可能添加或删除字段。

geoip:添加有关IP地址的地理位置的信息。

1.3 output输出

输出是Logstash管道的最后阶段。一些常用的输出包括:

elasticsearch:将事件数据发送到Elasticsearch。

file:将事件数据写入磁盘上的文件。

kafka:将事件写入Kafka。

详细的filter demo参考:http://t.cn/EaAt4zP

2、同步Mysql到kafka配置参考

input {

jdbc {

jdbc_connection_string => "jdbc:mysql://192.168.1.12:3306/news_base"

jdbc_user => "root"

jdbc_password => "xxxxxxx"

jdbc_driver_library => "/home/logstash-6.4.0/lib/mysql-connector-java-5.1.47.jar"

jdbc_driver_class => "com.mysql.jdbc.Driver"

#schedule => "* * * * *"

statement => "SELECT * from news_info WHERE id > :sql_last_value  order by id"

use_column_value => true

tracking_column => "id"               tracking_column_type => "numeric"

record_last_run => true

last_run_metadata_path => "/home/logstash-6.4.0/sync_data/news_last_run"         }

}

filter {

ruby{

code => "event.set('gather_time_unix',event.get('gather_time').to_i*1000)"

}

ruby{

code => "event.set('publish_time_unix',event.get('publish_time').to_i*1000)"

}

mutate {

remove_field => [ "@version" ]

remove_field => [ "@timestamp" ]

remove_field => [ "gather_time" ]

remove_field => [ "publish_time" ]

}

}

output {

kafka {

bootstrap_servers => "192.168.1.13:9092"

codec => json_lines

topic_id => "mytopic"

}

file {

codec => json_lines

path => "/tmp/output_a.log"

}

}

以上内容不复杂,不做细讲。

注意:

Mysql借助logstash同步后,日期类型格式:“2019-04-20 13:55:53”已经被识别为日期格式。

code => "event.set('gather_time_unix',event.get('gather_time').to_i*1000)",

是将Mysql中的时间格式转化为时间戳格式。

3、坑总结

3.1 坑1字段大小写问题

from星友:使用logstash同步mysql数据的,因为在jdbc.conf里面没有添加 lowercase_column_names

=> "false"  这个属性,所以logstash默认把查询结果的列明改为了小写,同步进了es,所以就导致es里面看到的字段名称全是小写。

最后总结:es是支持大写字段名称的,问题出在logstash没用好,需要在同步配置中加上 lowercase_column_names => "false"  。记录下来希望可以帮到更多人。

3.2 同步到ES中的数据会不会重复?

想将关系数据库的数据同步至ES中,如果在集群的多台 服务器 上同时启动logstash。

解读:实际项目中就是没用随机id  使用指定id作为es的_id ,指定id可以是url的md5.这样相同数据就会走更新覆盖以前数据

3.3 相同配置logstash,升级6.3之后不能同步数据。

解读:高版本基于时间增量有优化。

tracking_column_type => "timestamp" 应该是需要指定标识为时间类型,默认为数字类型numeric

3.4 ETL字段统一在哪处理?

解读:可以logstash同步mysql的时候sql查询阶段处理,如: select a_value as avalue*** 。

或者filter阶段处理, mutate rename 处理。

mutate {

rename => ["shortHostname", "hostname" ]

}

或者kafka阶段借助kafka stream处理。

4、小结

相关配置和同步都不复杂,复杂点往往在于filter阶段的解析还有logstash性能问题。

需要结合实际业务场景做深入的研究和性能分析。

有问题,欢迎留言讨论。

推荐阅读:

3、 一张图理清楚关系型数据库与Elasticsearch同步 http://t.cn/EaAceD3

4、新的实现:http://t.cn/EaAt60O

5、mysql2mysql: http://t.cn/EaAtK7r 6、推荐开源实现: http://t.cn/EaAtjqN

fcea25ecad19e379785c92fcd55b9d47.png

加入星球,更短时间更快习得更多干货!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/552553.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql max嵌套select_使用嵌套select子式 解决mysql不能叠加使用如max(sum())的问题

网上也有解决方案 有的有瑕疵 有的较复杂(mysql没有分析函数,可以使用变量实现) select sumScoreValue,studentid,studentName from sc_studentb, ( select sum (scoreValue) as sumScoreValue,studentid from sc_score group by studentid order by sumSc网上也有解…

mysql 检查记录存在_Mysql 插入记录时检查记录是否已经存在,存在则更新,不存在则插入记录SQL...

我们在开发数据库相关的逻辑过程中, 经常检查表中是否已经存在这样的一条记录, 如果存在则更新或者不做操作, 如果没有存在记录,则需要插入一条新的记录。这样的逻辑固然可以通过两条sql语句完成。SELECT COUNT(*) FROM xxx WHERE…

python linux调试_python调试

以下是我做调试或分析时用过的工具的一个概览。如果你知道有更好的工具,请在评论中留言,可以不用很完整的介绍。日志没错,就是日志。再多强调在你的应用里保留足量的日志的重要性也不为过。你应当对重要的内容打日志。如果你的日志打的足够好…

猜数字游戏python程序用函数guesssecret_Python-三、函数

3.1 def语句和参数def ...(...)语句用于定义一个函数,以便后面调用函数,如:def hello(name): #定义函数hello,变元nameprint(hello name) #打印hello变元namenamein input() #定义namein变量,从键盘取值并赋给namein…

java 命令行参数 _java命令行参数

原标题:java命令行参数命令行参数就是main方法里面的参数String[] args他就是一个数组,args只是数据类型的一个名称,就是一个数组的变量,名称无所谓,类型没变就行了。这个就是程序的入口点。如图7.4所示:图…

java 最小生成树_图的最小生成树(java实现)

1.图的最小生成树(贪心算法)我两个算法的输出都是数组表示的,当前的索引值和当前索引对应的数据就是通路,比如parent[2] 5;即2和5之间有一个通路,第二个可能比较好理解,第一个有点混乱是什么?将一个有权图中的 所有顶…

中文分词工具 java_java读取中文分词工具(一)

import java.io.BufferedReader;import java.io.File;import java.io.FileInputStream;import java.io.IOException;import java.io.InputStreamReader;import java.io.RandomAccessFile;import java.util.StringTokenizer;/** 文本格式:已分词的中文文本&#xff0…

java中的成员变量和局部变量的区别_java中成员变量与局部变量区别分析

本文实例分析了java中成员变量与局部变量区别。分享给大家供大家参考。具体分析如下:成员变量:在这个类里定义的私有变量,属于这个类。创建以及使用成员变量public class Person {String name;String Sex;int age;double Height;public stati…

java 自定义注解 生成json_用自定义注解实现fastjson序列化的扩展

这篇文章起源于项目中一个特殊的需求。由于目前的开发方式是前后端分离的,基本上是通过接口提供各个服务。而前两天前端fe在开发中遇到了一些问题:他们在处理字符串类型的时间时会出现精度丢失的情况,所以希望后台是以时间戳的形式返回给前端…

工厂模式 java场景_研磨设计模式之简单工厂模式(场景问题)

简单工厂不是一个标准的设计模式,但是它实在是太常用了,简单而又神奇,所以还是需要好好掌握的,就当是对学习设计模式的热身运动吧。为了保持一致性,我们尽量按照学习其它模式的步骤来进行学习。1 场景问题大家都知道&…

java asm jndi_GitHub - Q1ngShan/JNDI: JNDI 注入利用工具

JNDI 注入利用工具介绍本项目为 JNDI 注入利用工具,生成 JNDI 连接并启动后端相关服务,可用于 Fastjson、Jackson 等相关漏洞的验证。本项目是基于 welk1n 的 JNDI-Injection-Exploit,在此项目的基础服务框架上,重新编写了攻击利用…

java保存登录信息_java – 保存登录详细信息(首选项)android

我有一个具有登录,注销功能的Android应用程序.登录表单包含用户名和密码以及登录按钮.我想在用户选中“记住我”复选框时保存用户名和密码.我的project.java文件如下所示:public class project extends Activity {private static final int IO_BUFFER_SIZE 4 * 102…

java堆和非堆_java 堆与非堆 内存

堆(Heap)和非堆(Non-heap)内存按照官方的说法:“Java 虚拟机具有一个堆,堆是运行时数据区域,所有类实例和数组的内存均从此处分配。堆是在 Java 虚拟机启动时创建的。”“在JVM中堆之外的内存称为非堆内存(Non-heap memory)”。可以看出JVM主…

java中fis和fos_java中-的流-与操作

/*字节输出流 OutputStrema:* OutputStream抽象类* write(int b); 将指定的字节写入此流中* write(byte[] b); 将指定的数组 输入此流中* write(byte[] b , int a , int c); 将指定的数组输入此流中 从a索引开始 获取c 个* close(); 将此流关闭 并释放资源* fl…

java 上传文件及预览_SpringBoot上传下载文件及在线预览

SpringBoot上传下载文件及在线预览今天大概就说说如何使用SpringBoot进行上传和下载以及在线预览文件 本篇主要介绍上传下载的功能,对于界面就简单一点,大致如下:一、老规矩还是先看看小项目的目录结构:二、添加对应的pom依赖org.…

yolov4用1050ti_简单粗暴的多目标跟踪神器 – DeepSort

目标跟踪问题一直是计算机视觉的热点任务之一,简单的可以分为单目标跟踪与多目标跟踪,最常见的目标跟踪算法都是基于检测的跟踪算法,首先发现然后标记,好的跟踪算法必须具备REID的能力。今天小编斗胆给大家推荐一个结合传统算法跟…

java 数据库语句_java连接各数据库的语句

1、Oracle8/8i/9i数据库(thin模式)Class.forName("oracle.jdbc.driver.OracleDriver").newInstance();String url"jdbc:oracle:thin:localhost:1521:orcl";//orcl为数据库的SIDString user"test\";String password"test";Connection c…

alm数据库mysql_mysql

1. mysql启动mysqld --console2. mysql关闭mysqladmin shutdown -uroot -proot3. 免安装mysql配置:启动mysql: mysqld --skip-grant-tables(跳过安全校验), 修改mysql.user表里面的数据,最后flush privileges;mysql jdbc连接代码…

类java的步骤_java类加载的过程

类加载就是三个过程:加载、链接、初始化链接又可以分为验证、准备、解析1.加载将class字节码文件通过类加载器装入内存中2.验证确保当前class文件的字节流所包含的内容符合当前JVM的规范要求,并且不会出现危害JVM自身安全的代码,当前字节流不…

java拓展接口_Java拓展接口-default关键词

Java接口在使用过程中有两点规定:1、接口中只能有定义方法名、方法返回类型,不能有方法的实现。2、实现接口的类,必须实现接口中所有的方法。例如下面的例子://定义接口public interface Action {//接口中的方法定义,只…