FlinkCDC实战:将 MySQL 数据同步至 ES

📌 当前需要处理的业务场景:

  • 将订单表和相关联的表(比如: 商品表、子订单表、物流信息表)组织成宽表, 放入到 ES 中, 加速订单数据的查询.

  • 同步数据到 es.

  • 概述

    • 1. 什么是 CDC

    • 2. 什么是 Flink CDC

    • 3. Flink CDC Connectors 和 Flink 的版本映射

  • 实战

    • 1. 宽表查询

      • 1.1 创建 mysql 表

      • 1.2 启动 Flink 集群和 Flink SQL CLI

      • 1.3 在 Flink SQL CLI 中使用 Flink DDL 创建表

      • 1.4 关联订单数据并且将其写入 Elasticsearch 中

      • 1.5 修改 MySQL 中的数据

本文用到的 mysql 版本为 8.0.28,ES 版本为 7.17.3,flink 版本为 1.13.6,flink-sql-connector-mysql-cdc 版本为 2.2.1

flink-sql-connector-elasticsearch 版本为 7_2.11-1.13.2

概述

1. 什么是 CDC

CDC (Change Data Capture) 是 变更数据获取的简称。核心思想是监测并捕获数据库的变动(数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整地记录下来,写入到消息中间件中以供其他服务进行订阅并消费。

2. 什么是 Flink CDC

Flink 社区开发了 flink-cdc-connectors 组件,这个一个可以直接从 MySQL、PostgreSQL等数据库直接 读取全量数据增量变更数据的source 组件。

3. Flink CDC Connectors 和 Flink 的版本映射

Flink CDC Conectors VersionFlink Version
1.0.01.11.*
1.1.01.11.*
1.2.01.12.*
1.3.01.12.*
1.4.01.13.*
2.0.*1.13.*

实战

1. 宽表查询

在商城项目中,商品、订单、物流的数据往往是存储在 MySQL 中,为了加速查询的效率,可以将数据组织成一张宽表,并实时地把它写到 ElasticSearch 中。

确保要监听的 mysql 服务器开启了 binlog 和对应监听 binlog 的账号有相对应的权限。

1.1 创建 mysql 表

-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
​
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer"),(default,"hammer","16oz carpenter's hammer"),(default,"rocks","box of assorted rocks"),(default,"jacket","water resistent black wind breaker"),(default,"spare tire","24 inch spare tire");
​
CREATE TABLE orders (order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,order_date DATETIME NOT NULL,customer_name VARCHAR(255) NOT NULL,price DECIMAL(10, 5) NOT NULL,product_id INTEGER NOT NULL,order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
​
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);CREATE TABLE shipments (shipment_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,order_id INTEGER NOT NULL,origin VARCHAR(255) NOT NULL,destination VARCHAR(255) NOT NULL,is_arrived BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),(default,10002,'Hangzhou','Shanghai',false),(default,10003,'Shanghai','Hangzhou',false);

1.2 启动 Flink 集群和 Flink SQL CLI

# 进入 Flink 目录
cd flink-13.6
​
# 启动 Flink 集群.启动成功的话,可以在 http://ip:8081/ 访问到对应的 Flink Web UI
./bin/start-cluster.sh
​
# 启动 Flink SQL CLI
./bin/sql-client.sh

1.3 在 Flink SQL CLI 中使用 Flink DDL 创建表

首先开启 checkpoint, 每隔 3s 做一次 checkpoint.

-- Flink SQL
Flink SQL> SET execution.checkpointing.interval = 3s;

然后,对于数据库中的表 productsordersshipments,使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据。

-- Flink SQL
Flink SQL> CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = '192.168.110.100','port' = '3380','username' = 'root','password' = 'root','database-name' = 'flink_db','table-name' = 'products');
​
​
Flink SQL> CREATE TABLE orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = '192.168.110.100','port' = '3380','username' = 'root','password' = 'root','database-name' = 'flink_db','table-name' = 'orders');
​
​
Flink SQL> CREATE TABLE shipments (shipment_id INT,order_id INT,origin STRING,destination STRING,is_arrived BOOLEAN,PRIMARY KEY (shipment_id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = '192.168.110.100','port' = '3380','username' = 'root','password' = 'root','database-name' = 'flink_db','table-name' = 'shipments');

最后,创建 enriched_orders 表,用来将关联后的订单数据写入 Elasticsearch 中

-- Flink SQL
Flink SQL> CREATE TABLE enriched_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,product_name STRING,product_description STRING,shipment_id INT,origin STRING,destination STRING,is_arrived BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://114.132.43.99:9200','index' = 'enriched_orders');

1.4 关联订单数据并且将其写入 Elasticsearch 中

使用 Flink SQL 将订单表 order 与商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中。

-- Flink SQL
Flink SQL> INSERT INTO enriched_ordersSELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrivedFROM orders AS oLEFT JOIN products AS p ON o.product_id = p.idLEFT JOIN shipments AS s ON o.order_id = s.order_id;

现在可以通过 Kibana 查询包含商品和物流信息的订单数据。

首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index pattern enriched_orders.

然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了.

1.5 修改 MySQL 中的数据

--MySQL
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
​
INSERT INTO shipments
VALUES (default,10004,'Shanghai','Beijing',false);

每执行一步就刷新一次 Kibana,可以看到 Kibana 中显示的订单数据将实时更新.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/63407.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用 Glide 加载占位图或错误图时,发现它们没有应用圆角效果--问题解决

如果您在使用 Glide 加载占位图或错误图时,发现它们没有应用圆角效果,可能是因为占位图和错误图的加载方式没有使用自定义的圆角变换。以下是确保占位图和错误图都能显示圆角效果的步骤。 1. 确保自定义变换类正确 首先,确保您的 GlideRoundTransformUtil 类实现正确。以下…

Cobalt Strike 4.8 用户指南-第十二节 可拓展 PE,进程注入和后渗透

12.1、概述 Malleable C2 文件不仅仅是通信指标。Malleable C2 配置文件还能控制 Beacon 的内存特性,决定 Beacon 如何进行进程注入,并影响 Cobalt Strike 的后渗透工作。本章将介绍 Malleable C2 语言的这些扩展。 # 12.2、PE和内存指标 Malleable C…

unity 让文字变形

效果: using TMPro; using UnityEngine; using NaughtyAttributes;[ExecuteInEditMode] public class TMTextPerpective : MonoBehaviour {[OnValueChanged("DoPerspective")][Range(-1f, 1f)]public float CenterBias 0f;[OnValueChanged("DoPers…

NIO - selector简单介绍

一 前言 selector作为NIO当中三大组件之一,是处理NIO非阻塞模式下的核心组件,它允许一个单个线程管理多个通道。 NIO下的阻塞模式 因为对于阻塞模式下的NIO模式,存在很大的问题,即使在单线程下,对应的服务端也会一直进…

C语言:分支结构

C语言:分支结构 分支结构 问题引出 我们在程序设计往往会遇到如下的问题,比如下面的函数的计算 也就是我们是必须要通过一个条件的结果来选择下一步的操作,算法上属于一个分支结构,C语言中实现分支结构主要使用if语句 条件判断…

利用anzocapital昂首资本技术优化订单执行

在金融市场的深海中,anzocapital昂首资本作为内行,深知订单执行的技术缺陷如何悄然侵蚀交易者的利润。那么,一个懂交易的人会如何避免这些缺陷,确保自己的投资策略不被市场波动所左右呢? 在订单执行过程中,技术缺陷可…

数据库基础入门:从零开始学习数据库的核心概念

数据库是现代软件开发的核心组成部分之一,无论是网站、手机应用还是企业管理系统,都离不开数据库的支持。本文将带你从零开始,逐步了解数据库的基本概念和常见操作。 什么是数据库? 数据库(Database)是一个…

RTR Chaptor11 下

全局光照 定向遮蔽预计算定向遮蔽定向遮蔽的动态计算使用定向屏蔽进行着色 满反射全局光照表面预照明定向表面预照明预计算传输存储方法动态漫反射全局光照光照传播体积基于体素的方法屏幕空间方法其他方法 镜面全局光照局部环境贴图环境贴图的动态更新基于体素的方法平面反射屏…

java如何解析和生成sql?

1.什么是 JSQLParser? JSQLParser 是一个开源的 Java 库,用于解析 SQL 语句并将其转换为抽象语法树(AST)。它支持多种 SQL 方言,包括 MySQL、PostgreSQL、Oracle 和 SQL Server 等。JSQLParser 使开发者能够轻松地分析…

【Apache Paimon】-- 4 -- Flink 消费 kafka 数据,然后写入 paimon

目录 1、本地开发环境 2、kafka2paimon 实现流程 3、代码实现 3.1、项目名称 3.2、项目结构 3.3、Pom.xml 和 log4j.properties 文件 3.4、代码核心类 3.4.1、入口类:Kafka2PaimonDemo.java 3.4.2、参数解析类 3.4.2.1、JobParameterUtil.java( flink job schedule…

超越DFINE最新目标检测SOTA模型DEIM

代码地址:https://github.com/ShihuaHuang95/DEIM 论文地址:DEIM: DETR with Improved Matching for Fast Convergence 论文中文版:DEIM: 改进匹配的 DETR 以实现快速收敛 以下是文章的主要贡献和发现: DEIM框架:提…

在python中使用布尔逻辑

布尔是python中常见类型。它的值只能是两项内容之一:true或false. 编写"if"语句 若要在python中表达条件逻辑,可以使用if语句。——编写If语句离不开逻辑运算符:等于、不等于、小于、大于或等于、大于和大于或等于。 在python中…

位运算的总结--奇思妙解

目录 前言 先回顾常用的位运算 1:给一个数 n ,确定它的二进制表示中的第x位是0 还是 1 2:将一个数 n 的二进制表示的第x 位修改成 1 3:将一个数 n 的二进制表示的第 x位修改成 0 4:与位图联系 5:提取一…

语音识别flask接口开发

要开发一个flask语音识别接口,首先要解决语音文件在网络中的传输问题,然后选识别算法进行识别 文章目录 1、以二进制文件流方式上次语音2、网页端长连接流式上传语音文件3、语音识别接口 1、以二进制文件流方式上次语音 python服务端代码,以…

Kafka怎么发送JAVA对象并在消费者端解析出JAVA对象--示例

1、在pom.xml中加入依赖 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.1.6</version></dependency> 2、配置application.yml 加入Kafk…

JS中的原型链与继承

原型链的类比 JS中原型链&#xff0c;本质上就是对象之间的关系&#xff0c;通过protoype和[[Prototype]]属性建立起来的连接。这种链条是动态的&#xff0c;可以随时变更。 这个就跟C/C中通过指针建立的关系很相似&#xff0c;比如&#xff0c;通过指针建立一个链表&#xf…

hive分区分桶、数据倾斜总结

一、hive的基本概念 hive是一个构建在hadoop上的数据仓库工具&#xff0c;可以将结构化的数据文件映射为一张数据库表并提供数据查询功能 二、hive的特点 &#xff08;1&#xff09;数据是存储在hdfs上 &#xff08;2&#xff09;底层是将sql转换为MapReduce任务进行计算 …

CSS学习记录04

CSS边框 CSS border 属性指定元素边框的样式、宽度和颜色。border-style 属性指定要显示的边框类型。dotted - 定义点线边框dashed - 定义虚线边框solid - 定义实线边框double - 定义双边框groove - 定义3D坡口边框&#xff0c;效果取决于border-color值ridge - 定义3D脊线边框…

一文了解模式识别顶会ICPR 2024的研究热点与最新趋势

简介 对模式识别研究领域前沿方向的跟踪是提高科研能力和制定科研战略的关键。本文通过图文并茂的方式介绍了ICPR 2024的研究热点与最新趋势&#xff0c;帮助读者了解和跟踪模式识别的前沿研究方向。本推文的作者是黄星宇&#xff0c;审校为邱雪和许东舟。 一、会议介绍 ICPR…

服务器挖矿

文章目录 一、确定挖矿进程并停止二、查找并清除挖矿相关文件三、检查并修复系统漏洞四、加强安全防护 一、确定挖矿进程并停止 查找挖矿进程 在Linux系统中&#xff0c;可以使用命令如top或htop来查看系统资源占用情况。挖矿程序通常会占用大量的CPU或GPU资源。例如&#xff…