Flink CDC系列之:TiDB CDC 导入 Elasticsearch

Flink CDC系列之:TiDB CDC 导入 Elasticsearch

  • 一、通过docker 来启动 TiDB 集群
  • 二、下载 Flink 和所需要的依赖包
  • 三、在TiDB数据库中创建表和准备数据
  • 四、启动Flink 集群,再启动 SQL CLI
  • 五、在 Flink SQL CLI 中使用 Flink DDL 创建表
  • 六、Kibana查看ElasticSearch数据
  • 七、在 TiDB增删改数据,观察 ElasticSearch 中的结果

一、通过docker 来启动 TiDB 集群

git clone https://github.com/pingcap/tidb-docker-compose.git

替换目录 tidb-docker-compose 里面的 docker-compose.yml 文件,内容如下所示:

version: "2.1"services:pd:image: pingcap/pd:v5.3.1ports:- "2379:2379"volumes:- ./config/pd.toml:/pd.toml- ./logs:/logscommand:- --client-urls=http://0.0.0.0:2379- --peer-urls=http://0.0.0.0:2380- --advertise-client-urls=http://pd:2379- --advertise-peer-urls=http://pd:2380- --initial-cluster=pd=http://pd:2380- --data-dir=/data/pd- --config=/pd.toml- --log-file=/logs/pd.logrestart: on-failuretikv:image: pingcap/tikv:v5.3.1ports:- "20160:20160"volumes:- ./config/tikv.toml:/tikv.toml - ./logs:/logs           command:- --addr=0.0.0.0:20160- --advertise-addr=tikv:20160- --data-dir=/data/tikv- --pd=pd:2379- --config=/tikv.toml- --log-file=/logs/tikv.logdepends_on:- "pd"restart: on-failuretidb:image: pingcap/tidb:v5.3.1ports:- "4000:4000"volumes:- ./config/tidb.toml:/tidb.toml- ./logs:/logscommand:- --store=tikv- --path=pd:2379- --config=/tidb.toml- --log-file=/logs/tidb.log- --advertise-address=tidbdepends_on:- "tikv"restart: on-failureelasticsearch:image: elastic/elasticsearch:7.6.0container_name: elasticsearchenvironment:- cluster.name=docker-cluster- bootstrap.memory_lock=true- "ES_JAVA_OPTS=-Xms512m -Xmx512m"- discovery.type=single-nodeports:- "9200:9200"- "9300:9300"ulimits:memlock:soft: -1hard: -1nofile:soft: 65536hard: 65536kibana:image: elastic/kibana:7.6.0container_name: kibanaports:- "5601:5601"volumes:- /var/run/docker.sock:/var/run/docker.sock

该 Docker Compose 中包含的容器有:

  • TiDB 集群: tikv、pd、tidb。
  • Elasticsearch:orders 表将和 products 表进行 join,join 的结果写入 Elasticsearch 中。
  • Kibana:可视化 Elasticsearch 中的数据。

本机添加 host 映射 pd 和 tikv 映射 127.0.0.1。 在 docker-compose.yml 所在目录下运行如下命令以启动所有容器:

docker-compose up -d
mysql -h 127.0.0.1 -P 4000 -u root # Just test tidb cluster is ready,if you have install mysql local.

该命令会以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。 你可以通过 docker ps 来观察上述的容器是否正常启动了。 也可以访问 http://localhost:5601/ 来查看 Kibana 是否运行正常。

另外可以通过如下命令停止并删除所有的容器:

docker-compose down

二、下载 Flink 和所需要的依赖包

下载 Flink 1.17.1 并将其解压至目录 flink-1.17.1

https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz

下载下面列出的依赖包,并将它们放到目录 flink-1.17.1/lib/ 下:

  • flink-connector-tidb-cdc-2.4.1.jar
  • flink-sql-connector-elasticsearch7-3.0.1-1.17.jar

三、在TiDB数据库中创建表和准备数据

创建数据库和表 products,orders,并插入数据:

-- TiDB
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512)
) AUTO_INCREMENT = 101;INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer"),(default,"hammer","16oz carpenter's hammer"),(default,"rocks","box of assorted rocks"),(default,"jacket","water resistent black wind breaker"),(default,"spare tire","24 inch spare tire");CREATE TABLE orders (order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,order_date DATETIME NOT NULL,customer_name VARCHAR(255) NOT NULL,price DECIMAL(10, 5) NOT NULL,product_id INTEGER NOT NULL,order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

四、启动Flink 集群,再启动 SQL CLI

使用下面的命令跳转至 Flink 目录下

cd flink-1.17.1

使用下面的命令启动 Flink 集群

./bin/start-cluster.sh

启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:
在这里插入图片描述
使用下面的命令启动 Flink SQL CLI

./bin/sql-client.sh

启动成功后,可以看到如下的页面:

在这里插入图片描述

五、在 Flink SQL CLI 中使用 Flink DDL 创建表

首先,开启 checkpoint,每隔3秒做一次 checkpoint

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s;

使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据

Flink SQL> CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'tidb-cdc','tikv.grpc.timeout_in_ms' = '20000','pd-addresses' = '127.0.0.1:2379','database-name' = 'mydb','table-name' = 'products');Flink SQL> CREATE TABLE orders (order_id INT,order_date TIMESTAMP(3),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'tidb-cdc','tikv.grpc.timeout_in_ms' = '20000','pd-addresses' = '127.0.0.1:2379','database-name' = 'mydb','table-name' = 'orders'
);Flink SQL> CREATE TABLE enriched_orders (order_id INT,order_date DATE,customer_name STRING,order_status BOOLEAN,product_name STRING,product_description STRING,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://localhost:9200','index' = 'enriched_orders_1');

将关联后的数据插入到ElasticSearch

Flink SQL> INSERT INTO enriched_ordersSELECT o.order_id, o.order_date, o.customer_name, o.order_status, p.name, p.descriptionFROM orders AS oLEFT JOIN products AS p ON o.product_id = p.id;

六、Kibana查看ElasticSearch数据

检查最终的结果是否写入 ElasticSearch 中,可以在 Kibana 看到 ElasticSearch 中的数据。

首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index pattern enriched_orders.

在这里插入图片描述
然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了.
在这里插入图片描述

七、在 TiDB增删改数据,观察 ElasticSearch 中的结果

通过如下的 SQL 语句对 TiDB 数据库进行一些修改,然后就可以看到每执行一条 SQL 语句,Elasticsearch 中的数据都会实时更新。

INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);UPDATE orders SET order_status = true WHERE order_id = 10004;DELETE FROM orders WHERE order_id = 10004;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/34223.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

不知道打仗之害,就不知道打仗之利

不知道打仗之害,就不知道打仗之利 【安志强趣讲《孙子兵法》第7讲】 【原文】 夫钝兵挫锐,屈力殚货,则诸侯乘其弊而起,虽有智者,不能善其后矣。 【注释】 屈力殚货:屈力,指力量消耗,…

维深(Wellsenn):2023中国消费端VR内容开发商调研报告(附下载

关于报告的所有内容,公众【营销人星球】获取下载查看 核心观点 国内互联网大厂商入局VR,字节跳动、网易表态明确。字节跳动2021年收购国内头部VR硬件厂商PICO后,加速构建VR内容生态,2021年 成立海南创见未来当前已推出VR视频应用…

大语言模型之三 InstructGPT训练过程

大语言模型 GPT历史文章中简介的大语言模型的的发展史,并且简要介绍了大语言模型的训练过程,本篇文章详细阐述训练的细节和相关的算法。 2020年后全球互联网大厂、AI创业公司研发了不少AI超大模型(百亿甚至千亿参数),…

【学习日记】【FreeRTOS】手动任务切换详解

前言 本文是关于 FreeRTOS 中实现两个任务轮流切换并执行的代码详解。目前不支持优先级,仅实现两个任务轮流切换。 一、任务的自传 任务从生到死的过程究竟是怎么样的呢?(其实也没死),这个问题一直困扰着我&#xf…

LeetCode[164]最大间距

难度:Hard 题目: 给定一个无序的数组 nums,返回 数组在排序之后,相邻元素之间最大的差值 。如果数组元素个数小于 2,则返回 0 。 您必须编写一个在「线性时间」内运行并使用「线性额外空间」的算法。 示例 1: 输入: …

CSDN 直播:腾讯云大数据 ES 结合 AI 大模型与向量检索的新一代云端检索分析引擎 8月-8号 19:00-20:30

本次沙龙围绕腾讯云大数据ES产品展开,重点介绍了腾讯云ES自研的存算分离技术,以及能与AI大模型和文本搜索深度结合的高性能向量检索能力。同时,本次沙龙还将为我们全方位介绍腾讯云ES重磅推出的Elasticsearch Serverless服务,期待…

基于亚奈奎斯特采样和SOMP算法的平板脉冲响应空间插值matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 ...................................................................... %fine regular gr…

本地构建包含java和maven的镜像

目录 1.前提条件 2.下载 2.1.创建Dockerfile 3.构建镜像 参考文章 1.前提条件 本地环境需要的系统和软件 win10 Docker Desktop Powershell 图1 Win10安装Docker后,直接在Powershell使用Docker命令 有些Developer不习惯win10系统,却想要使用Lin…

Arraylist集合

保存数据会经常使用到数组,但数组存在以下几个缺陷: 长度固定;保存的必须为同一类型的元素,(基本数据类型,或引用数据类型);使用数组进行增加元素的步骤比较麻烦; 这个时候就需要用一…

安卓:网络框架okhttp

目录 一、okhttp介绍 1. OkHttpClient类: 常用方法: 2. Request类: 常用方法: 3. Response类: 常用方法: 4. Call类: 常用方法: 5. Interceptor接口: 常用方法&…

在生产环境中部署Elasticsearch:最佳实践和故障排除技巧———索引与数据上传(二)

前言 「作者主页」:雪碧有白泡泡 「个人网站」:雪碧的个人网站 「推荐专栏」: ★java一站式服务 ★ ★ React从入门到精通★ ★前端炫酷代码分享 ★ ★ 从0到英雄,vue成神之路★ ★ uniapp-从构建到提升★ ★ 从0到英雄&#xff…

MySQL语句总和之MySQL数据库与表结构操作

目录 1、启动MySQL服务 2、进入MySQL数据库 3、退出数据库 4、查看MySQL数据库所有库 5、创建、删除、使用、查看所处库操作 6、创建表 7、查看表结构 8、表结构操作 1)修改表名 2)自增长操作 3)添加一个address字段放在Phone字段后面 4)添加…

python爬虫3:requests库-案例1

python爬虫3:requests库-案例1 前言 ​ python实现网络爬虫非常简单,只需要掌握一定的基础知识和一定的库使用技巧即可。本系列目标旨在梳理相关知识点,方便以后复习。 申明 ​ 本系列所涉及的代码仅用于个人研究与讨论,并不会对网…

分布式系统监控zabbix安装部署及使用

目录 一.zabbix监控 1.什么是zabbix 2.zabbix功能 3.zabbix的构成 4.zabbix的3种架构 4.1 C/S架构 4.2 分布式架构:zabbix-proxy-client架构 4.3 master-node-client架构 6.zabbix监控模式 二.zabbix部署及图形化页面显示设置(192.168.158.25) 1.zabbix安装…

uniapp 用 hbuilderx下载 uview

uView2.0重磅发布,利剑出鞘,一统江湖 - DCloud 插件市场 1.uniapp官网下载资源 2按下载 3.官网安装文档 要按 这个红色圈错了 然后看他的配置步骤 第四easycom 就可以 不用配了

第二课-一键安装SD-Stable Diffusion 教程

前言 看完这篇文章并跟着操作,就可以在本地开始 SD 绘图了。 理论上来说,这篇课程结束,想要画什么图都可以画了。 启动器介绍 SD 是开源的,可以在 github 上找到。但直接下载源码安装,非常费劲,而且因为国内外差异,就是我这样的秃头程序员也难以应对。 所以,我们改…

SQL SERVER 异地备份到远程共享文件夹异常处理

SQL SERVER 异地备份到远程共享文件夹异常处理 SQL Server 异地备份到远程共享文件夹异常处理 - 灰信网(软件开发博客聚合) -- 允许配置高级选项 EXEC sp_configure show advanced options, 1 GO -- 重新配置 RECONFIGURE GO -- 启用xp_cmdshell EXEC sp…

闭环控制方法及其应用:优缺点、场景和未来发展

闭环控制是一种基本的控制方法,它通过对系统输出与期望值之间的误差进行反馈,从而调整系统输入,使系统输出更加接近期望值。闭环控制的主要目标是提高系统的稳定性、精确性和鲁棒性。在实际应用中,闭环控制有多种方法,…

HCIP的BGP基础实验

一、实验需求 除R5的5.5.5.0环回外,其他所有的环回均可互相一访问。 二、实验步骤 1.配置ip 2.建立邻居关系 2.1 R1和R2建立直连的EBGP邻居关系 [r1]bgp 1 [r1-bgp]router-id 1.1.1.1 [r1-bgp]peer 12.1.1.2 as-number 2 要建的话双方都要建下面配置R2 [r2]bgp…

Token 失效退出至登录页面

目录 前端设置: 1. 在登录页面,调用登录的接口后,直接获取当前时间并保存在本地,类似保存token。 2. 在路由守卫 获取本机存储的时间戳,加15分钟与当前时间进行对比,如果大于当前时间说明token失效&…