PiflowX组件-WriteToKafka

WriteToKafka组件

组件说明

将数据写入kafka。

计算引擎

flink

有界性

Streaming Append Mode

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”写入的topic名。注意不支持topic列表。test
schemaSCHEMA“”Kafka消息的schema信息。若不指定,将从上游输入数据推断。id:int,name:string,age:int
formatFORMAT“”Set(“json”, “csv”, “avro”, “parquet”, “orc”, “raw”, “protobuf”,“debezium-json”, “canal-json”, “maxwell-json”, “ogg-json”)用来序列化或反序列化Kafka消息的格式。注意:该配置项和 ‘value.format’ 二者必需其一。json
propertiesPROPERTIES“”Kafka source连接器其他配置

WriteToKafka示例配置

{"flow": {"name": "DataGenTest","uuid": "1234","stops": [{"uuid": "0000","name": "DataGen1","bundle": "cn.piflow.bundle.flink.common.DataGen","properties": {"schema": "[{\"filedName\":\"id\",\"filedType\":\"INT\",\"kind\":\"sequence\",\"start\":1,\"end\":10000},{\"filedName\":\"name\",\"filedType\":\"STRING\",\"kind\":\"random\",\"length\":15},{\"filedName\":\"age\",\"filedType\":\"INT\",\"kind\":\"random\",\"max\":100,\"min\":1}]","count": "100","ratio": "5"}},{"uuid": "1111","name": "WriteToKafka1","bundle": "cn.piflow.bundle.flink.kafka.WriteToKafka","properties": {"kafka_host": "hadoop01:9092","topic": "test","schema": "","format": "json","properties": "{}"}},{"uuid": "2222","name": "ReadFromKafka1","bundle": "cn.piflow.bundle.flink.kafka.ReadFromKafka","properties": {"kafka_host": "hadoop01:9092","topic": "test","group": "test","startup_mode": "earliest-offset","schema": "id:int,name:string,age:int","format": "json","properties": "{}"}},{"uuid": "3333","name": "ShowData1","bundle": "cn.piflow.bundle.flink.common.ShowData","properties": {"showNumber": "5000"}}],"paths": [{"from": "DataGen1","outport": "","inport": "","to": "WriteToKafka1"},{"from": "WriteToKafka1","outport": "","inport": "","to": "ReadFromKafka1"},{"from": "ReadFromKafka1","outport": "","inport": "","to": "ShowData1"}]}
}
示例说明

本示例演示了通过DataGen组件生成id,name,age3个字段100条数据,每秒生成5条数据,通过WriteToKafka组件将数据写入到kafka的test topic中,然后通过ReadFromKafka组件从test topic中读取数据,最后使用ShowData组件将数据打印在控制台。

字段描述
[{       "filedName": "id","filedType": "INT","kind": "sequence","start": 1,"end": 10000},{       "filedName": "name","filedType": "STRING","kind": "random","length": 15},{       "filedName": "age","filedType": "INT","kind": "random","max": 100,"min": 1} 
]

1.id字段

id字段类型为INT,使用sequence生成器,序列生成器的起始值为1,结束值为10000.

2.name字段

name字段类型为STRING,使用random生成器,生成字符长度为15。

3.age字段

age字段类型为INT,使用random生成器,随机生成器的最小值为1,最大值为100。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/584695.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用Commons JXPath简化XML/JSON处理

第1章:引言 咱们都知道,在现代软件开发中,处理XML和JSON数据几乎是家常便饭。这两种格式广泛应用于配置文件、数据交换、API响应等领域。不过,要手动解析和操作它们,有时候真是让人头大。 当你面对一堆复杂的XML或JS…

JavaSE语法之十一:接口(超全!!!)

文章目录 1. 概念2. 语法规则3. 接口使用4. 接口特性5. 实现多个接口6. 接口间的继承7. 接口使用实例8. Clonable 接口和深拷贝9. 抽象类和接口的区别(重要!) 1. 概念 在现实生活中的接口比比皆是,如:笔记本上的USB接…

Flink 输出至 Elasticsearch

【1】引入pom.xml依赖 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.12</artifactId><version>1.10.0</version> </dependency>【2】ES6 Scala代码&#xff0c;自动导入的…

私有部署ELK,搭建自己的日志中心(五)-- 使用filebeat采集K8S pod的jvm日志

一、背景 前文采集的日志是来源于虚拟机&#xff0c;在云原生下的容器化部署时&#xff0c;需要采集Pod里的jvm日志。 二、术语 1、临时存储emptyDir emptyDir在Pod被分配到Node时创建的&#xff0c;它的初始内容为空&#xff0c;并且无须指定宿主机上对应的目录文件&#…

分布式系统架构设计之分布式系统架构演进和版本管理

在分布式系统的生命周期中&#xff0c;架构演进和版本管理是很重要的两个环节。本部分会介绍分布式系统架构演进的原则、策略以及版本管理的最佳实践&#xff0c;以帮助研发团队更好地应对需求变化、技术发展和系统升级。 架构演进 演进原则 渐进式演进 采用渐进式演进的原…

VsCode(Visual Studio Code) 安装插件教程

VsCode&#xff08;Visual Studio Code&#xff09; 安装插件教程 1、打开VsCode 桌面上、最近安装程序上找找吧 在桌面上双击&#xff0c;打开 2、打开的效果图 3、点击左侧最后的插件图标 4、打开后如下图所示 5、在输入框中输入Go 6、同样在安装插件页面也可以卸载、安装…

WPF 显示气泡提示框

气泡提示框应用举例 有时候在我们开发的软件经常会遇到需要提示用户的地方&#xff0c;为了让用户更直观&#xff0c;快速了解提示信息&#xff0c;使用简洁、好看又方便的气泡提示框显得更加方便&#xff0c;更具人性化。如下面例子&#xff1a;(当用户未输入账号时&#xff0…

抬头举手阅读YOLOV8NANO

首先用YOLOV8NANO得到PT模型&#xff0c;转换成ONNX,OPENCV调用&#xff0c;PYTHON,C,ANDROID都可以举手写字阅读YOLOV8NANO

android studio 将含有jni c++ 的library项目封装成jar并调用

请参考博客&#xff1a;android studio 4.1.1 将library项目封装成aar 并调用_android studio 4.1 aar release-CSDN博客 一 . 简单叙述 android studio 中可以创建Module 的两种属性&#xff0c;可以在build.gradle 中查看&#xff1a; 1. application属性&#xff1a;可以独…

DM、Oracle、GaussDB、Kingbase8(人大金仓数据库)和HIVE给列增加注释

DM数据库给列增加注释 1、创建表 CREATE TABLE test222 ( id int NOT NULL PRIMARY KEY, name varchar(1000) DEFAULT NULL, email varchar(1000) DEFAULT NULL, phone varchar(1000) DEFAULT NULL ) 2、给列添加注释 comment on column TEST222.NAME is 这是一个列注释; 例如…

[Angular] 笔记 17:提交表单 - ngSubmit

Submitting Forms (ngSubmit) 表单的一般完整写法&#xff1a; 如果表单验证失败&#xff0c;必须 disable 提交按钮&#xff0c;阻止用户提交不合法的数据。 提交表单后&#xff0c;与表单对应的 json 数据 post 到后端&#xff1a; {"id":1,"name":…

【Week-P3】CNN天气识别

文章目录 一、环境配置二、准备数据三、搭建网络结构四、开始训练五、查看训练结果六、总结6.1 不改变学习率的前提下&#xff0c;将训练epoch分别增加到50、60、70、80、90&#xff08;1&#xff09;epoch 50 的训练情况如下&#xff1a;&#xff08;2&#xff09;epoch 60 …

JAVA B/S架构智慧工地源码,PC后台管理端、APP移动端

智慧工地系统充分利用计算机技术、互联网、物联网、云计算、大数据等新一代信息技术&#xff0c;以PC端&#xff0c;移动端&#xff0c;设备端三位一体的管控方式为企业现场工程管理提供了先进的技术手段。让劳务、设备、物料、安全、环境、能源、资料、计划、质量、视频监控等…

前后端分离nodejs+vue+ElementUi网上订餐系统69b9

课题主要分为两大模块&#xff1a;即管理员模块和用户模块&#xff0c;主要功能包括个人中心、用户管理、菜品类型管理、菜品信息管理、留言反馈、在线交流、系统管理、订单管理等&#xff1b; 运行软件:vscode 前端nodejsvueElementUi 语言 node.js 框架&#xff1a;Express/k…

STM32CubeMX教程10 RTC 实时时钟 - 周期唤醒、闹钟A/B事件和备份寄存器

目录 1、准备材料 2、实验目标 3、实验流程 3.0、前提知识 3.1、CubeMX相关配置 3.1.1 、时钟树配置 3.1.2、外设参数配置 3.1.3 、外设中断配置 3.2、生成代码 3.2.1、外设初始化函数调用流程 3.2.2、外设中断函数调用流程 3.2.3、添加其他必要代码 4、常用函数 …

Linux性能优化全景指南

Part1 Linux性能优化 1、性能优化性能指标 高并发和响应快对应着性能优化的两个核心指标&#xff1a;吞吐和延时 应用负载角度&#xff1a;直接影响了产品终端的用户体验系统资源角度&#xff1a;资源使用率、饱和度等 性能问题的本质就是系统资源已经到达瓶颈&#xff0c;但…

深度学习在自然语言处理中的应用

深度学习在自然语言处理中的应用 一、引言 随着人工智能技术的飞速发展&#xff0c;自然语言处理&#xff08;NLP&#xff09;作为其重要分支&#xff0c;已经在诸多领域取得了令人瞩目的成果。深度学习作为当前最炙手可热的技术&#xff0c;为NLP带来了革命性的变革。本文将…

python+django网上银行业务综合管理系统vue_bvj8b

本课题主要研究如何用信息化技术改善传统网上银行综合管理行业的经营和管理模式&#xff0c;简化网上银行综合管理的难度&#xff0c;根据管理实际业务需求&#xff0c;调研、分析和编写系统需求文档&#xff0c;设计编写符合银行需要的系统说明书&#xff0c;绘制数据库结构模…

php获取访客IP、UA、操作系统、浏览器等信息

最近有个需求就是获取下本地的ip地址、网上搜索了相关的教程&#xff0c;总结一下分享给大家、有需要的小伙伴可以参考一下 一、简单的获取 User Agent 信息代码: echo $_SERVER[HTTP_USER_AGENT]; 二、获取访客操作系统信息: /** * 获取客户端操作系统信息,包括win10 * pa…

SAP缓存 表缓存( Table Buffering)

本文主要介绍SAP中的表缓存在查询数据&#xff0c;更新数据时的工作情况以及对应概念。 SAP表缓存的工作 查询数据 更新数据 删除数据 表缓存的概念 表缓存技术设置属性 不允许缓冲&#xff1a; 允许缓冲&#xff0c;但已关闭&#xff1a; 缓冲已激活&#xff1a; 已…