PiflowX组件-WriteToUpsertKafka

WriteToUpsertKafka组件

组件说明

以upsert方式往Kafka topic中写数据。

计算引擎

flink

有界性

Streaming Upsert Mode

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”用于写入Kafka topic名称。topic-1
tableDefinitionTableDefinition“”Flink table定义。
key_formatkeyFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中key部分序列化的格式。key字段由PRIMARY KEY语法指定。json
value_formatValueFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中value部分序列化的格式json
value_fields_includeValueFieldsIncludeALLSet(“ALL”, “EXCEPT_KEY”)控制哪些字段应该出现在 value 中。可取值:
"ALL:消息的 value 部分将包含 schema 中所有的字段包括定义为主键的字段。
"EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。
ALL
key_fields_prefixKeyFieldsPrefix“”为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。当构建消息键格式字段时,前缀会被移除, 消息键格式将会使用无前缀的名称。请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’。
sink_parallelismSinkParallelism“”定义upsert-kafka sink算子的并行度。默认情况下,由框架确定并行度,与上游链接算子的并行度保持一致。
sink_buffer_flush_max_rowsSinkBufferFlushMaxRows“”缓存刷新前,最多能缓存多少条记录。当sink收到很多同key上的更新时,缓存将保留同key的最后一条记录,因此sink缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。 可以通过设置为 ‘0’ 来禁用它默认,该选项是未开启的。注意,如果要开启sink缓存,需要同时设置 ‘sink.buffer-flush.max-rows’ 和 'sink.buffer-flush.interval两个选项为大于零的值。
sink_buffer_flush_intervalSinkBufferFlushInterval“”该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。 Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。 但是,某些选项,例如’key.deserializer’ 和 ‘value.deserializer’ 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。
propertiesPROPERTIES“”Kafka source连接器其他配置

WriteToUpsertKafka示例配置

演示实时统计网页pv和uv的总量。

{"flow": {"name": "UpsertKafkaTest","uuid": "1234","stops": [{"uuid": "0000","name": "JsonStringParser1","bundle": "cn.piflow.bundle.flink.json.JsonStringParser","properties": {"content": "[{\"user_id\":\"1\",\"client_ip\":\"192.168.12.1\",\"client_info\":\"phone\",\"page_code\":\"1001\",\"access_time\":\"2021-01-08 11:32:24\",\"dt\":\"2021-01-08\"},{\"user_id\":\"1\",\"client_ip\":\"192.168.12.1\",\"client_info\":\"phone\",\"page_code\":\"1201\",\"access_time\":\"2021-01-08 11:32:55\",\"dt\":\"2021-01-08\"},{\"user_id\":\"2\",\"client_ip\":\"192.165.12.1\",\"client_info\":\"pc\",\"page_code\":\"1031\",\"access_time\":\"2021-01-08 11:32:59\",\"dt\":\"2021-01-08\"},{\"user_id\":\"1\",\"client_ip\":\"192.168.12.1\",\"client_info\":\"phone\",\"page_code\":\"1101\",\"access_time\":\"2021-01-08 11:33:24\",\"dt\":\"2021-01-08\"},{\"user_id\":\"3\",\"client_ip\":\"192.168.10.3\",\"client_info\":\"pc\",\"page_code\":\"1001\",\"access_time\":\"2021-01-08 11:33:30\",\"dt\":\"2021-01-08\"},{\"user_id\":\"1\",\"client_ip\":\"192.168.12.1\",\"client_info\":\"phone\",\"page_code\":\"1001\",\"access_time\":\"2021-01-08 11:34:24\",\"dt\":\"2021-01-08\"}]","schema": "user_id:STRING,client_ip:STRING,client_info:STRING,page_code:STRING,access_time:TIMESTAMP,dt:STRING"}},{"uuid": "1111","name": "WriteToKafka1","bundle": "cn.piflow.bundle.flink.kafka.WriteToKafka","properties": {"kafka_host": "hadoop01:9092","topic": "user_ip_pv","tableDefinition": "{\"catalogName\":null,\"dbname\":null,\"tableName\":null,\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"user_id\",\"columnType\":\"STRING\",\"comment\":\"用户ID\"},{\"columnName\":\"client_ip\",\"columnType\":\"STRING\",\"comment\":\"客户端IP\"},{\"columnName\":\"client_info\",\"columnType\":\"STRING\",\"comment\":\"设备机型信息\"},{\"columnName\":\"page_code\",\"columnType\":\"STRING\",\"comment\":\"页面代码\"},{\"columnName\":\"access_time\",\"columnType\":\"TIMESTAMP\",\"comment\":\"请求时间\"},{\"columnName\":\"dt\",\"columnType\":\"STRING\",\"comment\":\"时间分区天\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null,\"watermarkDefinition\":null}","format": "json","properties": "{\"json.ignore-parse-errors\":\"true\"}"}},{"uuid": "2222","name": "ReadFromKafka1","bundle": "cn.piflow.bundle.flink.kafka.ReadFromKafka","properties": {"kafka_host": "hadoop01:9092","topic": "user_ip_pv","group": "test","startup_mode": "earliest-offset","tableDefinition": "{\"catalogName\":null,\"dbname\":null,\"tableName\":\"source_ods_fact_user_ip_pv\",\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"user_id\",\"columnType\":\"STRING\",\"comment\":\"用户ID\"},{\"columnName\":\"client_ip\",\"columnType\":\"STRING\",\"comment\":\"客户端IP\"},{\"columnName\":\"client_info\",\"columnType\":\"STRING\",\"comment\":\"设备机型信息\"},{\"columnName\":\"page_code\",\"columnType\":\"STRING\",\"comment\":\"页面代码\"},{\"columnName\":\"access_time\",\"columnType\":\"TIMESTAMP\",\"comment\":\"请求时间\"},{\"columnName\":\"dt\",\"columnType\":\"STRING\",\"comment\":\"时间分区天\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null,\"watermarkDefinition\":null}","format": "json","properties": "{}"}},{"uuid": "3333","name": "SQLExecute1","bundle": "cn.piflow.bundle.flink.common.SQLExecute","properties": {"sql": "CREATE VIEW view_total_pv_uv_min AS SELECT dt AS do_date, count(client_ip) AS pv, count(DISTINCT client_ip) AS uv,max(access_time) AS access_time FROM source_ods_fact_user_ip_pv GROUP BY dt;"}},{"uuid": "4444","name": "WriteToUpsertKafka1","bundle": "cn.piflow.bundle.flink.kafka.WriteToUpsertKafka","properties": {"kafka_host": "hadoop01:9092","topic": "result_total_pv_uv_min","key_format": "json","value_format": "json","value_fields_include": "ALL","tableDefinition": "{\"catalogName\":null,\"dbname\":null,\"tableName\":\"result_total_pv_uv_min\",\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"do_date\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计日期\"},{\"columnName\":\"do_min\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计分钟\"},{\"columnName\":\"pv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"点击量\"},{\"columnName\":\"uv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"一天内同个访客多次访问仅计算一个UV\"},{\"columnName\":\"currenttime\",\"columnType\":\"TIMESTAMP\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"当前时间\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null,\"watermarkDefinition\":null,\"asSelectStatement\":\"SELECT  do_date,cast(DATE_FORMAT(access_time,'HH:mm') AS STRING) AS do_min,pv,uv,NOW() AS currenttime from view_total_pv_uv_min\"}","properties": "{\"value.json.fail-on-missing-field\": false}"}}],"paths": [{"from": "JsonStringParser1","outport": "","inport": "","to": "WriteToKafka1"},{"from": "WriteToKafka1","outport": "","inport": "","to": "ReadFromKafka1"},{"from": "ReadFromKafka1","outport": "","inport": "","to": "SQLExecute1"},{"from": "SQLExecute1","outport": "","inport": "","to": "WriteToUpsertKafka1"}]}
}
示例说明
  1. 通过JsonStringParser将给定的json字符串解析,并输出到下游,通过WriteToKafka组件将数据写入到kafka的user_ip_pv topic中;

  2. 通过ReadFromKafka组件从user_ip_pv topic中读取数据;

  3. 使用SQLExecute组件执行创建视图view_total_pv_uv_min的语句;

  4. 使用WriteToUpsertKafka定义upsert kafka table,并使用tableDefinition属性中定义的asSelectStatement执行语句,将结果写入kafka。

tableDefinition属性结构
{"catalogName": null,"dbname": null,"tableName": "result_total_pv_uv_min","ifNotExists": true,"physicalColumnDefinition": [{"columnName": "do_date","columnType": "STRING","nullable": false,"primaryKey": true,"partitionKey": false,"comment": "统计日期"},{"columnName": "do_min","columnType": "STRING","nullable": false,"primaryKey": true,"partitionKey": false,"comment": "统计分钟"},{"columnName": "pv","columnType": "BIGINT","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "点击量"},{"columnName": "uv","columnType": "BIGINT","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "一天内同个访客多次访问仅计算一个UV"},{"columnName": "currenttime","columnType": "TIMESTAMP","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "当前时间"}],"metadataColumnDefinition": null,"computedColumnDefinition": null,"watermarkDefinition": null,"asSelectStatement": "SELECT  do_date,cast(DATE_FORMAT(access_time,'HH:mm') AS STRING) AS do_min,pv,uv,NOW() AS currenttime from view_total_pv_uv_min"
}

演示DEMO
在这里插入图片描述

演示案例参考

实时数仓|以upsert的方式读写Kafka数据—Flink1.12为例_upsert-connect 时间周期-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/589129.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿里开源大模型 Qwen-72B 私有化部署

近期大家都知道阿里推出了自己的开源的大模型千问72B,据说对于中文非常友好,在开源模型里面,可谓是名列前茅。 千问拥有有强大的基础语言模型,已经针对多达 3 万亿个 token 的多语言数据进行了稳定的预训练,覆盖领域、…

【Java】JUC并发编程(重量锁、轻量锁、偏向锁)

JUC并发编程 预备: 创建一个maven工程,导入lombok和logback的依赖。 1、基础概念 1、进程与线程 **进程:**程序由指令和数据组成,但这些指令要运行,数据要读写,就必须将指令加载至 CPU ,数…

Golang不可不知的7个并发概念

并发性支持是Golang最重要的原生特性之一,本文介绍了Golang中和并发性相关的7个概念。原文: Golang: 7 must-know concurrency related concepts 并发是Go编程语言的基本特性,意味着程序可以同时执行多个任务。Golang的并发独特而强大,其内置…

【Linux操作系统】探秘Linux奥秘:文件系统的管理与使用

🌈个人主页:Sarapines Programmer🔥 系列专栏:《操作系统实验室》🔖诗赋清音:柳垂轻絮拂人衣,心随风舞梦飞。 山川湖海皆可涉,勇者征途逐星辉。 目录 🪐1 初识Linux OS &…

【数据结构】栈和队列(队列的基本操作和基础知识)

🌈个人主页:秦jh__https://blog.csdn.net/qinjh_?spm1010.2135.3001.5343🔥 系列专栏:《数据结构》https://blog.csdn.net/qinjh_/category_12536791.html?spm1001.2014.3001.5482 ​ 目录 前言 队列 队列的概念和结构 队列的…

二叉树详解(深度优先遍历、前序,中序,后序、广度优先遍历、二叉树所有节点的个数、叶节点的个数)

目录 一、树概念及结构(了解) 1.1树的概念 1.2树的表示 二、二叉树概念及结构 2.1概念 2.2现实中的二叉树: 2.3数据结构中的二叉树: 2.4特殊的二叉树: 2.5 二叉树的存储结构 2.51 顺序存储: 2.5.2 链式存储&…

CSS2_基础学习

CSS2_基础学习 一、css基础知识二、css选择器2.0 选择器的优先级2.1 CSS基本选择器2.2 复合选择器2.2.1. 交集选择器2.2.2. 并集选择器2.2.3. 后代选择器(加空格)2.2.4. 子代选择器2.2.5. 兄弟选择器2.2.6. 属性选择器2.2.7. 伪类选择器2.2.8. 伪元素选择…

微信小程序发送模板消息-详解【有图】

前言 在发送模板消息之前我们要首先搞清楚微信小程序的逻辑是什么,这只是前端的一个demo实现,建议大家在后端处理,前端具体实现:如下图 1.获取小程序Id和密钥 我们注册完微信小程序后,可以在开发设置中看到以下内容&a…

navicat premium历史版本下载及更新navicat premium15 永久(使用)有效期

1、navicat premium介绍 Navicat Premium 是一套可创建多个连接的数据库开发工具,让你从单一应用程序中同时连接 MySQL、Redis、MariaDB、MongoDB、SQL Server、Oracle、PostgreSQL 和 SQLite 。它与 GaussDB 、OceanBase 数据库及 Amazon RDS、Amazon Aurora、Amaz…

基于简化版python+VGG+MiniGoogLeNet的智能43类交通标志识别—深度学习算法应用(含全部python工程源码)+数据集+模型(一)

目录 前言总体设计系统整体结构图系统流程图 运行环境Python环境Anaconda环境 模块实现1. 数据预处理 相关其它博客工程源代码下载其它资料下载 前言 本项目专注于解决出国自驾游特定场景下的交通标志识别问题。借助Kaggle上的丰富交通标志数据集,我们采用了VGG和G…

推荐系统中 排序策略 CTR 动态加权平均法

CTR(Click-Through Rate)动态加权平均法是一种用于计算广告点击率的方法,其中每个点击率被赋予一个权重,这个权重可以随着时间、事件或其他因素而动态调整。这种方法旨在更灵活地反映广告点击率的变化,使得最近的数据更…

Mybatis 事务接口

当我们从数据源中得到一个可用的数据库连接之后,就可以开启一个数据库事务了,事务成功开启之后,我们才能修改数据库中的数据。 在修改完成之后,我们需要提交事务,完成整个事务内的全部修改操作,如果修改过…

JAVA:利用JUnit进行高效的单元测试

1、简述 在软件开发中,单元测试是确保代码质量和可维护性的关键步骤。JUnit作为Java领域最流行的单元测试框架之一,提供了简单而强大的测试工具,可以帮助开发者在项目开发过程中及时发现和修复代码中的问题。本文将介绍JUnit的基本用法以及一…

【每日一题】一周中的第几天

文章目录 Tag题目来源解题思路方法一:模拟 写在最后 Tag 【模拟】【数学】【2023-12-30】 题目来源 1185. 一周中的第几天 解题思路 方法一:模拟 思路 题目中的日期是在 1971 到 2100 年之间的有效日期,即 1971-01-01 到 2100-12-31 范围…

【c语言】飞机大战2

1.优化边界问题 之前视频中当使用drawAlpha函数时,是为了去除飞机后面变透明,当时当飞机到达边界的时候,会出现异常退出,这是因为drawAlpha函数不稳定,昨天试过制作掩码图,下载了一个ps,改的话&#xff0c…

易舟云财务软件使用教程【文章目录】

易舟云财务软件使用教程【文章目录】 1、财务软件导论2、易舟云财务软件3、财务软件原理4、账套5、会计凭证6、资金日记账7、发票8、员工工资9、固定资产10、期末处理(结转与结账)11、会计账簿12、财务报表13、财务软件设置 1、财务软件导论 财务软件导论 2、易舟云财务软件 …

Java循环高级(无限循环,break,continue,Random,逢七过,平方根,判断是否是质数,猜数字小游戏)

文章目录 1.无限循环概念:for格式:while格式:do...while格式:无限循环的注意事项: 2.条件控制语句break:continue: 3. Random使用步骤: 4. 逢七过5. 平方根6.判断是否为质数7. 猜数字小游戏 1.无限循环 概…

初识SpringBoot(2023最后一篇文章)

初识SpringBoot 1、SpringBoot概述 Spring是什么? Spring是一个于2003 年兴起的一个轻量级开源Java开发框架,由Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》。Spring是为了解决企业级应用开发的复杂性而创建的,使…

Linux系统安装DockerDocker-Compose

1、Docker安装 下载Docker依赖的组件 yum -y install yum-utils device-mapper-persistent-data lvm2 设置下载Docker服务的镜像源,设置为阿里云 yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo 安装Docker服务 …

【中南林业科技大学】计算机组成原理复习包括题目讲解(超详细)

来都来了点个赞收藏关注一下再走呗🌹🌹🌹🌹 第1章:绪论 1.冯诺依曼机特点,与现代计算机的区别 冯诺依曼计算机的基本思想是:程序和数据以二进制形式表示,存储程序控制。在计算机中&…