0基础学习PyFlink——流批模式在主键上的对比

假如我们将《0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统》中的模式从批处理(batch)改成流处理(stream),则其在print连接器上产生的输出是不一样。

批处理

    env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()
# 批处理
+I[A, 3]
+I[B, 1]
+I[C, 2]
+I[D, 2]
+I[E, 1]

流处理

    env_settings = EnvironmentSettings \.new_instance() \.in_streaming_mode() \.with_configuration(config) \.build()
# 流处理
+I[A, 1]
+I[B, 1]
+I[C, 1]
+I[D, 1]
-U[A, 1]
+U[A, 2]
+I[E, 1]
-U[C, 1]
+U[C, 2]
-U[D, 1]
+U[D, 2]
-U[A, 2]
+U[A, 3]

我们看到批处理是一次性的达成了最终计算——只插入了5条数据,且每条数据都是最终结果。
而流处理则是进行了13次操作,其中插入操作5次,删除4次,更新4次。

只有插入操作

Mysql表无主键

CREATE TABLE WordsCountTable (word varchar(255) NOT NULL,count BIGINT
);

Sink表无主键

    my_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false','table-name' = 'WordsCountTable','driver'='com.mysql.jdbc.Driver','username'='admin','password'='pwd123');"""

Sink表有主键

    my_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT,PRIMARY KEY (`word`) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false','table-name' = 'WordsCountTable','driver'='com.mysql.jdbc.Driver','username'='admin','password'='pwd123');"""

则对于只有插入操作的Batch模式,不管Sink表有没有主键,每次程序执行时都会插入新数据。比如我们执行两次批处理模式代码,则可以看到5的2倍=10条数据。

select * from WordsCountTable;
+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
10 rows in set (0.00 sec)

这个很好理解。

Mysql表有主键

CREATE TABLE WordsCountTable (word varchar(255) NOT NULL,count BIGINT,PRIMARY KEY (word)
);

Sink表无主键

因为word成为主键,不可以重复。第一次执行插入操作时成功了

+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

但是第二次执行时,会因为主键冲突报错:

Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry ‘E’ for key ‘WordsCountTable.PRIMARY’

Sink表有主键

因为Mysql和Sink表里主键一致,不管执行多少次程序,都不会产生多余的数据。

+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

有删除和更新操作

在流模式中我们看到,流处理处理有插入操作外,还有其他操作。我们再对比下它们的表现。

Sink表无主键

Mysql表无主键

Mysql有无主键

因为流模式删除和更新操作需要通过主键来寻找对象,所以会报如下错误

java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.

Sink表有主键

由于Sink表设置了主键,于是流模式产生的更新和删除操作可以通过其找到对应项,就不会报错。

Mysql表无主键

由于Mysql表没有主键,导致每次执行都会插入一批数据。比如下面是我们执行两次的结果

+------+-------+
| word | count |
+------+-------+
| E    |     1 |
| A    |     3 |
| D    |     2 |
| C    |     2 |
| B    |     1 |
| A    |     3 |
| D    |     2 |
| B    |     1 |
| C    |     2 |
| E    |     1 |
+------+-------+
10 rows in set (0.00 sec)

这从另外一个方面说明:**流模式产生的一系列操作,在Execute环节,最终会对这些操作进行合并,将合并的操作同步给外部系统。**比如之前的流操作实际产生了13个行为,而最终落到数据库里只有5条数据,且第二次操作也是插入了5条新的、最终的数据,这就说明中间的操作在同步给数据库之前已经做了合并处理。

Mysql表有主键

因为Mysql表有主键,Sink过来的操作执行的是“有则更新,无则写入”的模式。
比如我们第一次执行程序时,得到

+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

然后我们将数据源中的E改成了A,则这次将出现4个A,但是不会出现E。执行后的结果是

+------+-------+
| word | count |
+------+-------+
| A    |     4 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

这个实验就证明了,当Sink和Mysql表的主键一致时,执行的是insert on duplicate key update操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/118415.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

两个list中存放相同的对象,一个是页面导入,一个是从数据库查询,外部传入一个集合存放的是对象的属性名称,根据属性名称处理两个list

需求:两个list中存放相同的对象,一个是页面导入,一个是从数据库查询,外部传入一个集合存放的是对象的属性名称.要求根据传入的属性(多个)判断两个list中是否有重复的对象, 如果重复则删除数据库的list, 然后合并两个list. /*** 处理导入和数据库重复数据* param list* param l…

网络协议--IGMP:Internet组管理协议

13.1 引言 12.4节概述了IP多播给出,并介绍了D类IP地址到以太网地址的映射方式。也简要说明了在单个物理网络中的多播过程,但当涉及多个网络并且多播数据必须通过路由器转发时,情况会复杂得多。 本章将介绍用于支持主机和路由器进行多播的In…

Android系统启动

首语 Android系统启动与应用启动、四大组件、AMS等很多内容都有关联,因此,Android系统启动是首先需要了解的知识。 Android 系统启动流程 Android系统流程主要部分如上图所示。下面对各个流程进行解析。 Boot ROM 启动电源以及系统启动。当电源按下时…

JavaScript进阶 第三天笔记

JavaScript 进阶 - 第3天笔记 了解构造函数原型对象的语法特征,掌握 JavaScript 中面向对象编程的实现方式,基于面向对象编程思想实现 DOM 操作的封装。 了解面向对象编程的一般特征掌握基于构造函数原型对象的逻辑封装掌握基于原型对象实现的继承理解什…

LVS+keepalive高可用集群

keepalive简介 keepalive为LVS应用延伸的高可用服务。lvs的调度器无法做高可用。但keepalive不是为lvs专门集群服务的,也可以为其他的的代理服务器做高可用。 keepalive在lvs的高可用集群,主调度器和备调度器(可以有多个) 一主两备或一主一备。 VRRP: k…

英语——语法——从句——名词性从句——笔记

文章目录 名词性从句一、定义二、分类(一)宾语从句(二)主语从句(三)C同位语从句(四)D表语从句 名词性从句 一、句子成分 简而言之,构成一个句子的成分(或要素…

TCP网络通信

TCP通信的 实现发1收1 package TCP1;//完成TCP通信的 实现发1收1import java.io.DataOutputStream; import java.io.ObjectOutputStream; import java.io.OutputStream; import java.net.InetAddress; import java.net.Socket;public class Client {public static void main(S…

基于Java的医院远程预约管理系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序(小蔡coding) 代码参考数据库参考源码获取 前言 💗博主介绍:✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…

如何将html转化为pdf

html转换为pdf html2pdf.js库, 基于html2canvas和jspdf,只能打印2-3页pdf,比较慢,分页会截断html2canvas 只能打印2-3页pdf,比较慢,分页会截断 // canvasDom-to-image 不支持某些css属性Pdfmake html-to-p…

【vue3 】 创建项目vscode 提示无法找到模块

使用命令创建 vue3 创建新应用 npm create vuelatest会看到一些可选功能的询问? √ 请输入项目名称: … vue-project √ 是否使用 TypeScript 语法? … 否 / 是 √ 是否启用 JSX 支持? … 否 / 是 √ 是否引入 Vue Router 进行单…

Remmina Linux 远程桌面(堡垒机)解决方案,含文件互传

简介 Remmina 是一款在 Linux 和其他类 Unix 系统下的自由开源、功能丰富、强大的远程桌面客户端。 对于一个Linux作为主力开发机而言,Remmina 解决痛点主要是公司堡垒机远程客户现场的计算机,公司只给开发了win系统下的远程连接程序,而没有…

【模式识别】贝叶斯决策模型理论总结

贝叶斯决策模型理论 一、引言二、贝叶斯定理三、先验概率和后验概率3.1 先验概率3.2 后验概率 四、最大后验准则五、最小错误率六、最小化风险七、最小最大决策八、贝叶斯决策建模参考 一、引言 在概率计算中,我们常常遇到这样的一类问题,某事件的发生可…

使用vscode搭建虚拟机

首先vscode插件安装 名称: Remote - SSH ID: ms-vscode-remote.remote-ssh 说明: Open any folder on a remote machine using SSH and take advantage of VS Codes full feature set. 版本: 0.51.0 VS Marketplace 链接: https://marketplace.visualstudio.com/items?it…

【C++】继承和多态

继承和多态 一、继承1. 继承概念2. 继承定义(1)继承的格式定义(2)继承父类成员访问方式的变化 3. 父类和子类对象赋值转换4. 继承中的作用域5. 子类的默认成员函数6. 继承与友元7. 继承与静态成员8. 复杂的菱形继承及菱形虚拟继承…

OpenCV #以图搜图:均值哈希算法(Average Hash Algorithm)原理与实验

1. 介绍 均值哈希算法(Average Hash Algorithm) 是哈希算法的一种,主要用来做相似图片的搜索工作。 2. 原理 均值哈希算法(aHash)首先将原图像缩小成一个固定大小的像素图像,然后将图像转换为灰度图像&am…

多级缓存入门

文章目录 什么是多级缓存JVM进程缓存环境准备安装MySQL导入Demo工程导入商品查询页面 初识Caffeine Lua语法初识Lua第一个lua程序变量和循环Lua的数据类型声明变量循环 条件控制、函数函数条件控制 多级缓存安装OpenRestyOpenResty快速入门反向代理流程OpenResty监听请求编写it…

k8s kubeadm配置

master 192.168.41.30 docker、kubeadm、kubelet、kubectl、flannel node01 192.168.41.31 docker、kubeadm、kubelet、kubectl、flannel node02 192.168.41.32 do…

ajax请求的时候get 和post方式的区别?

在 AJAX 请求中,GET 和 POST 是两种常用的请求方法,它们在发送请求时有一些区别: GET 请求: GET 请求用于向服务器请求获取指定资源,请求参数会附加在 URL 的末尾,以查询字符串的形式出现。GET 请求将请求…

一文精通C++ -- 继承

前言:继承是C类和对象三大特性中关键的一环,上承封装,下接多态,C中的继承是一种面向对象编程的概念,它允许一个类(称为子类或派生类)继承另一个类(称为父类或基类)的属性…

【Java网络编程】 三

本文主要介绍了TCP版本的回显服务器的编写。 一.TCP版本回显服务器 1.服务器 服务器的实现流程 1.接收请求并解析 2.根据请求计算出响应(业务流程) 3.把响应返回给客户端 代码: import java.io.IOException; import java.io.InputStream; i…