Flink Flink数据写入Kafka

一、环境准备

官网地址

flink官方集成了通用的 Kafka 连接器,使用时需要根据生产环境的版本引入相应的依赖

    <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.14.6</flink.version><spark.version>2.4.3</spark.version><hadoop.version>2.8.5</hadoop.version><hbase.version>1.4.9</hbase.version><hive.version>2.3.5</hive.version><java.version>1.8</java.version><scala.version>2.11.8</scala.version><mysql.version>8.0.22</mysql.version><scala.binary.version>2.11</scala.binary.version><maven.compiler.source>${java.version}</maven.compiler.source><maven.compiler.target>${java.version}</maven.compiler.target></properties>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>${flink.version}</version></dependency>

二、KafkaSink介绍

在这里插入图片描述
在这里插入图片描述

三、正确理解序列化器

什么叫序列化和反序列化?
1.序列化:把对象转换为字节序列的过程称为对象的序列化.
2.反序列化:把字节序列恢复为对象的过程称为对象的反序列化.

序列化器的作用是将flink数据转换成 kafka的ProducerRecord
在这里插入图片描述
使用预定义的序列化器
将 DataStream 数据转换为 Kafka消息中的value,key为默认值null,timestamp为默认值

        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("localhost:9092")// 指定序列化器:指定Topic名称、具体的序列化(产生方需要序列化,接收方需要反序列化).setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("testtopic01")// 指定value的序列化器.setValueSerializationSchema(new SimpleStringSchema()).build())

源码解析

    public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setValueSerializationSchema(SerializationSchema<T> valueSerializationSchema) {this.checkValueSerializerNotSet();KafkaRecordSerializationSchemaBuilder<T> self = this.self();self.valueSerializationSchema = (SerializationSchema)Preconditions.checkNotNull(valueSerializationSchema);return self;}

使用自定义的序列化器

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// TODO 必填项:配置 kafka 的地址和端口.setBootstrapServers("localhost:9092")// TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型.setRecordSerializer(new KafkaRecordSerializationSchema<String>() {...............}).build();

四、容错保证级别

KafkaSink 总共支持三种不同的语义保证(DeliveryGuarantee)

DeliveryGuarantee.NONE 不提供任何保证
消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复

DeliveryGuarantee.AT_LEAST_ONCE 至少一次
sink 在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。
消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。

DeliveryGuarantee.EXACTLY_ONCE 精确一次
该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。
因此,如果 consumer 只读取已提交的数据(参见 Kafka consumer 配置 isolation.level),在 Flink 发生重启时不会发生数据重复。
然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。
请确认事务 ID 的前缀(transactionIdPrefix)对不同的应用是唯一的,以保证不同作业的事务 不会互相影响!此外,强烈建议将 Kafka 的事务超时时间调整至远大于 checkpoint 最大间隔 + 最大重启时间,否则 Kafka 对未提交事务的过期处理会导致数据丢失。

五、案例—Flink将Socket数据写入Kafka(精准一次)

注意:如果要使用 精准一次 写入 Kafka,需要满足以下条件,缺一不可
1、开启 checkpoint
2、设置事务前缀
3、设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max 的 15 分钟

package com.flink.DataStream.Sink;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Properties;public class flinkSinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);// 如果是精准一次,必须开启 checkpointstreamExecutionEnvironment.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataStreamSource<String> streamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);/*** TODO Kafka Sink* TODO 注意:如果要使用 精准一次 写入 Kafka,需要满足以下条件,缺一不可* 1、开启 checkpoint* 2、设置事务前缀* 3、设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max 的 15 分钟*/Properties properties=new Properties();properties.put("transaction.timeout.ms",10 * 60 * 1000 + "");KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("localhost:9092")// 指定序列化器:指定Topic名称、具体的序列化(产生方需要序列化,接收方需要反序列化).setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("testtopic01")// 指定value的序列化器.setValueSerializationSchema(new SimpleStringSchema()).build())// 写到 kafka 的一致性级别: 精准一次、至少一次.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("flinkkafkasink-")// 如果是精准一次,必须设置 事务超时时间: 大于 checkpoint间隔,小于 max 15 分钟.setKafkaProducerConfig(properties)//.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();streamSource.sinkTo(kafkaSink);streamExecutionEnvironment.execute();}
}

理解ProduceerConfig配置源码
在这里插入图片描述

六、启动Zookeeper、Kafka

#启动zookeeper
${ZK_HOME}/bin/zkServer.sh start
#查看zookeeper状态
${ZK_HOME}/bin/zkServer.sh status
#启动kafka
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties
#查看topic
${KAFKA_HOME}/bin/kafka-topics.sh --list --zookeeper localhost:2181
#创建topic
${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic testtopic02 --partitions 2 --replication-factor 1
#删除topic
${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic testtopic02
#生产消息
${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic01
#消费消息
${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopic01 --from-beginning

通过socket模拟数据写入Flink之后,Flink将数据写入Kafka
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/203832.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CCF编程能力等级认证GESP—C++1级—20230611

CCF编程能力等级认证GESP—C1级—20230611 单选题&#xff08;每题 2 分&#xff0c;共 30 分&#xff09;判断题&#xff08;每题 2 分&#xff0c;共 20 分&#xff09;编程题 (每题 25 分&#xff0c;共 50 分)时间规划累计相加 答案及解析单选题判断题编程题1编程题2 单选题…

关于小红书商单变现的一些答疑

AI小红书商单训练营也过去1个月了&#xff0c;今天给大家汇总几个常遇到的问题&#xff0c;希望对大家在运营过程中有所帮助。 1.账号封面是否要统一模版&#xff1f; 为了让账号主页呈现整洁美观的效果&#xff0c;建议统一封面设计&#xff0c;视频开头可以设置一个固定画面…

景联文科技:高质量垂直领域数据集助力AI技术突破

随着人工智能技术的飞速发展&#xff0c;垂直领域数据集在提升模型性能、解决领域问题、推动创新应用以及提升竞争力等方面的重要性日益凸显。 提高模型性能&#xff1a;垂直领域数据集专注于特定任务或领域&#xff0c;使用这些数据集进行训练可以让模型更好地理解和解决特定领…

redis应用-分布式锁

目录 什么是分布式锁 分布式锁的基本实现 引入过期时间 引入校验id 引入lua 引入看门狗 引入redlock算法 什么是分布式锁 在一个分布式系统中,也会涉及到多个节点访问同一个公共资源的情况,此时就需要通过锁来做互斥控制,避免出现类似于"线程安全"的问题. 而…

@Autowired注入多态

如IBizStudyService接口有多个实现类BizStudyServiceImpl和BizStudyServiceExImpl&#xff0c;在Autowired注入时要用Qualifier指定实现类名称。 Autowired Qualifier("BizStudyServiceImpl") private IBizStudyService bizStudyService; 在实现类定义时要加上名称…

【开源】基于Vue和SpringBoot的计算机机房作业管理系统

项目编号&#xff1a; S 017 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S017&#xff0c;文末获取源码。} 项目编号&#xff1a;S017&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 登录注册模块2.2 课程管理模块2.3 课…

【QT】Qt类库的模块

目录 1.Qt基本模块 2.Qt附加模块 3.增值模块 4.技术预览模块 5.Qt工具 1.Qt基本模块 Qt基本模块是Qt在所有平台上的基本功能&#xff0c;它们在所有的开发平台和目标平台上都可用&#xff0c;在Qt5所有版本上是源代码和二进制兼容的。 Qtcore模块是Qt类库的核心&#xff0c;所有…

【2021研电赛】基于EAIDK-310的云端互联无人驾驶系统

本作品介绍参与极术社区的有奖征集|分享研电赛作品扩大影响力&#xff0c;更有重磅电子产品免费领取! 参赛单位&#xff1a;上海理工大学 参赛队伍&#xff1a;你说的都是对的 指导老师&#xff1a;蒋全 参赛队员&#xff1a;童锐&#xff0c;邹祖奇&#xff0c;胡涛 获奖情况&…

分享一个Python网络爬虫数据采集利器

前言 你是否曾为获取重要数据而感到困扰&#xff1f;是否因为数据封锁而无法获取所需信息&#xff1f;是否因为数据格式混乱而头疼&#xff1f;现在&#xff0c;所有这些问题都可以迎刃而解。让我为大家介绍一款强大的数据收集平台——亮数据Bright Data。 作为世界领先的数据…

聚观早报 |JFrog发布新功能;中科百孚减持龙芯中科股票

【聚观365】12月7日消息 JFrog发布新功能 中科百孚减持龙芯中科股票 商汤集团再回应做空报告 xAI融资新进展 苹果市值再次突破 JFrog发布新功能 流式软件公司、企业软件供应链平台提供商JFrog发布新功能&#xff0c;推出业界首款致力于加速安全软件建构与发布的端到端平台…

什么是神经网络的超参数

1 引言 超参数在神经网络的设计和训练中起着至关重要的作用。它们是在开始训练之前设置的参数&#xff0c;与网络的结构、训练过程和优化算法有关。正确的超参数选择对于达到最优模型性能至关重要。 2 神经网络结构的超参数 层数&#xff08;Layers&#xff09;&#xff1a; 决…

Elastcsearch:通过 Serverless 提供更多服务

作者&#xff1a;Ken Exner 人们使用 Elasticsearch 解决最大数据挑战的方式一直令我们感到惊讶。 从超过 40 亿次下载、70,000 次提交、1,800 名贡献者以及我们全球社区的反馈中可以清楚地看出这一点。 Elastic 在广泛的用例中发挥的作用促使我们简化复杂性&#xff0c;让搜索…

不敢想象,会用大数据分析工具有多爽!

当业务人、小白会用大数据分析工具会有多爽&#xff1f;1、再不用去跟IT沟通需求&#xff0c;等IT取数开发报表&#xff1b;2、有新的分析需求&#xff0c;我当场就能分析数据&#xff0c;获取信息&#xff1b;3、有足够多的数据信息支撑业务分析决策&#xff0c;实现从经验决策…

【富文本编辑器】原生JS使用WangEditor和vue上传图片前后端demo

【富文本编辑器】原生JS使用WangEditor上传图片前后端demo 第一步 HTML 第二步 初始化WangEditor与图片上传回调函数 第三步 后端返回数据体封装 第四步 后端接口上传图片&#xff0c;并返回图片地址 最近&#xff0c;我遇到了这样一个问题&#xff1a;因为我们的项目是基于…

MySQL和MongoDB简介以及它们之间的区别

本文主要介绍MySQL和MongoDB的简介以及它们之间的区别。 目录 MySQL简介MySQL的优缺点MySQL的应用场景MongoDB简介MongoDB的优缺点MongoDB的应用场景MySQL和MongoDB的区别 MySQL简介 MySQL是一种开源的关系型数据库管理系统&#xff0c;是世界上最流行的数据库之一。它支持多用…

DAPP开发【10】express.js的使用

Express.js 是一种流行、轻量级的开源 Web 应用程序框架&#xff0c;用于开发基于 Node.js 的服务器端 Web 应用程序。它提供了强大的功能集&#xff0c;适用于 Web 和移动应用程序。Express.js 旨在支持单页、多页和混合式 Web 应用程序的开发。Express.js 提供了广泛的功能&a…

Linux软件包管理器yum

yum—Linux应用商店 前言Linux的软件安装1. 源代码安装2. rpm安装使用rpm安装升级或者更新.rpm软件包卸载指定的.rpm软件包查询已安装的.rpm软件包优缺点 3. yum安装&#xff08;推荐&#xff09;yum源使用yum命令&#xff08;检测是否有网&#xff1a;ping指令&#xff09;优缺…

Nginx的反向代理与负载均衡

概念介绍 1). 正向代理 正向代理服务器是一个位于客户端和原始服务器(origin server)之间的服务器&#xff0c;为了从原始服务器取得内容&#xff0c;客户端向代理发送一个请求并指定目标(原始服务器)&#xff0c;然后代理向原始服务器转交请求并将获得的内容返回给客户端。 …

51单片机的硬件组成的功能以及40个引脚的功能

AT89S51单片机的硬件组成 本文主要涉及AT89S51单片机的硬件结构&#xff0c;与89C51还是存在一定的区别文中有说明&#xff0c;介绍了单片机的各硬件的基本功能&#xff0c;并详细介绍了单片机40个引脚的功能 文章目录 AT89S51单片机的硬件组成一、 AT89S51单片机的硬件组成1.1…

Qt开发学习笔记01

设置窗口背景图 在 .h 文件中添加引用和方法 #include <QPainter> #include <QPixmap> void paintEvent(QPaintEvent *);.cpp 文件中实现 paintEvent void sur_dev::paintEvent(QPaintEvent *ev) {QPainter painter(this);QPixmap pix;pix.load(":/image/bj01…