FlinkCDC第四部分-同步mysql到mysql,ctrl就完事~(flink版本1.17.1)

 本文介绍了不同源单表-单表同步,不同源多表-单表同步。

注:此版本支持火焰图

Flink版本:1.17.1

环境:Linux CentOS 7.0、jdk1.8

基础文件:

flink-1.17.1-bin-scala_2.12.tgz、

flink-connector-jdbc-3.0.0-1.16.jar、(maven仓库目录:corg.apache.flink/flink-connector-jdbc/3.0.0-1.16)

flink-sql-connector-mysql-cdc-2.3.0.jar、(maven仓库目录:com.ververica/flink-sql-connector-mysql-cdc/2.3.0)
安装Flink步骤详见文章第二篇

支持的mysql版本: 

一、 数据源ip为***.51的源表,同步数据到数据源ip为***.50的目标表中,需要以下几个步骤:

1. 启动flink服务:

[root@localhost bin]#  ./start-cluster.sh

2. 停止flink服务:

[root@localhost bin]#  ./stop-cluster.sh

3. 启动FinkSQL:

[root@localhost bin]# ./sql-client.sh

4. 编写FlinkSql,创建临时表和job:

FlinkSql与mysql字段的类型映射

 把写好的Sql粘贴到FlinkSql客户端命令行中,分号'  ;  '是语句结束标识符,按回车创建:

 创建来源表结构:

来源表链接类型为'connector' = 'mysql-cdc'

Flink SQL> CREATE TABLE source_alarminfo51 (
>   id STRING NOT NULL,
>   AlarmTypeID STRING,
>   `Time` timestamp,
>   PRIMARY KEY (`id`) NOT ENFORCED
>  ) WITH (
>     'connector' = 'mysql-cdc',
>     'hostname' = '***',
>     'port' = '3306',
>     'username' = '***',
>     'password' = '***',
>     'database-name' = 'alarm',
>     'server-time-zone' = 'Asia/Shanghai',
>     'table-name' = 'alarminfo'
>  );

[INFO] Execute statement succeed.

 创建目标表结构(目标表结构可比来源表字段多,可使用视图指定字段默认值):

目标表链接类型为'connector' = 'jdbc',注意url需要跟后面以下属性值

Flink SQL> CREATE TABLE target_alarminfo50 (
>   id STRING NOT NULL,
>   AlarmTypeID STRING,
>   `Time` timestamp
>   PRIMARY KEY (`id`) NOT ENFORCED
>  ) WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://***:3306/alarm?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true',
>     'username' = '***',
>     'password' = '****',
>     'table-name' = 'alarminfo',
>     'driver' = 'com.mysql.cj.jdbc.Driver'
>  );

[INFO] Execute statement succeed.

 最后创建同步关系:

INSERT INTO target_alarminfo50 SELECT * FROM source_alarminfo51;

如下:

创建完表结构可使用下列语句查看和删除:

查看表:show tables;

删除表:drop table if exists  target_alarminfo; 

flink-UI页面效果:

打开火焰图:

编辑flink-conf.yaml:最后面添加 

rest.flamegraph.enabled: true

配置后重启flink服务,重新创建任务。

火焰图效果:

数据同步效果:

源表:

目标表数据:首次数据全量,后面数据变更增量 

 注:

在分析火焰图时,可以关注以下几点:
函数的执行时间:纵向的轴显示了函数的嵌套层级,越往下表示越深层的函数调用。横向轴表示时间,通过不同颜色的方块来表示函数的执行时间。
热点函数:寻找占据执行时间大部分的函数,这些函数可能是需要优化的关键点。
函数之间的关系:观察函数之间的调用关系,查看是否有不必要的函数调用或循环。
I/O 操作:关注是否有大量的数据读取、写入或网络通信,这可能是性能瓶颈的来源。
根据火焰图的分析结果,您可以进一步定位和排查潜在的性能问题,并在代码、配置或资源分配方面进行优化。
请注意,为了准确分析火焰图,建议在负载较高的情况下生成火焰图,并保持足够的监视时间。此外,Flink 的火焰图功能在生产环境中可能会造成一定的开销,因此建议在测试或开发环境中使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1147.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

人工智能与Chat GPT

一本书全面掌握ChatGPT,既有向ChatGPT提问的技巧, 也有构建自己的ChatGPT模型的方法,涵盖开发背景、关联技术、使用方法、应用形式、实用案例等 人工智能是我们这个时代最热门的话题,人们既希望它能代替我们做一些工作&#xff0c…

云原生——Docker容器化实战

❄️作者介绍:奇妙的大歪❄️ 🎀个人名言:但行前路,不负韶华!🎀 🐽个人简介:云计算网络运维专业人员🐽 前言 "Docker"一词指代了多个概念,包括开源…

uniapp调接口出现跨域问题。

今天在写uniapp项目的时候,使用在线模拟接口的时候,出现跨域问题。 【问题描述】: ①在内嵌浏览器运行,不会出现跨域问题,好像是内嵌浏览器自动去掉了跨域问题。 ②在外部浏览器调用的时候会出现跨域问题。&#xf…

IDEA 搭建Android 开发环境

项目实战 废话不多说开始创建先第一个 Android 项目 步骤一 FILE → New → Project 步骤二-选择 Android 项目模板 选那个安卓机器人,如果没有这个选项,需要升级IDEA版本或者安装安卓插件 选择*Basic Activity* Next-下一步 步骤三-项目初始化 名称、包名、安装位置自行调整…

Cadence PCB 仿真激励专题

🏡《总目录》   🏡《宝典目录》 目录 1,内容概述2,内容目录 1,内容概述 本专题详细介绍Cadence PCB仿真的激励源。 2,内容目录 Cadence PCB仿真 使用 Allegro PCB SI 激励信号源参数Simulation配置方法图…

3DE重客户端安装

3DE重客户端安装 一、百度网盘下载路径二、详细安装步骤 一、百度网盘下载路径 https://pan.baidu.com/s/16TltMRbrWuSe7p-Vn1x4Dw?pwdfku7 提取码:fku7 二、详细安装步骤 1、将\3deinstall\2022x_install_GA目录下的所有.tar文件全选解压 2.点击\3deinstall\…

ubuntu 20.04, 22.04网络配置比较

1.ubuntu 20.04网络配置,配置静态IP:切换roote用户,vi /etc/netplan/00-installer-config.yaml,修改网络配置,格式如下: network: ethernets: ens33: dhcp4: false addresses: [172.22.…

切换.net Framework 版本后,出现NuGet 包是使用不同于当前目标框架的目标框架安装的,可能需要重新安装

问题现象: 由于添加新的dll文件,依赖的.NET Framework版本与当前的不一致,在vs 中切换了目标框架版本后,运行程序,出现以下的warnning信息: 一些 NuGet 包是使用不同于当前目标框架的目标框架安装的&#…

《代码中的软件工程》学习总结/心得体会

《代码中的软件工程》阅读总结回顾 参考资料《代码中的软件工程》https://gitee.com/mengning997/se 终于系统性的阅读完了这本《代码中的软件工程》,受益匪浅。我本科时也上过软件工程这门课程,但是学到的东西诚然是没有在这本书中学到的多的&#xf…

APSIM模型的参数优化

随着数字农业和智慧农业的发展,基于过程的农业生产系统模型在模拟作物对气候变化的响应与适应、农田管理优化、作物品种和株型筛选、农田固碳和温室气体排放等领域扮演着越来越重要的作用。 APSIM (Agricultural Production Systems sIMulator)模型[1]是世界知名的…

C++STL:无序关联容器

文章目录 1. 无序关联容器1.1 概述1.2 无序容器种类 2. unordered_map2.1 概述2.2 成员方法2.3 创建C unordered_map容器的方法2.4 迭代器2.5 C STL unordered_map获取元素的几种方法2.6 C unordered_map insert()方法2.7 C unordered_map emplace()和emplace_hint()方法2.7.1 …

【稳定性验证】视频流mesh环境下稳定性验证

目录 正常保持上线状态 延时丢包 丢包(很稳) 延时 丢包 乱序 (也很稳) webGL lost 正常保持上线状态 延时丢包 丢包(很稳) 延时 丢包 乱序 (也很稳) webGL lost

Flink SQL之常用函数

官方函数查询地址&#xff1a;https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html 可根据使用的版本查找&#xff0c;该链接为1.12版本。 1.比较函数 <> > > < < 注意&#xff1a;select nullnull…

手写Spring框架---MVC实现

目录 预备 自研框架MVC的实现 MVC架构草图&#xff1a; 大致流程 实现思路 自定义注解 JavaBean 请求的拦截-建立DispatcherServlet 责任链处理请求 RequestProcessor矩阵 Render矩阵 预备 在DispatcherServlet&#xff1a; 解析请求路径和请求方法依赖容器&#xf…

PostgreSQL 笔记

PostgreSQL 笔记 一、简介 这里主要是记录学习 PostgreSQL 常用操作命令&#xff0c;方便今后查阅&#xff01;&#xff01;&#xff01; PostgreSQL 是一个免费的对象-关系数据库服务器(ORDBMS)&#xff0c;在灵活的BSD许可证下发行。 PostgreSQL 开发者把它念作 post-gress…

水库大坝安全监测系统是由什么组成的?

水库大坝是防洪抗灾的重要设施&#xff0c;它们的安全性直接关系到人民群众的生命财产安全。因此&#xff0c;水库大坝的安全监测必不可少。水库大坝安全监测系统是一种集成了数据采集、传输、处理和分析的技术平台&#xff0c;能够实时、准确地监测大坝的状态&#xff0c;及时…

一.《某三国》人物属性及其相关属性

人物属性 1.找一个可以操控变化的属性来找 比如血量.坐标或者五铢(绑定金币),这里我们用五铢找 五铢只要打一个怪就会加一点 2.我们直接搜变化即可搜到 五铢地址0AD64EAC 3.我们CE给地址下访问 4.这里我们最后找第一条访问 因为他是被改变的 或者你CE给地址下写入 5.然后我…

APSIM作物生长模拟模型:农田管理、土壤碳氮平衡、土壤水平衡、作物产量、物候发育光合生产、作物产量等

查看原文>>>基于R语言APSIM模型高级应用及批量模拟实践技术 目录 专题一、APSIM模型应用与R语言数据清洗 专题二、APSIM气象文件准备与R语言融合应用 专题三、APSIM模型的物候发育和光合生产模块 专题四、APSIM物质分配与产量模拟 专题五、APSIM土壤水平衡模块 …

前端浏览器缓存的好处和弊端以及如何处理弊端

浏览器缓存 好处&#xff1a; 减少冗余的数据传输&#xff0c;节省带宽。减轻服务器的请求压力&#xff0c;因为有缓存可以减少向服务器发送请求&#xff0c;资源从缓存中读取&#xff0c;加快客户端的访问速度。因为无需从服务器请求等待响应 缺点&#xff1a; 系统更新时…

使用docker-file 将springboot项目打成镜像,发布成容器服务

一 docker-file将jar包发布成容器服务 1.1 docker的安装 [rootlocalhost ~]# uname -r 3.10.0-862.el7.x86_64 [rootlocalhost ~]# yum install docker [rootlocalhost export]# systemctl start docker [rootlocalhost export]# docker -v Docker version 1.13.1, build…