使用Logstash将数据从MySQL同步至Elasticsearch(有坑)

文章目录

  • 一、准备工作
    • 1、安装elasticSearch+kibana
    • 2、安装MySQL
    • 3、安装Logstash
  • 二、全量同步
    • 1、准备MySQL数据与表
    • 2、上传mysql-connector-java.jar
    • 3、启动Logstash
    • 4、修改logstash.conf文件
    • 5、修改full_jdbc.sql文件
    • 6、打开Kibana创建索引和映射
    • 7、重启logstash进行全量同步
    • 8、踩坑
      • (1)报错
  • 三、增量同步
    • 1、修改增量配置
    • 2、新建increment_jdbc.sql文件
    • 3、重启容器
    • 4、测试
    • 5、同步原理

一、准备工作

1、安装elasticSearch+kibana

我们此处用的es和kibana版本是7.4.0版本的。
docker安装elasticSearch+kibana

2、安装MySQL

docker安装mysql-简单无坑

3、安装Logstash

logstash就是一个具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以根据自己的需求在inuput --output中间加上滤网,Logstash内置了几十种插件,可以满足各种应用场景。

logstash官方插件 logstash-input-jdbc集成在logstash(5.X之后)中,通过配置文件实现mysql与elasticsearch数据同步。
能实现mysql数据全量和增量的数据同步,且能实现定时同步。

在这里插入图片描述

# 拉取logstach
docker pull logstash:8.5.2

二、全量同步

全量同步是指全部将数据同步到es,通常是刚建立es,第一次同步时使用。

1、准备MySQL数据与表

CREATE TABLE `product` (`id` int NOT NULL COMMENT 'id',`name` varchar(255) DEFAULT NULL,`price` decimal(10,2) DEFAULT NULL,`create_at` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
INSERT INTO `shop`.`product`(`id`, `name`, `price`, `create_at`) VALUES (1, '小米手机', 33.00, '1');
INSERT INTO `shop`.`product`(`id`, `name`, `price`, `create_at`) VALUES (2, '长虹手机', 2222.00, '2');
INSERT INTO `shop`.`product`(`id`, `name`, `price`, `create_at`) VALUES (3, '华为电脑', 3333.00, '3');
INSERT INTO `shop`.`product`(`id`, `name`, `price`, `create_at`) VALUES (4, '小米电脑', 333.30, '4');

2、上传mysql-connector-java.jar

把mysql-connector-java-8.0.21.jar上传到logstach服务器,

3、启动Logstash

# 编辑logstash.yml
vi /usr/local/logstash/config/logstash.yml# 内容,需要修改es地址
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://172.17.0.3:9200" ]
# 自定义网络(可以解决网络不一致的问题)
#docker network create --subnet=172.188.0.0/16  czbkNetwork
# 启动 logstash
docker run --name logstash -v /usr/local/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml -v /usr/local/logstash/config/conf.d/:/usr/share/logstash/pipeline/   -v /root/mysql-connector-java-8.0.21.jar:/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.48.jar -d  d7102f8c625d# 查看日志
docker logs -f --tail=200 c1d20ebf76c3

4、修改logstash.conf文件

cd /usr/local/logstash/config/conf.d
vi logstash.conf

stdin从标准输入读取事件。

默认情况下,每一行读取为一个事件

input { stdin {}#使用jdbc插件jdbc {# mysql数据库驱动#jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.48.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"# mysql数据库链接,数据库名jdbc_connection_string => "jdbc:mysql://172.17.0.2:3306/shop?allowMultiQueries=true&useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true"# mysql数据库用户名,密码jdbc_user => "root"jdbc_password => "root"# 分页jdbc_paging_enabled => "true"# 分页大小jdbc_page_size => "50000"# sql语句执行文件,也可直接使用 statement => 'select * from t'statement_filepath => "/usr/share/logstash/pipeline/sql/full_jdbc.sql"#statement => " select *  from product  where id  <=100 "}}# 过滤部分(不是必须项)
filter {json {source => "message"remove_field => ["message"]}
}# 输出部分
output {elasticsearch {# elasticsearch索引名index => "product"# elasticsearch的ip和端口号hosts => ["172.17.0.3:9200"]# 同步mysql中数据id作为elasticsearch中文档iddocument_id => "%{id}"}stdout {codec => json_lines}
}

5、修改full_jdbc.sql文件

mkdir /usr/local/logstash/config/conf.d/sql
cd /usr/local/logstash/config/conf.d/sql
vi full_jdbc.sql

full_jdbc.sql内容如下

SELECTid,TRIM( REPLACE ( name, ' ', '' ) ) AS productname,price
FROM product

6、打开Kibana创建索引和映射

注意!mysql—>logstash—>es
如果创建的映射是有大写的时候,es会自动转成小写
而且查看映射数据结构的时候会出现两个相同的字段(productname和productName)
这样就导致我们自己定义的映射无法使用,而有数据的是es自动生成的那个小写

PUT product
{"settings": {"number_of_shards": 1,"number_of_replicas": 1},"mappings": {"properties": {"productname": {"type": "text"},"price": {"type": "double"} }}
}

如果对映射没有硬性要求,可以忽略当前步骤,会自动创建索引。

# 当前在es中是没有数据的
GET product/_search

7、重启logstash进行全量同步

# 重启
docker restart c1d20ebf76c3
# 查看日志
docker logs -f --tail=200 c1d20ebf76c3

发现mysql中的数据已经同步至logstash中了。

8、踩坑

(1)报错

Error response from daemon: Cannot restart container 3849f947e115: driver failed programming external connectivity on endpoint logstash (60f5d9678218dc8d19bc8858fb1a195f4ebee294cff23d499a28612019a0ff78): (iptables failed: iptables --wait -t nat -A DOCKER -p tcp -d 0/0 --dport 4567 -j DNAT --to-destination 172.188.0.77:4567 ! -i br-413b460a0fc8: iptables: No chain/target/match by that name.

原因为:在启动firewalld之后,iptables被激活,
此时没有docker chain,重启docker后被加入到iptable里面

解决方案:
systemctl restart docker

三、增量同步

1、修改增量配置

修改上面的logstash.conf文件

input { stdin {}#使用jdbc插件jdbc {# mysql数据库驱动#jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.48.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"# mysql数据库链接,数据库名jdbc_connection_string => "jdbc:mysql://172.188.0.15:3306/shop?characterEncoding=UTF-8&useSSL=false"# mysql数据库用户名,密码jdbc_user => "root"jdbc_password => "root"# 设置监听间隔  各字段含义(分、时、天、月、年),全部为*默认含义为每分钟更新一次# /2* * * *表示每隔2分钟执行一次,依次类推schedule => "* * * * *"# 分页jdbc_paging_enabled => "true"# 分页大小jdbc_page_size => "50000"# sql语句执行文件,也可直接使用 statement => 'select * from t'statement_filepath => "/usr/share/logstash/pipeline/sql/increment_jdbc.sql"#上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值#last_run_metadata_path => "./config/station_parameter.txt"#设置时区,此处更新sql_last_value查询的时区,sql_last_value还是默认UTCjdbc_default_timezone => "Asia/Shanghai"#使用其它字段追踪,而不是用时间#use_column_value => true#追踪的字段#tracking_column => idtracking_column_type => "timestamp"}}# 过滤部分(不是必须项)
filter {json {source => "message"remove_field => ["message"]}
}# 输出部分
output {elasticsearch {# elasticsearch索引名index => "product"# elasticsearch的ip和端口号hosts => ["172.188.0.88:9200"]# 同步mysql中数据id作为elasticsearch中文档iddocument_id => "%{id}"}stdout {codec => json_lines}
}

2、新建increment_jdbc.sql文件

/usr/local/logstash/config/conf.d/sql目录下新建increment_jdbc.sql文件

cd /usr/local/logstash/config/conf.d/sql
vi increment_jdbc.sql

increment_jdbc.sql内容如下:

此处sql尽量保持与全量一致Select后的

SELECTid,TRIM( REPLACE ( product_name, ' ', '' ) ) AS productname,priceFROM product where update_time > :sql_last_value

3、重启容器

# 启动
docker restart 容器id

4、测试

数据库插入一条数据之后,会自动同步至es

5、同步原理

#进入容器
docker exec -it 4f95a47f12de /bin/bash
#查看记录点
cat /usr/share/logstash/.logstash_jdbc_last_run

last_run_metadata_path=>“/usr/share/logstash/.logstash_jdbc_last_run”

在容器里面的/usr/share/logstash/路径下的隐藏文件.logstash_jdbc_last_run中记录了全量同步的UTC时间

每次同步完成记录该时间(重要)
在这里插入图片描述
注意
logstash_jdbc_last_run默认是没有的,执行增量后创建
文件也是可以删除的
容器重启会自动创建

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/38671.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TCP/IP协议追层分析物理层(第三十九课)

TCP/IP协议追层分析物理层(第三十九课) 1 物理层:建立、维护、断开物理连接,定义了接口及介质,实现了比特流的传输。 1、传输介质分类 有线介质:网线(双绞线)、光纤 无线介质:无线电 微波 激光 红外线 2、双绞线分类: 五类cat5: 适用于100Mbps 超五类cat5e:适用于…

Qt扫盲- Graphics View框架理论综述

Graphics View框架理论综述 一、概述二、Graphics View 体系结构1. The Scene2. The View3. 图元 Item 三、图形视图坐标系统1. 图元Item的坐标2. Scene Scene坐标3. View 视图坐标4. 坐标映射 四、关键特性1. 缩放和旋转2. 打印3. 拖放4. 鼠标指针和 提示5. 动画6. OpenGL渲染…

【100天精通python】Day35:一文掌握GUI界面编程基本操作

目录 专栏导读 1 GUI 编程概述 1.1 为什么需要GUI&#xff1f; 1.2 常见的GUI编程工具和库 1.3 GUI应用程序的组成和架构 2 使用Tkinter 库 进行GUI编程 2.1 使用Tkinter库进行GUI编程的基本流程 2.2 使用Tkinter库进行GUI编程 2.2.1 导入Tkinter库 2.2.2 添加标签和…

绘制世界地图or中国地图

写在前面 在8月初,自己需要使用中国地图的图形,自己就此也查询相关的教程,自己也做一下小小总结,希望对自己和同学们有所帮助。 最终图形 这个系列从2022年开始,一直更新使用R语言分析数据及绘制精美图形。小杜的生信笔记主要分享小杜学习日常!如果,你对此感兴趣可以加…

MySQL存储结构及索引

文章目录 MySQL结构1.2存储引擎介绍1.3存储引擎特点InnoDB逻辑存储结构 MyISAMMemory区别及特点存储引擎选择 索引索引概述索引结构BTreeHash索引分类聚集索引&二级索引索引语法SQL性能分析索引优化最左前缀法则范围查询字符串不加引号模糊查询or连接条件数据分布影响覆盖索…

达梦数据库dbms_stats包的操作实践记录

索引的统计信息收集 GATHER_INDEX_STATSindex_stats_show 根据模式名&#xff0c;索引名获得该索引的统计信息。用于经过 GATHER_TABLE_STATS、GATHER_INDEX_STATS 或 GATHER_SCHEMA_STATS 收集之后展示。返回两个结果集&#xff1a;一个是索引的统计信息&#xff1b;另一个是…

Kotlin优点及为什么使用Kotlin

文章目录 一 Hello Kotlin二 Kotlin优点三 团队为什么采用 Kotlin 一 Hello Kotlin Kotlin和Andriod 二 Kotlin优点 三 团队为什么采用 Kotlin

Mendix 基础审计模块介绍

一、前言 作为售前顾问&#xff0c;帮助客户选型低代码产品是日常工作。考察一家低代码产品的好坏&#xff0c;其中一个维度就是产品的成熟度。产品成熟度直接影响产品在使用中的稳定性和用户体验&#xff0c;对于新工具导入和可持续运用至关重要。 那怎么考察一个产品是否成…

【校招VIP】java语言考点之ConcurrentHashMap1.7和1.8

考点介绍&#xff1a; ConcurrentHashMap是JAVA校招面试的热门考点&#xff0c;主要集中在1.7和1.8的底层结构和相关的性能提高。 理解这个考点要从map本身的并发问题出发&#xff0c;再到hashTable的低性能并发安全&#xff0c;引申到ConcurrentHashMap的分块处理。同时要理解…

【C++】做一个飞机空战小游戏(八)——生成敌方炮弹(rand()和srand()函数应用)

[导读]本系列博文内容链接如下&#xff1a; 【C】做一个飞机空战小游戏(一)——使用getch()函数获得键盘码值 【C】做一个飞机空战小游戏(二)——利用getch()函数实现键盘控制单个字符移动【C】做一个飞机空战小游戏(三)——getch()函数控制任意造型飞机图标移动 【C】做一个飞…

SpringBoot中的可扩展接口

目录 # 背景 # 可扩展的接口启动调用顺序图 # ApplicationContextInitializer # BeanDefinitionRegistryPostProcessor # BeanFactoryPostProcessor # InstantiationAwareBeanPostProcessor # SmartInstantiationAwareBeanPostProcessor # BeanFactoryAware # Applicati…

炬芯科技发布全新第二代智能手表芯片,引领腕上新趋势!

2023年7月&#xff0c;炬芯科技宣布全新第二代智能手表芯片正式发布。自2021年底炬芯科技推出第一代的智能手表芯片开始便快速获得了市场广泛认可和品牌客户的普遍好评。随着技术的不断创新和突破&#xff0c;为了更加精准地满足市场多元化的变幻和用户日益增长的体验需求&…

Jmeter-压力测试工具

文章目录 Jmeter快速入门1.1.下载1.2.解压1.3.运行 2.快速入门2.1.设置中文语言2.2.基本用法 Jmeter快速入门 1s内发送大量请求&#xff0c;模拟高QPS&#xff0c;用以测试网站能承受的压力有多大 Jmeter依赖于JDK&#xff0c;所以必须确保当前计算机上已经安装了JDK&#xff0…

Android Shape 的使用

目录 什么是Shape? shape属性 子标签属性 corners &#xff08;圆角&#xff09; solid &#xff08;填充色&#xff09; gradient &#xff08;渐变&#xff09; stroke &#xff08;描边&#xff09; padding &#xff08;内边距&#xff09; size &#xff08;大小…

CentOS系统环境搭建(三)——Centos7安装DockerDocker Compose

centos系统环境搭建专栏&#x1f517;点击跳转 Centos7安装Docker&Docker Compose 使用 yum 安装Docker 内核 [rootVM-4-17-centos ~]# uname -r 3.10.0-1160.88.1.el7.x86_64Docker 要求 CentOS 系统的内核版本高于 3.10 更新 yum yum update安装需要的软件包&#x…

在Windows Server 2008上启用自动文件夹备份

要在Windows Server 2008上启用自动文件夹备份&#xff0c;您可以使用内置的Windows备份功能。下面是如何设置它的方法&#xff1a; 1. 点击“开始”按钮并选择“服务器管理器”&#xff0c;打开“服务器管理器”。 2. 在“服务器管理器”窗口中&#xff0c;单击左侧窗格中的“…

Python学习笔记_基础篇(六)_Set集合,函数,深入拷贝,浅入拷贝,文件处理

1、Set基本数据类型 a、set集合&#xff0c;是一个无序且不重复的元素集合 class set(object):"""set() -> new empty set objectset(iterable) -> new set objectBuild an unordered collection of unique elements."""def add(self, *a…

redis-数据类型及样例

一.string 类型数据的基本操作 1.添加/修改数据 set key value2.获取数据 get key3.删除数据 del key4.添加/修改多个数据 mset key1 value1 key2 value25.获取多个数据 mget key1 key2二.list类型的基本操作 数据存储需求&#xff1a;存储多个数据&#xff0c;并对数据…

day 0815

计算文件有多少行&#xff1f; 2.文件的拷贝

SpringBoot引入外部jar打包失败解决,SpringBoot手动引入jar打包war后报错问题

前言 使用外部手动添加的jar到项目&#xff0c;打包时出现jar找不到问题解决 处理 例如项目结构如下 引入方式换成这种 <!-- 除了一下这两种引入外部jar&#xff0c;还是可以将外部jar包添加到maven中&#xff08;百度查&#xff09;--><!-- pdf转word --><…