使用Logstash将MySQL中的数据同步至Elasticsearch

目录

1 使用docker安装ELK

1.1 安装Elasticsearch

1.2 安装Kibana

1.3 安装Logstash

2 数据同步

2.1 准备MySQL表和数据

2.2 运行Logstash

2.3 测试

3 Logstash报错(踩坑)记录

3.1 记录一

3.1.1 报错信息

3.1.2 报错原因

3.1.3 解决方案

3.2 记录二

3.2.1 报错信息

3.2.2 报错原因

3.3.3 解决方案


1 使用docker安装ELK

        ELK是指Elasticsearch、Logstash、Kibana。

1.1 安装Elasticsearch

# 拉取es镜像
docker pull elasticsearch:7.4.2mkdir -p /root/docker/elasticsearch/config
mkdir -p /root/docker/elasticsearch/data# 任何ip都能访问
echo "http.host: 0.0.0.0" >> /root/docker/elasticsearch/config/elasticsearch.yml# 运行elasticsearch REST API端口9200 集群端口9300
docker run --name elasticsearch -p 9200:9200 -p 9300:9300 \
--restart=always \
--privileged=true \
-e "discovery.type=single-node" \
-e ES_JAVA_OPTS="-Xms64m -Xmx512m" \
-v /root/docker/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /root/docker/elasticsearch/data:/usr/share/elasticsearch/data \
-v /root/docker/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-d elasticsearch:7.4.2# 保证权限 任何人任何组都可以读写操作执行,可以进入elasticsearch使用ll命令查看权限
chmod -R 777 /root/docker/elasticsearch/ 

 测试是否安装成功:

# 查看elasticsearch是否运行
docker ps -a

        在浏览器输入虚拟机的ip和elasticsearch的REST API端口http://172.1.11.10:9200/ ,如果出现以下内容,说明安装成功。

{"name": "7876d2859af8","cluster_name": "elasticsearch","cluster_uuid": "i46io2YkTY6pXr8IQ9qmXA","version": {"number": "7.4.2","build_flavor": "default","build_type": "docker","build_hash": "2f90bbf7b93631e52bafb59b3b049cb44ec25e96","build_date": "2019-10-28T20:40:44.881551Z","build_snapshot": false,"lucene_version": "8.2.0","minimum_wire_compatibility_version": "6.8.0","minimum_index_compatibility_version": "6.0.0-beta1"},"tagline": "You Know, for Search"
}

1.2 安装Kibana

# 拉取镜像,可视化检索数据
docker pull kibana:7.4.2# 运行Kibana
docker run --name kibana --restart=always --privileged=true \
-e ELASTICSEARCH_HOSTS=http://172.xx.xx.xx:9200 \
-p 5601:5601 -d kibana:7.4.2

说明:

(1)-e ELASTICSEARCH_HOSTS=http://172.xx.xx.xx:9200 :Elasticsearch地址。

(2)-d:后端运行。

(3)--restart=always:开机启动。

(4)--name kibana :容器名称。

(6)privileged=true :权限。

1.3 安装Logstash

  • Logstash是具有实时流水线能力的开源的数据收集引擎。Logstash可以动态统一不同来源的数据,并将数据标准化到您选择的目标输出。它提供了大量插件,可帮助我们解析,丰富,转换和缓冲任何类型的数据。 
  • 管道(Logstash Pipeline)是Logstash中独立的运行单元,每个管道都包含两个必须的元素输入(input)和输出(output),和一个可选的元素过滤器(filter),事件处理管道负责协调它们的执行。 输入和输出支持编解码器,使您可以在数据进入或退出管道时对其进行编码或解码,而不必使用单独的过滤器。
  • Logstash官方插件 logstash-input-jdbc集成在Logstash(5.x之后)的版本,可以通过配置实现mysql和es全量与增量数据的定时同步。
# 拉取logstash
docker pull logstash:7.4.2

2 数据同步

2.1 准备MySQL表和数据

create table pms_spu_info
(id                   bigint not null auto_increment comment '商品id',spu_name             varchar(200) comment '商品名称',spu_description      varchar(1000) comment '商品描述',catalog_id           bigint comment '所属分类id',brand_id             bigint comment '品牌id',weight               decimal(18,4),publish_status       tinyint comment '上架状态[0 - 下架,1 - 上架]',create_time          datetime,update_time          datetime,primary key (id)
);

2.2 运行Logstash

# 运行logstash
docker run -d --name logstash logstash:7.4.2mkdir -p /root/docker/logstash/config
mkdir -p /root/docker/logstash/data
mkdir -p /root/docker/logstash/pipeline
mkdir -p /root/docker/logstash/jars# 上传mysql驱动mysql-connector-java-5.1.47.jar到/root/docker/logstash/jars#拷贝已启动的容器中的文件到宿主机,用于重启挂载
docker cp logstash2:/usr/share/logstash/config /root/docker/logstash/
docker cp logstash2:/usr/share/logstash/data /root/docker/logstash/
docker cp logstash2:/usr/share/logstash/pipeline /root/docker/logstash/# 保证权限 任何人任何组都可以读写操作执行
chmod -R 777 /root/docker/logstash# 删除logstash容器
docker rm -f logstash# 配置连接es
cd /root/docker/logstash/config
vi logstash.yml
  • logstash.yml
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://172.xx.xx.6:9200" ]
  • 创建mysql.conf,编写mysql数据同步至es相关配置
# 创建mysql.conf
cd /root/docker/logstash2/pipeline/
vi mysql.conf

        1)mysql.conf内容如下:

input {jdbc {type => "jdbc"# 数据库连接地址jdbc_connection_string => "jdbc:mysql://172.xx.xx.xx:9906/gulimall_pms?useUnicode=true&characterEncoding=UTF-8&useSSL=false"# 数据库连接账号和密码jdbc_user => "root"jdbc_password => "root"# MySQL驱动架包jdbc_driver_library => "/usr/share/logstash/mysql/mysql-connector-java-8.0.17.jar"# MySQL驱动jdbc_driver_class => "com.mysql.jdbc.Driver"# 数据库重连尝试次数connection_retry_attempts => "3"# 判断数据库连接是否可用,默认是false不开启jdbc_validate_connection => "true"# 数据库连接可用校验超时时间,默认3600秒jdbc_validation_timeout => "3600"# 开启分页查询,默认false不开启jdbc_paging_enabled => "true"# 单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值)jdbc_page_size => "500"# 查询数据sql,如果sql较复杂,建议配通过statement_filepath配置sql文件的存放路径statement => "SELECT id,spu_name spuName,spu_description spuDescription,catalog_id catalogId,brand_id brandId,weight,publish_status publishStatus,DATE_FORMAT(create_time,'%Y-%m-%d %H:%i:%s') createTime,DATE_FORMAT(update_time,'%Y-%m-%d %H:%i:%s') updateTime FROM pms_spu_info WHERE update_time > :sql_last_value"# 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)lowercase_column_names => false# 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中record_last_run => true# 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值use_column_value => true# 需要记录的字段,用于增量同步,需是数据库字段tracking_column => "updateTime"# 轨迹字段类型Value can be any of: numeric,timestamp,Default value is "numeric"tracking_column_type => timestamp# record_last_run上次数据存放位置last_run_metadata_path => "/usr/share/logstash/config/logstash_metadata"# 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为falseclean_run => false# 同步频率(分 时 天 月 年),默认每分钟同步一次schedule => "* * * * *"}
}output {elasticsearch {# host => "192.168.1.1"# port => "9200"# 配置ES集群地址hosts => ["172.xx.xx.xx:9200"]# 索引名字,必须小写index => "spu"# 文档id,数据唯一索引(建议使用表的主键)document_id => "%{id}"}stdout {codec => json_lines}
}

        2)查询sql如下:

SELECT 
id,spu_name spuName,spu_description spuDescription,catalog_id catalogId,
brand_id brandId,weight,publish_status publishStatus,
DATE_FORMAT(create_time,'%Y-%m-%d %H:%i:%s') createTime,
DATE_FORMAT(update_time,'%Y-%m-%d %H:%i:%s') updateTime 
FROM pms_spu_info WHERE update_time > :sql_last_value

日期通过DATE_FORMAT(date,"输出格式")进行格式化,数据库与es日期格式保持一致。

  •  重新运行logstash容器
docker run  --name logstash --restart=always -d -p 5044:5044 -p 9600:9600   \
--privileged=true \
-v /root/docker/logstash/config:/usr/share/logstash/config   \
-v /root/docker/logstash/jars/mysql-connector-java-5.1.47.jar:/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.47.jar \
-v /root/docker/logstash/pipeline:/usr/share/logstash/pipeline \
logstash:7.4.2 -f /usr/share/logstash/pipeline/mysql.conf

说明:

(1)-f 是一个非常有用的选项,可以使用户使用指定的文件来指定一些Docker镜像的构建和配置信息。

(2)-f 也可以用于强制删除容器。

2.3 测试

  • mysql表中数据,如下

  • 通过Kibana进行查询,如下:

3 Logstash报错(踩坑)记录

3.1 记录一

3.1.1 报错信息

LogStash::PluginLoadingError Unable to find driver class via URLClassLoader in given driver jars : com.mysql.jdbc.Driver and com.mysql.jdbc.Driver

3.1.2 报错原因

        Logstashd的logstash-input-jdbc插件在调用数据库驱动jar包时,默认会去logstash/logstash-core/lib/jars/目录下去找。

3.1.3 解决方案

        将数据库驱动(例如:mysql-connector-java-5.1.47.jar)放到/usr/share/logstash/logstash-core/lib/jars/下面。

3.2 记录二

3.2.1 报错信息

javax.net.ssl.SSLException: closing inbound before receiving peer's close _notify

3.2.2 报错原因

        安装的是mysql8.x的版本,远程连接发现需要做ssl身份验证,本机连接不需要,取消掉其ssl身份验证需要调整配置。        

3.3.3 解决方案

        数据库连接地址上添加useSSL=false,如下:

"jdbc:mysql://172.xx.xx.xx:9906/gulimall_pms?useUnicode=true&characterEncoding=UTF-8&useSSL=false"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/661606.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

图像异或加密、解密的实现

很多论文提到了从左上角开始做异或,逐行推导得到结果。 解密过程是加密的逆过程。 先看其基本方法: 参考文献: A Chaotic System Based Image Encryption Scheme with Identical Encryption and Decryption Algorithm 大多数论文都用了这个思路,我们使用MATLAB实现代码…

Leetcode 热门百题斩(第一天)

介绍 针对leetcode的热门一百题,解决大多数实习生面试的基本算法题。通过我自己的思路和多种方法,供大家参考。 1.两数之和(题号:1) 方法一 最先想到的就是两个for去遍历匹配。 class Solution {public int[] twoSum(int[]…

重写Sylar基于协程的服务器(2、配置模块的设计)

重写Sylar基于协程的服务器(2、配置模块的设计) 重写Sylar基于协程的服务器系列: 重写Sylar基于协程的服务器(0、搭建开发环境以及项目框架 || 下载编译简化版Sylar) 重写Sylar基于协程的服务器(1、日志模…

vue3使用is动态切换组件报错Vue received a Component which was made a reactive object.

vue3使用is动态切换组件,activeComponent用ref定义报错 Vue received a Component which was made a reactive object. This can lead to unnecessary performance overhead, and should be avoided by marking the component with markRaw or using shallowRef ins…

cesium 多边形渐变颜色

cesium画一个渐变颜色的多边形 方式一:用一张颜色渐变的图片作为材质,结合color属性,可设置多边形的颜色,达到渐变效果。图片指向正北方向。 viewer.entities.add({polygon: {hierarchy: Cesium.Cartesian3.fromDegreesArray([115…

bs4模块

bs4模块与案例 使用指南 bs4,全称BeautifulSoup 4,是Python中一个强大的网页解析库,它可以帮助我们方便地从网页中提取数据。bs4将复杂HTML文档转换成树形结构,每个节点都是Python对象,所有对象可以归纳为4种&#xf…

【PaddleSpeech】语音合成-男声

环境安装 系统:Ubuntu > 16.04 源码下载 使用apt安装 build-essential sudo apt install build-essential 克隆 PaddleSpeech 仓库 # github下载 git clone https://github.com/PaddlePaddle/PaddleSpeech.git # 也可以从gitee下载 git clone https://gite…

EBC金融英国CEO:高波动性周期下,如何寻找市场的稳定性?

利率主导的市场,将在2024年延续。目前,固收市场对于降息的定价,正通过利率传导至不同资产中。尽管市场迫切利用通胀去佐证降息,但各国央行仍囿于通胀目标的政策桎梏。政策和市场预期的博弈将继续牵动市场脉搏,引发价格…

基于SSM+MySQL的的新闻发布系统设计与实现

目录 项目简介 项目技术栈 项目运行环境 项目截图 代码截取 源码获取 项目简介 新闻发布系统是一款基于Servletjspjdbc的网站应用程序,旨在提供一个全面且高效的新闻发布平台。该系统主要包括后台管理和前台新闻展示两个平台,涵盖了新闻稿件的撰写…

充电桩项目实战:搞定多数据源!

你好,我是田哥 最近,我在对充电桩项目进行微服务升级中,既然是项目升级,难免会遇到各种各样的问题。比如:分布式事务问题、多数据源问题、分布式锁问题等。 项目技术栈: SpringSpring BootSpring Cloud Ali…

JavaScript基础(二)—— 运算符、表达式与语句(if、switch、循环)

学习目标: 掌握常见运算符,为程序“能思考”做准备 掌握分支语句,让程序具备判断能力 掌握循环语句,让程序具备重复执行能力 一、运算符 1. 赋值运算符 对变量进行赋值的运算符,能够使用赋值运算符简化代码。 …

推荐系统|概要03_AB测试

文章目录 A/B测试问题流量不够用解决方案——分层实验 Holdout 机制 A/B测试 其中小流量是指对部分的用户先尝试改进的算法模型,而非全部。若为全部,如果算法模型存在问题,可能会导致用户体验差,导致用户流失,而小流量…

深入探究iframe:网页嵌入的魔法盒子(下)

🤍 前端开发工程师、技术日更博主、已过CET6 🍨 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 🕠 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 🍚 蓝桥云课签约作者、上架课程《Vue.js 和 E…

DATAX改造支持geometry类型数据同步

数据库使用postgresql安装了postgis插件存储了geometry空间数据,想使用datax做数据同步,但datax本身不支持geometry类型数据,如何改造呢? 1.首先下载已改造支持geometry类型的datax引擎,下载地址 https://download.c…

Jmeter性能测试: Jmeter 5.6.3 分布式部署

目录 一、实验 1.环境 2.jmeter 配置 slave 代理压测机 3.jmeter配置master控制器压测机 4.启动slave从节点检查 5.启动master主节点检查 6.运行jmeter 7.观察jmeter-server主从节点变化 二、问题 1.jmeter 中间请求和响应乱码 一、实验 1.环境 (1&#…

oracle数仓rac两个节点查询耗时不一致问题处理

问题描述 数据库节点1查询比节点2查询慢。现场操作应用发现发现同一sql语句在节点2上只要2分钟左右,在节点1,该条sql执行要超过30分钟。 处理过程 根据问题,初步判断是由于错误的执行计划,导致性能问题,但实际上对两…

编程流程图

对于复杂流程,我做开发之前一般会 先画一下流程图。特别是多个部门有交叉的情况下: processOn: 这个是我之前 一直的选择,他可以画上面的这些,流程图,网页操作,但是他不是免费的,查过…

JavaScript事件冒泡和捕获

🧑‍🎓 个人主页:《爱蹦跶的大A阿》 🔥当前正在更新专栏:《VUE》 、《JavaScript保姆级教程》、《krpano》、《krpano中文文档》 ​ ​ ✨ 前言 事件传播是JavaScript中非常重要的一个概念,它描述了从嵌套元素到祖先…

【C++干货铺】哈希结构在C++中的应用

目录 unordered系列关联式容器 unordered_map unordered_map的接口说明 1.unordered_map的构造 2. unordered_map的容量 3. unordered_map的迭代器 4. unordered_map的元素访问 5. unordered_map的查询 6. unordered_map的修改操作 7. unordered_map的桶操作 底层结构 …

mysql+node.js+html+js完整扫雷项目

一.下载 可以直接下载绑定资源, 也可以访问:克隆仓库:mine_clearance: mysqlnode.jshtmljs完整扫雷项目 (gitee.com) 二.运行sql数据文件 将mysql数据文件导入到本地 先在本地localhost里创建数据库 mine_clearance, 然后如图&…