Flink之Kafka Sink

  • 代码内容
package com.jin.demo;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;import java.util.Properties;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/6/29* @Description: 测试**/
public class FlinkKafkaSink {public static void main(String[] args) throws Exception {// 创建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度为1env.setParallelism(1);// 添加数据源(CustomizeSource为自定义数据源,便于测试)SingleOutputStreamOperator<String> mapStream = env.addSource(new CustomizeSource()).map(bean -> bean.toString());// 设置生产者事务超时时间Properties properties = new Properties();properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "10000");// 构建KafkaSinkKafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 配置Kafka服务.setBootstrapServers("lx01:9092")// 配置消息序列化类型.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()// 配置kafka topic信息.setTopic("tpc-02")// 配置value序列化类型.setValueSerializationSchema(new SimpleStringSchema()).build())// 设置语义.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)// 设置事务ID前缀.setTransactionalIdPrefix("JL-").build();// 将结果输出到kafkamapStream.sinkTo(kafkaSink);env.execute("Kafka Sink");}
}

结果数据

[root@lx01 bin]# ./kafka-console-consumer.sh --bootstrap-server lx01:9092 --topic tpc-02
CustomizeBean(name=AAA-274, age=64, gender=W, hobbit=钓鱼爱好者)
CustomizeBean(name=AAA-973, age=45, gender=W, hobbit=钓鱼爱好者)
CustomizeBean(name=AAA-496, age=71, gender=W, hobbit=非遗文化爱好者)
CustomizeBean(name=AAA-263, age=45, gender=M, hobbit=天文知识爱好者)
CustomizeBean(name=AAA-790, age=77, gender=W, hobbit=书法爱好者)
CustomizeBean(name=AAA-806, age=38, gender=M, hobbit=非遗文化爱好者)
CustomizeBean(name=AAA-498, age=58, gender=M, hobbit=篮球运动爱好者)
CustomizeBean(name=AAA-421, age=63, gender=M, hobbit=书法爱好者)
CustomizeBean(name=AAA-938, age=56, gender=W, hobbit=乒乓球运动爱好者)
CustomizeBean(name=AAA-278, age=18, gender=M, hobbit=乒乓球运动爱好者)
CustomizeBean(name=AAA-614, age=74, gender=W, hobbit=钓鱼爱好者)
CustomizeBean(name=AAA-249, age=67, gender=W, hobbit=天文知识爱好者)
CustomizeBean(name=AAA-690, age=72, gender=W, hobbit=网吧战神)
CustomizeBean(name=AAA-413, age=69, gender=M, hobbit=美食爱好者)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/13883.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【LeetCode】28. 找出字符串中第一个匹配项的下标

题目&#xff1a; 28. 找出字符串中第一个匹配项的下标 这道题一看就是经典的KMP算法求解字符串模式匹配问题。 但这里我用了java里自带的字符串匹配函数 indexOf(),虽然有点偷懒&#xff0c;但运行结果还不错。主要是怕有时候竞赛会突然忘了一些算法&#xff0c;不过有时候多…

[数据集][目标检测]天牛数据集目标检测数据集VOC格式3050张

数据集格式&#xff1a;Pascal VOC格式(不包含分割路径的txt文件和yolo格式的txt文件&#xff0c;仅仅包含jpg图片和对应的xml) 图片数量(jpg文件个数)&#xff1a;3050 标注数量(xml文件个数)&#xff1a;3050 标注类别数&#xff1a;1 标注类别名称:["longicorn"] …

Tomcat中利用war包部署

在Tomcat中利用war包部署Web应用程序时&#xff0c;默认情况下&#xff0c;应用程序的上下文路径&#xff08;也称为项目名称&#xff09;将是war文件的名称&#xff08;去除.war扩展名&#xff09;。这意味着您在访问Web应用程序时必须在URL中包含项目名称。例如&#xff0c;如…

Minio在windows环境配置https访问

minio启动后&#xff0c;默认访问方式为http&#xff0c;但是有的时候我们的访问场景必须是https&#xff0c;浏览器有的会默认以https进行访问&#xff0c;这个时候就需要我们进行配置上的调整&#xff0c;将minio从http访问升级到https。而查看minio的官方文档&#xff0c;并…

【Spring Cloud Alibaba】限流--Sentinel

文章目录 概述一、Sentinel 是啥&#xff1f;二、Sentinel 的生态环境三、Sentinel 核心概念3.1、资源3.2、规则 四、Sentinel 限流4.1、单机限流4.1.1、引入依赖4.1.2、定义限流规则4.1.3、定义限流资源4.1.4、运行结果 4.2、控制台限流4.2.1、客户端接入控制台4.2.2、引入依赖…

【Qt】QML-02:QQuickView用法

1、先看demo QtCreator自动生成的工程是使用QQmlApplicationEngine来加载qml文件&#xff0c;下面的demo将使用QQuickView来加载qml文件 #include <QGuiApplication> #include <QtQuick/QQuickView>int main(int argc, char *argv[]) {QGuiApplication app(argc,…

layui各种事件无效(例如表格重载或 分页插件按钮失效)的解决方法

下图是我一个系统的操作日志&#xff0c;在分页插件右下角嵌入了一个导出所有数据的按钮 &#xff0c;代码没有任何问题&#xff0c;点击导出按钮却失效 排查之后&#xff0c;发现表格标签table定义了ID又定义了lay-filter&#xff0c;因我使用的layui从2.7.6升级到2.8.11&…

JavaScript高级——ES6基础入门

目录 前言let 和 const块级作用域模板字符串一.模板字符串是什么二.模板字符串的注意事项三. 模板字符串的应用 箭头函数一.箭头函数是什么二.普通函数与箭头函数的转换三.this指向1. 全局作用域中的 this 指向2. 一般函数&#xff08;非箭头函数&#xff09;中的this指向3.箭头…

SSL 证书过期巡检脚本

哈喽大家好&#xff0c;我是咸鱼 我们知道 SSL 证书是会过期的&#xff0c;一旦过期之后需要重新申请。如果没有及时更换证书的话&#xff0c;就有可能导致网站出问题&#xff0c;给公司业务带来一定的影响 所以说我们要每隔一定时间去检查网站上的 SSL 证书是否过期 如果公…

Flutter - 微信朋友圈、十字滑动效果(微博/抖音个人中心效果)

demo 地址: https://github.com/iotjin/jh_flutter_demo 代码不定时更新&#xff0c;请前往github查看最新代码 前言 一般APP都有类似微博/抖音个人中心的效果&#xff0c;支持上下拉刷新&#xff0c;并且顶部有个图片可以下拉放大&#xff0c;图片底部是几个tab&#xff0c;可…

C# IO 相关功能整合

目录 删除文件和删除文件夹 拷贝文件到另一个目录 保存Json文件和读取Json文件 写入和读取TXT文件 打开一个弹框&#xff0c;选择 文件/文件夹&#xff0c;并获取路径 获取项目的Debug目录路径 获取一个目录下的所有文件集合 获取文件全路径、目录、扩展名、文件名称 …

WPF实现DiagramChart

1、文件架构 2、FlowChartStencils.xaml <ResourceDictionary xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml"xmlns:s"clr-namespace:DiagramDesigner"xmlns:c&…

了解Unity编辑器之组件篇Miscellaneous(九)

一、Aim Constraint&#xff1a;是一种动画约束&#xff0c;用于使一个对象朝向另一个对象或一个指定的矢量方向 Activate按钮&#xff1a;用于激活或停用Aim Constraint。当Aim Constraint处于激活状态时&#xff0c;其约束效果将应用于目标对象。 Zero按钮&#xff1a;用于将…

Zabbix监控ActiveMQ

当我们在线上使用了ActiveMQ 后&#xff0c;我们需要对一些参数进行监控&#xff0c;比如 消息是否有阻塞&#xff0c;哪个消息队列阻塞了&#xff0c;总的消息数是多少等等。下面我们就通过 Zabbix 结合 Python 脚本来实现对 ActiveMQ的监控。 一、创建 Activemq Python 监控…

39. Linux系统下在Qt5.9.9中搭建Android开发环境

1. 说明 QT版本:5.9.9 电脑系统:Linux JDK版本:openjdk-8-jdk SDK版本:r24.4.1 NDK版本:android-ndk-r14b 效果展示: 2. 具体步骤 大致安装的步骤如下:①安装Qt5.9.9,②安装jdk,③安装ndk,④安装sdk,⑤在qt中配置前面安装的环境路径 2.1 安装Qt5.9.9 首先下载…

PHP8的数据类型-PHP8知识详解

在PHP8中&#xff0c;变量不需要事先声明&#xff0c;赋值即声明。 不同的数据类型其实就是所储存数据的不同种类。在PHP8.0、8.1中都有所增加。以下是PHP8的15种数据类型&#xff1a; 1、字符串&#xff08;String&#xff09;&#xff1a;用于存储文本数据&#xff0c;可以使…

VScode的简单使用

一、VScode的安装 Visual Studio Code简称VS Code&#xff0c;是一款跨平台的、免费且开源的现代轻量级代码编辑器&#xff0c;支持几乎主流开发语言的语法高亮、智能代码补全、自定义快捷键、括号匹配和颜色区分、代码片段提示、代码对比等特性&#xff0c;也拥有对git的开箱…

node.js 爬虫图片下载

主程序文件 app.js 运行主程序前需要先安装使用到的模块&#xff1a; npm install superagent --save axios要安装指定版,安装最新版会报错&#xff1a;npm install axios0.19.2 --save const {default: axios} require(axios); const fs require(fs); const superagent r…

STM32 UDS Bootloader开发-上位机篇-CANoe制作(2)

文章目录 前言CANoe增加NodeCAPL脚本获取GUI中的参数刷写过程诊断仪在线接收回调函数发送函数总结前言 在上一篇文章中,介绍了UDS Bootloadaer上位机软件基于CANoe的界面设计。本文继续介绍CAPL脚本的编写以实现刷写过程。 CANoe增加Node 在开始编写CAPL之前,需要在Simula…

Android 耗时分析(adb shell/Studio CPU Profiler/插桩Trace API)

1.adb logcat 查看冷启动时间和Activity显示时间&#xff1a; 过滤Displayed关键字&#xff0c;可看到Activity的显示时间 那上面display后面的是时间是指包含哪些过程的时间呢&#xff1f; 模拟在Application中沉睡1秒操作&#xff0c;冷启动情况下&#xff1a; 从上可知&…