Flink系列之:Upsert Kafka SQL 连接器

Flink系列之:Upsert Kafka SQL 连接器

  • 一、Upsert Kafka SQL 连接器
  • 二、依赖
  • 三、完整示例
  • 四、可用元数据
  • 五、键和值格式
  • 六、主键约束
  • 七、一致性保证
  • 八、为每个分区生成相应的watermark
  • 九、数据类型映射

一、Upsert Kafka SQL 连接器

  • Scan Source: Unbounded 、
  • Sink: Streaming Upsert Mode

Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。

作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。

作为 sink,upsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。

二、依赖

	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.0.2-1.18</version></dependency>

三、完整示例

下面的示例展示了如何创建和使用 Upsert Kafka 表:

CREATE TABLE pageviews_per_region (user_region STRING,pv BIGINT,uv BIGINT,PRIMARY KEY (user_region) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'pageviews_per_region','properties.bootstrap.servers' = '...','key.format' = 'avro','value.format' = 'avro'
);CREATE TABLE pageviews (user_id BIGINT,page_id BIGINT,viewtime TIMESTAMP,user_region STRING,WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH ('connector' = 'kafka','topic' = 'pageviews','properties.bootstrap.servers' = '...','format' = 'json'
);-- 计算 pv、uv 并插入到 upsert-kafka sink
INSERT INTO pageviews_per_region
SELECTuser_region,COUNT(*),COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;

确保在 DDL 中定义主键。

这段代码是用来创建两个表,一个是"pageviews_per_region",另一个是"pageviews",并定义了它们的结构和连接器。

  1. "pageviews_per_region"表包含了三个字段:user_region(用户所在地区,字符串类型)、pv(页面访问量,长整型)和uv(独立访客量,长整型)。该表的主键为user_region,但不强制执行。

  2. "pageviews"表包含了四个字段:user_id(用户ID,长整型)、page_id(页面ID,长整型)、viewtime(访问时间,时间戳类型)和user_region(用户所在地区,字符串类型)。该表还定义了一个称为"viewtime"的水位线(watermark),它指定了在两秒之前的数据不再考虑为计算pv和uv。

这两个表都使用了Kafka连接器来读写数据。'connector’属性指定了使用的连接器类型,'topic’属性指定了连接器读写的Kafka主题,'properties.bootstrap.servers’属性指定了Kafka集群的地址。

对于"pageviews_per_region"表,'key.format’和’value.format’属性指定了数据的序列化格式为Avro。

对于"pageviews"表,'format’属性指定了数据的序列化格式为JSON。

最后,使用INSERT INTO语句,在"pageviews_per_region"表中计算出每个地区的pv和uv,并将结果插入到upsert-kafka sink中。

总之,这段代码的作用是通过Kafka连接器创建两个表,并将"pageviews"表中的数据计算出每个地区的pv和uv,并插入到"pageviews_per_region"表中。

四、可用元数据

连接器参数

参数是否必选默认值数据类型描述
connector必选(none)String指定要使用的连接器,Upsert Kafka 连接器使用:‘upsert-kafka’。
topic必选(none)String用于读取和写入的 Kafka topic 名称。
properties.bootstrap.servers必选(none)String以逗号分隔的 Kafka brokers 列表。
properties.*可选(none)String该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。 Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。 但是,某些选项,例如’key.deserializer’ 和 ‘value.deserializer’ 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。
key.format必选(none)String用于对 Kafka 消息中 key 部分序列化和反序列化的格式。key 字段由 PRIMARY KEY 语法指定。支持的格式包括 ‘csv’、‘json’、‘avro’
key.fields-prefix可选(none)String为键格式的所有字段定义自定义前缀,以避免与值格式的字段发生名称冲突。默认情况下,前缀为空。如果定义了自定义前缀,则表架构和“key.fields”都将使用前缀名称。构造密钥格式的数据类型时,将删除前缀,并在密钥格式中使用无前缀的名称。请注意,此选项要求“value.fields-include”必须设置为“EXCEPT_KEY”。
value.format必选(none)String用于对 Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式包括 ‘csv’、‘json’、‘avro’。
value.fields-include必选‘ALL’String控制哪些字段应该出现在 value 中。可取值:ALL:消息的 value 部分将包含 schema 中所有的字段,包括定义为主键的字段。EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。
sink.parallelism可选(none)Integer定义 upsert-kafka sink 算子的并行度。默认情况下,由框架确定并行度,与上游链接算子的并行度保持一致。
sink.buffer-flush.max-rows可选0Integer缓存刷新前,最多能缓存多少条记录。当 sink 收到很多同 key 上的更新时,缓存将保留同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量,以及避免发送潜在的 tombstone 消息。 可以通过设置为 ‘0’ 来禁用它。默认,该选项是未开启的。注意,如果要开启 sink 缓存,需要同时设置 ‘sink.buffer-flush.max-rows’ 和 ‘sink.buffer-flush.interval’ 两个选项为大于零的值。
sink.buffer-flush.interval可选0Duration缓存刷新的间隔时间,超过该时间后异步线程将刷新缓存数据。当 sink 收到很多同 key 上的更新时,缓存将保留同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量,以及避免发送潜在的 tombstone 消息。 可以通过设置为 ‘0’ 来禁用它。默认,该选项是未开启的。注意,如果要开启 sink 缓存,需要同时设置 ‘sink.buffer-flush.max-rows’ 和 ‘sink.buffer-flush.interval’ 两个选项为大于零的值。

五、键和值格式

此连接器需要键和值格式,其中键字段源自 PRIMARY KEY 约束。

以下示例显示如何指定和配置键和值格式。格式选项以“键”或“值”加上格式标识符作为前缀。

CREATE TABLE KafkaTable (`ts` TIMESTAMP(3) METADATA FROM 'timestamp',`user_id` BIGINT,`item_id` BIGINT,`behavior` STRING,PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka',...'key.format' = 'json','key.json.ignore-parse-errors' = 'true','value.format' = 'json','value.json.fail-on-missing-field' = 'false','value.fields-include' = 'EXCEPT_KEY'
)

六、主键约束

Upsert Kafka 始终以 upsert 方式工作,并且需要在 DDL 中定义主键。在具有相同主键值的消息按序存储在同一个分区的前提下,在 changelog source 定义主键意味着 在物化后的 changelog 上主键具有唯一性。定义的主键将决定哪些字段出现在 Kafka 消息的 key 中。

七、一致性保证

默认情况下,如果启用 checkpoint,Upsert Kafka sink 会保证至少一次将数据插入 Kafka topic。

这意味着,Flink 可以将具有相同 key 的重复记录写入 Kafka topic。但由于该连接器以 upsert 的模式工作,该连接器作为 source 读入时,可以确保具有相同主键值下仅最后一条消息会生效。因此,upsert-kafka 连接器可以像 HBase sink 一样实现幂等写入。

八、为每个分区生成相应的watermark

Flink 支持根据 Upsert Kafka 的 每个分区的数据特性发送相应的 watermark。当使用这个特性的时候,watermark 是在 Kafka consumer 内部生成的。 合并每个分区 生成的 watermark 的方式和 stream shuffle 的方式是一致的。 数据源产生的 watermark 是取决于该 consumer 负责的所有分区中当前最小的 watermark。如果该 consumer 负责的部分分区是 idle 的,那么整体的 watermark 并不会前进。在这种情况下,可以通过设置合适的 table.exec.source.idle-timeout 来缓解这个问题。

九、数据类型映射

Upsert Kafka 用字节存储消息的 key 和 value,因此没有 schema 或数据类型。消息按格式进行序列化和反序列化,例如:csv、json、avro。因此数据类型映射表由指定的格式确定。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/234467.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2023年中国数据智能管理峰会(DAMS上海站2023)-核心PPT资料下载

一、峰会简介 数据已经成为企业的核心竞争力&#xff01;谁掌控数据、更好的利用数据、实现资产化&#xff0c;谁就会真正率先进入大数据时代。 1、数据智能管理趋势和挑战 在峰会上&#xff0c;与会者讨论了数据智能管理的最新趋势和挑战。随着数据量的不断增加&#xff0c…

JNI逆向

IDA&#xff1a;JNI类型转换 1.IDA高版本&#xff08;IDA 高版本内置了定义的JNI结构体; 如果没有的话&#xff0c;在Views->Open subviews -> Type Libraries 中添加Android ARM的lib即可&#xff09; 解决方法: 只需要对JNIEnv 指针&#xff08;JNIEnv * &#xff09…

【影像组学入门百问】#32—#34

#32-影像组学研究过程中&#xff0c;图像重采样参 数怎么选择&#xff1f; 在影像组学研究过程中&#xff0c;选择合适的图像重采样参数对于保证分析质量和准确性至关重要。以下是在选择图像重采样参数时需要考虑的一些建议&#xff1a; 1.目标分辨率&#xff1a;首先&#…

力扣 | 347. 前 K 个高频元素

leetcode 347 号算法题&#xff1a;前 K 个高频元素 给你一个整数数组 nums 和一个整数 k &#xff0c;请你返回其中出现频率前 k 高的元素。 你可以按 任意顺序 返回答案。输入: nums [1,1,1,2,2,3], k 2 输出: [2, 1]输入: nums [1], k 1 输出: [1]1 < nums.length &…

使用postman时,报错SSL Error: Unable to verify the first certificate

开发中使用postman调用接口&#xff0c;出现以下问题&#xff0c;在确认路径、参数、请求方式均为正确的情况下 解决方法 File - Settings -> SSL certification verification 关闭 找到图中配置&#xff0c;这里默认是打开状态&#xff0c;把它关闭即可&#xff1a;ON …

虾皮测评选品:如何在虾皮平台上进行有效的产品测评和选品

在如今的电商市场中&#xff0c;虾皮&#xff08;Shopee&#xff09;平台已经成为了卖家们最为重要的销售渠道之一。而在虾皮平台上进行产品测评和选品对于卖家来说至关重要&#xff0c;它直接影响到店铺的销售额和利润。本文将为您提供一些关于如何在虾皮平台上进行有效的产品…

如何通过ETLCloud的API对接功能实现各种SaaS平台数据对接

前言 当前使用SaaS系统的企业越来越多&#xff0c;当我们需要对SaaS系统中产生的数据进行分析和对接时就需要与SaaS系统提供的API进行对接&#xff0c;因为SaaS一般是不会提供数据库表给企业&#xff0c;这时就应该使用ETL&#xff08;Extract, Transform, Load&#xff09;的…

Tenda账号密码泄露漏洞复现 [附POC]

文章目录 Tenda账号密码泄露漏洞复现 [附POC]0x01 前言0x02 漏洞描述0x03 影响版本0x04 漏洞环境0x05 漏洞复现1.访问漏洞环境2.构造POC3.复现Tenda账号密码泄露漏洞复现 [附POC] 0x01 前言 免责声明:请勿利用文章内的相关技术从事非法测试,由于传播、利用此文所提供的信息…

复杂 SQL 实现分组分情况分页查询

其他系列文章导航 Java基础合集数据结构与算法合集 设计模式合集 多线程合集 分布式合集 ES合集 文章目录 其他系列文章导航 文章目录 前言 一、根据 camp_status 字段分为 6 种情况 1.1 SQL语句 1.2 SQL解释 二、分页 SQL 实现 2.1 SQL语句 2.2 根据 camp_type 区分返…

Unity中Shader测试常用的UGUI功能简介

文章目录 前言一、锚点1、锚点快捷修改位置2、使用Anchor Presets快捷修改3、Anchor Presets界面按下 Shift 可以快捷修改锚点和中心点位置4、Anchor Presets界面按下 Alt 可以快捷修改锚点位置、UI对象位置 和 长宽大小 二、Canvas画布1、UGUI中 Transform 变成了 Rect Transf…

openssl数据压缩

介绍 数据压缩是将原有数据通过某种压缩算法计算得到相对数据量小的过程。这种过程是可逆的&#xff0c;即能通过压缩后的数据恢复出原数据。数据压缩能够节省存储空间&#xff0c;减轻网络负载。 在即需要加密又需要压缩的情况下&#xff0c;必须先压缩再加密&#xff0c;次…

Linux 操作系统(查看文件内容)

cat 格式&#xff1a;cat [选项]...[文件]... 说明&#xff1a;把多个文件连接后输出到标准输出&#xff08;屏幕&#xff09;或者加”> 文件名” 输出到另一个文件中 常用选项&#xff1a; -b或—number-noblank: 从1开始对所有非空输出行进行编号 -n或—number: 从1开始所…

网络协议小记

一、TCP/IP协议 作为一个小萌新&#xff0c;当然我无法将tcp/ip协议的大部分江山和盘托出&#xff0c;但是其中很多面试可能问到的知识&#xff0c;我觉得有必要总结一下&#xff01; 首先&#xff0c;在学习tcp/ip协议之前&#xff0c;我们必须搞明白什么是tcp/ip协议。 1、…

6. 3 lambda表达式

6.3 lambda表达式 指定时间间隔完成工作&#xff0c;该工作放在了ActionListener中的action performed 方法中&#xff1b;work类继承上述类&#xff0c;构造work类实例 定制比较器完成排序&#xff0c;定义一个长度比较类&#xff0c;再传给sort&#xff1b;实现基于长度排…

Web地图开发,在vue3中引入高德地图API

在vue3中引入高德地图API要实现的功能 设置地图的显示样式实现点击地图添加标记、点击地图获取详细地址和经纬度输入框搜索获取相关地区提示&#xff08;下拉框&#xff0c;选中后进行标记&#xff0c;视角移动到相对位置&#xff09;输入框输入内容&#xff0c;回车获取详细地…

架构设计系列之前端架构和后端架构的区别和联系

前端架构和后端架构都是软件系统中最关键的架构层&#xff0c;负责处理不同方面的任务和逻辑&#xff0c;两者之间是存在一些区别和联系的&#xff0c;我会从以下几个方面来阐述&#xff1a; 一、定位和职责 前端架构 主要关注用户界面和用户体验&#xff0c;负责处理用户与…

打造中国人自己的GPTs,百度灵境矩阵升级为智能体平台

12月18日&#xff0c;百度「灵境矩阵」平台全新升级为「文心大模型智能体平台」。灵境矩阵基于文心大模型&#xff0c;为开发者提供多样化的开发方式&#xff0c;支持广大开发者根据自身行业领域、应用场景&#xff0c;选取多样化的开发方式&#xff0c;打造大模型时代的原生应…

关于我对归纳偏置(inductive bias)的概念和应用的详细总结

归纳偏置&#xff08;inductive bias&#xff09; 1.归纳偏置&#xff08;inductive bias&#xff09;的概念2.归纳偏置&#xff08;inductive bias&#xff09;的应用 1.归纳偏置&#xff08;inductive bias&#xff09;的概念 归纳偏置&#xff08;inductive bias&#xff0…

贪吃蛇(二)绘制地图

绘制地图主要是考察基础的循环和分支控制&#xff0c;视频没看完&#xff0c;自己写了一下。 绘制一个基础地图 #include"curses.h" void cursesinit() {initscr();keypad(stdscr,1); }void mapinit() {int row;int col;for(row 0;row < 20;row){if(row 0 || …

如何在 FastAPI 中设置定时任务:完全指南

Web 应用程序开发中&#xff0c;及时高效处理常规任务至关重要&#xff0c;包括定时收集数据或管理任务计划。针对强大且性能卓越的 FastAPI 框架&#xff0c;我们可以通过几种策略来管理这些必要的定时任务。 实现 FastAPI 中的定时任务 本指南将探讨在 FastAPI 环境中管理定…