PiflowX组件-ReadFromUpsertKafka

ReadFromUpsertKafka组件

组件说明

upsert方式从Kafka topic中读取数据。

计算引擎

flink

有界性

Unbounded

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”用于写入Kafka topic名称。topic-1
tableDefinitionTableDefinition“”Flink table定义。
key_formatkeyFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中key部分反序列化的格式。key字段由PRIMARY KEY语法指定。json
value_formatValueFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中value部分反序列化的格式json
value_fields_includeValueFieldsIncludeALLSet(“ALL”, “EXCEPT_KEY”)控制哪些字段应该出现在 value 中。可取值:"ALL:消息的 value 部分将包含 schema 中所有的字段包括定义为主键的字段。"EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。ALL
key_fields_prefixKeyFieldsPrefix“”为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。当构建消息键格式字段时,前缀会被移除, 消息键格式将会使用无前缀的名称。请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’。
propertiesPROPERTIES“”该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。 Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。 但是,某些选项,例如’key.deserializer’ 和 ‘value.deserializer’ 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。

ReadFromUpsertKafka示例配置

演示实时统计网页pv和uv的总量。

{"flow": {"name": "ReadFromUpsertKafkaTest","uuid": "1234","stops": [{"uuid": "5555","name": "ReadFromUpsertKafka1","bundle": "cn.piflow.bundle.flink.kafka.ReadFromUpsertKafka","properties": {"kafka_host": "hadoop01:9092","topic": "result_total_pv_uv_min","key_format": "json","value_format": "json","value_fields_include": "ALL","tableDefinition": "{\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"do_date\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计日期\"},{\"columnName\":\"do_min\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计分钟\"},{\"columnName\":\"pv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"点击量\"},{\"columnName\":\"uv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"一天内同个访客多次访问仅计算一个UV\"},{\"columnName\":\"currenttime\",\"columnType\":\"TIMESTAMP\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"当前时间\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null}","properties": "{\"value.json.fail-on-missing-field\": false,\"properties.group.id\": \"test\"}"}},{"uuid": "6666","name": "ShowChangeLogData1","bundle": "cn.piflow.bundle.flink.common.ShowChangeLogData","properties": {"showNumber": "5000"}}],"paths": [{"from": "ReadFromUpsertKafka1","outport": "","inport": "","to": "ShowChangeLogData1"}]}
}
示例说明
  1. 通过k.kafka.ReadFromUps从kafka的result_total_pv_uv_min topic中读取数据(使用WriteToUpsertKafka组件写入到result_total_pv_uv_min中的数据);

  2. 通过ShowChangeLogData组件将数据输出到控制台。

tableDefinition属性结构
{"ifNotExists": true,"physicalColumnDefinition": [{"columnName": "do_date","columnType": "STRING","nullable": false,"primaryKey": true,"partitionKey": false,"comment": "统计日期"}, {"columnName": "do_min","columnType": "STRING","nullable": false,"primaryKey": true,"partitionKey": false,"comment": "统计分钟"}, {"columnName": "pv","columnType": "BIGINT","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "点击量"}, {"columnName": "uv","columnType": "BIGINT","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "一天内同个访客多次访问仅计算一个UV"}, {"columnName": "currenttime","columnType": "TIMESTAMP","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "当前时间"}],"metadataColumnDefinition": null,"computedColumnDefinition": null
}

演示DEMO

在这里插入图片描述
欢迎关注PiflowX公众号,谢谢支持!!!

在这里插入图片描述

演示案例参考

实时数仓|以upsert的方式读写Kafka数据—Flink1.12为例_upsert-connect 时间周期-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/604048.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI看图识熊实战(一)

使用ONNX Runtime封装onnx模型并推理 进行这一步之前,请确保已正确安装配置了Visual Studio 2017 和 C#开发环境。 项目的代码也可以在这里找到,下面的步骤是带着大家从头到尾做一遍。 界面设计 创建Windows窗体应用(.NET Framework)项目&#xff0c…

c# 学习笔记 - 枚举

文章目录 1. 枚举1.1 枚举结构梳理1.2 枚举完整代码1.3 枚举知识点补充 2. 迭代两种命名空间接口3. yield语句 1. 枚举 1.1 枚举结构梳理 结构图   上图内容可能依旧不通俗易懂,这里使用最简明的话语告诉大家实现方式. foreach语句就是集合的遍历操作&#xff0c…

221.【2023年华为OD机试真题(C卷)】字符串变换最小字符串(贪心策略-JavaPythonC++JS实现)

🚀点击这里可直接跳转到本专栏,可查阅顶置最新的华为OD机试宝典~ 本专栏所有题目均包含优质解题思路,高质量解题代码(Java&Python&C++&JS分别实现),详细代码讲解,助你深入学习,深度掌握! 文章目录 一. 题目二.解题思路三.题解代码Python题解代码JAVA题解…

2024.1.3力扣每日一题——从链表中移除节点

2024.1.3 题目来源我的题解方法一 递归方法二 栈方法三 反转链表方法四 单调栈头插法 题目来源 力扣每日一题;题序:2487 我的题解 方法一 递归 当前节点对其右侧节点是否删除无影响,因此可以对其右侧节点进行递归移除。 若当前节点为空&am…

快速掌握Postman实现接口测试

快速掌握Postman实现接口测试 Postman简介 Postman是谷歌开发的一款网页调试和接口测试工具,能够发送任何类型的http请求,支持GET/PUT/POST/DELETE等方法。Postman非常简单易用,可以直接填写URL,header,body等就可以发…

Unity2D学习笔记 | 《勇士传说》教程 | (六)

目录 (一)存档点对象制作 (二)保存数据与加载数据 (三)存储数值与场景 (四)游戏结束画面制作 (五)序列化数据保存 (一)存档点对象…

Java多线程技术11——ThreadPoolExecutor类的使用2

1 isShutdown()方法 public boolean isShutdown()方法的作用是判断线程池是否已经关闭 public class Run1 {public static void main(String[] args) {Runnable runnable new Runnable() {Overridepublic void run() {try {System.out.println("开始: " Thread.c…

软件安装文档 | MinIO

# docker 下载镜像 docker pull minio/minio# 安装镜像docker run \ --name minio \ -p 19000:9000 \ -p 19090:9090 \ -d --restartalways \ -e "MINIO_ROOT_USERsuweijie" \ -e "MINIO_ROOT_PASSWORDSuweijie0217" \ -v /home/data:/data \ -v /home/c…

嵌入式培训机构四个月实训课程笔记(完整版)-Linux系统编程第三天-Linux进程练习题(物联技术666)

更多配套资料CSDN地址:点赞+关注,功德无量。更多配套资料,欢迎私信。 物联技术666_嵌入式C语言开发,嵌入式硬件,嵌入式培训笔记-CSDN博客物联技术666擅长嵌入式C语言开发,嵌入式硬件,嵌入式培训笔记,等方面的知识,物联技术666关注机器学习,arm开发,物联网,嵌入式硬件,单片机…

优化|PLSA理论与实践

PLSA又称为概率潜在语义分析,是一种利用概率生成模型对文本集合进行话题分析的无监督学习方法。该模型最大的特点是加入了主题这一隐变量,文本生成主题,主题生成单词,从而得到单词-文本共现矩阵。本文将对包含物理学、计算机科学、…

学习笔记——C++中的循环结构 while语句

while循环语句 作用:满足循环条件,执行循环语句 语法:while(循环条件){循环语句} 解释:只要循环条件的结果为真,就执行循环语句 以打印0-9这十个数字为例,特别需要注意的是&…

Anaconda安装失败及解决办法

查看Anaconda版本时正常显示 报错显示: 解决办法: 确保系统要求满足:检查你的操作系统是否满足 Anaconda 的最低要求。例如,确保你使用的是 64 位操作系统,同时具备足够的磁盘空间。禁用防病毒软件:某些防…

单片机原理及应用:独立式键盘控制LED与多功能按键识别

今天来介绍另一个外设——按键与LED的配合工作,与开关不同,按键需要注意消除抖动带来的影响,代码逻辑也会更复杂一写,下面先为大家介绍独立式键盘的相关知识。 单片机的独立式键盘指的是一种不依赖于计算机或其他外部设备的键盘输…

【STM32学习】硬件CRC与传统CRC-32计算的不同点

硬件CRC与传统CRC-32计算的不同点 1、stm32的硬件CRC32与传统CRC-32有何不同?2、解决办法 1、stm32的硬件CRC32与传统CRC-32有何不同? ①STM32F103的硬件CRC校验是对整个32位字进行CRC计算,传统的CRC-32是逐字节的计算。 ②STM32的硬件CRC32的…

基于SSM的校内信息服务发布系统的设计与实现

末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:Vue 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目:是 目录…

Redis高并发高可用(集群)

Redis Cluster是Redis的分布式解决方案,在3.0版本正式推出,有效地解决了Redis分布式方面的需求。当遇到单机内存、并发、流量等瓶颈时,可以采用Cluster架构方案达到负载均衡的目的。之前,Redis分布式方案一般有两种: 1、客户端分区方案,优点是分区逻辑可控,缺点是需要自己…

初识动态内存管理

前言: 我们都知道,内存分为几个区——栈区、堆区、静态区、常量区、代码区,我们在写代码的时候经常会遇到栈溢出这个问题,是因为在程序运行之前,我们无法准确的知道要分配多少空间给程序,所以说很容易造…

CentOS使用docker安装mysql并使用navicat 远程链接

这篇文章没用开启mysql的挂载功能,如果想开启的话可以和我的下篇文章结合着看。 CentOS中开启mysql挂载-CSDN博客 docker在之前的文章中已经安装完成了 这里输入命令查询已被上传的MySQL镜像 docker search mysql这里stars代表点赞数,official代表官…

KNN 回归

K 近邻回归(K-Nearest Neighbors Regression)是一种基于实例的回归算法,用于预测连续数值型的输出变量。它的基本思想是通过找到与给定测试样本最近的 K 个训练样本,并使用它们的输出值来预测测试样本的输出。它与 K 最近邻分类类…

计算机毕业设计----SSM场地预订管理系统

项目介绍 本项目分为前后台,前台为普通用户登录,后台为管理员登录; 用户角色包含以下功能: 按分类查看场地,用户登录,查看网站公告,按分类查看器材,查看商品详情,加入购物车,提交订单,查看订单,修改个人信息等功能。 管理员角…