【Flink-1.17-教程】-【四】Flink DataStream API(7)输出算子(Sink)

【Flink-1.17-教程】-【四】Flink DataStream API(7)输出算子(Sink)

  • 1)连接到外部系统
  • 2)输出到文件
  • 3)输出到 Kafka
  • 4)输出到 MySQL(JDBC)
  • 5)自定义 Sink 输出

Flink 作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。

在这里插入图片描述

1)连接到外部系统

Flink 的 DataStream API 专门提供了向外部写入数据的方法:addSink。与 addSource 类似,addSink 方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink 程序中所有对外的输出操作,一般都是利用 Sink 算子完成的。

Flink1.12 以前,Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。

stream.addSink(new SinkFunction());

addSink 方法同样需要传入一个参数,实现的是 SinkFunction 接口。在这个接口中只需要重写一个方法 invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。

Flink1.12 开始,同样重构了 Sink 架构,

stream.sinkTo()

当然,Sink 多数情况下同样并不需要我们自己实现。之前我们一直在使用的 print 方法其实就是一种 Sink,它表示将数据流写入标准控制台打印输出。Flink 官方为我们提供了一部分的框架的 Sink 连接器。如下图所示,列出了 Flink 官方目前支持的第三方系统连接器:

在这里插入图片描述

我们可以看到,像 Kafka 之类流式系统,Flink 提供了完美对接,source / sink 两端都能连接,可读可写;而对于 Elasticsearch、JDBC 等数据存储系统,则只提供了输出写入的 sink 连接器。

除 Flink 官方之外,Apache Bahir 框架,也实现了一些其他第三方系统与 Flink 的连接器。

在这里插入图片描述

除此以外,就需要用户自定义实现 sink 连接器了。

2)输出到文件

Flink 专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink 支持的文件系统。

FileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用 FileSink 的静态方法:

  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。

示例:

public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// TODO 每个目录中,都有 并行度个数的 文件在写入env.setParallelism(2);// 必须开启checkpoint,否则一直都是 .inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:" + value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING);DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");// TODO 输出到文件系统FileSink<String> fieSink = FileSink// 输出行式存储的文件,指定路径、指定编码.<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))// 输出文件的一些配置: 文件名的前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("atguigu-").withPartSuffix(".log").build())// 按照目录分桶:如下,就是每个小时一个目录.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))// 文件滚动策略:  1分钟 或 1m.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(new MemorySize(1024*1024)).build()).build();dataGen.sinkTo(fieSink);env.execute();}
}

3)输出到 Kafka

1、添加 Kafka 连接器依赖。

由于我们已经测试过从 Kafka 数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。

2、启动 Kafka 集群。

3、编写输出到 Kafka 的示例代码。

(1)输出无 key 的 record:

public class SinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是精准一次,必须开启checkpoint(后续章节介绍)env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("hadoop102", 7777);/*** Kafka Sink:* TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可* 1、开启checkpoint(后续介绍)* 2、设置事务前缀* 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")// 指定序列化器:指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// 写到kafka的一致性级别: 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("atguigu-")// 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"").build();sensorDS.sinkTo(kafkaSink);env.execute();}
}

(2)自定义序列化器,实现带 key 的 record:

public class SinkKafkaWithKey {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("hadoop102", 7777);/*** 如果要指定写入kafka的key* 可以自定义序列器:* 1、实现 一个接口,重写 序列化 方法* 2、指定key,转成 字节数组* 3、指定value,转成 字节数组* 4、返回一个 ProducerRecord对象,把key、value放进去**/KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092").setRecordSerializer(new KafkaRecordSerializationSchema<String>() {@Nullable@Overridepublic ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas = element.split(",");byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);byte[] value = element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord<>("ws", key, value);}}).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("atguigu-").setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();sensorDS.sinkTo(kafkaSink);env.execute();}
}

4、运行代码,在 Linux 主机启动一个消费者,查看是否收到数据。

[hadoop102 ~]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws

4)输出到 MySQL(JDBC)

写入数据的 MySQL 的测试步骤如下。

1、添加依赖。

添加 MySQL 驱动:

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>

官方还未提供 flink-connector-jdbc 的 1.17.0 的正式依赖,暂时从 apache snapshot 仓库下载,pom 文件中指定仓库路径:

<repositories>
<repository>
<id>apache-snapshots</id>
<name>apache snapshots</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
</repositories>

添加依赖:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>1.17-SNAPSHOT</version>
</dependency>

如果不生效,还需要修改本地 maven 的配置文件,mirrorOf 中添加如下标红内容:

<mirror>
<id>aliyunmaven</id>
<mirrorOf>*,!apache-snapshots</mirrorOf>
<name>阿里云公共仓库</name>
<url>https://maven.aliyun.com/repository/public</url>
</mirror>

2、启动 MySQL,在 test 库下建表 ws。

mysql>
CREATE TABLE `ws` (
`id` varchar(100) NOT NULL,
`ts` bigint(20) DEFAULT NULL,
`vc` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

3、编写输出到 MySQL 的示例代码。

public class SinkMySQL {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());/*** TODO 写入mysql* 1、只能用老的sink写法: addsink* 2、JDBCSink的4个参数:*    第一个参数: 执行的sql,一般就是 insert into*    第二个参数: 预编译sql, 对占位符填充值*    第三个参数: 执行选项 ---》 攒批、重试*    第四个参数: 连接选项 ---》 url、用户名、密码*/SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink("insert into ws values(?,?,?)",new JdbcStatementBuilder<WaterSensor>() {@Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {//每收到一条WaterSensor,如何去填充占位符preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());}},JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(100) // 批次的大小:条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8").withUsername("root").withPassword("000000").withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build());sensorDS.addSink(jdbcSink);env.execute();}
}

4、运行代码,用客户端连接 MySQL,查看是否成功写入数据。

5)自定义 Sink 输出

如果我们想将数据存储到我们自己的存储设备中,而 Flink 并没有提供可以直接使用的连接器,就只能自定义 Sink 进行输出了。与 Source 类似,Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction 抽象类,只要实现它,通过简单地调用 DataStream 的 .addSink() 方法就可以自定义写入任何外部存储。

stream.addSink(new MySinkFunction<String>());

在实现 SinkFunction 的时候,需要重写的一个关键方法 invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。

这种方式比较通用,对于任何外部存储系统都有效;不过自定义 Sink 想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器 Flink 官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/652883.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++力扣题目416--分割等和子集 1049--最后一块石头的重量II

416. 分割等和子集 力扣题目链接(opens new window) 题目难易&#xff1a;中等 给定一个只包含正整数的非空数组。是否可以将这个数组分割成两个子集&#xff0c;使得两个子集的元素和相等。 注意: 每个数组中的元素不会超过 100 数组的大小不会超过 200 示例 1: 输入: […

k8s-配置管理

一、ConfigMap 1.1 创建ConfigMap 1.2 在环境种使用ConfigMap ConfigMap最为常见的使用方式就是在环境变量和Volume中引用。 1.3 在Volume中引用ConfigMap 在Volume中引用ConfigMap&#xff0c;就是通过文件的方式直接将ConfigMap的每条数据填入Volume&#xff0c;每条数据是…

【JavaSE篇】——数组的定义与使用

目录 本章的目标&#xff1a; &#x1f388;数组的基本概念 &#x1f36d;创建数组 &#x1f36d;数组的初始化 &#x1f36d;数组的使用 &#x1f449;数组中元素访问 &#x1f449;遍历数组 &#x1f388;数组是引用类型 &#x1f36d;初始JVM的内存分布 &#x1f…

【周赛】第382场周赛

&#x1f525;博客主页&#xff1a; A_SHOWY&#x1f3a5;系列专栏&#xff1a;力扣刷题总结录 数据结构 云计算 数字图像处理 力扣每日一题_ 从这一场&#xff08;第382场周赛&#xff09;周赛开始记录&#xff0c;目标是尽快达到准确快速AC前三道题&#xff0c;每场比赛…

Windows XP x86 sp3 安装 Google Chrome 49.0.2623.112 (正式版本) (32 位)

1 下载地址&#xff1b; https://dl.google.com/release2/h8vnfiy7pvn3lxy9ehfsaxlrnnukgff8jnodrp0y21vrlem4x71lor5zzkliyh8fv3sryayu5uk5zi20ep7dwfnwr143dzxqijv/49.0.2623.112_chrome_installer.exe 2 直接 双击 49.0.2623.112_chrome_installer.exe 安装&#xff1b; 3 …

第二百九十二回

文章目录 1. 概念介绍2. 方法与细节2.1 实现方法2.2 具体细节 3. 示例代码4. 内容总结 我们在上一章回中介绍了"如何混合选择图片和视频文件"相关的内容&#xff0c;本章回中将介绍如何混合选择多个图片和视频文件.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1…

BGP:04 fake-as

使用 fake-as 可以将本地真实的 AS 编号隐藏&#xff0c;其他 AS 内的对等体在指定本端对等体所在的AS 编号时&#xff0c;应该设置成这个伪AS 编号。 这是实验拓扑&#xff0c;IBGP EBGP 邻居都使用物理接口来建立 基本配置&#xff1a; R1: sys sysname R1 int loo0 ip add…

带libc源码gdb动态调试(导入glibc库使得可执行文件动态调试时可看见调用库函数源码)

文章目录 查看源码是否编译时有-g调试信息和符号表在 gdb 中加载 debug 文件/符号表将 debug 文件放入 ".debug" 文件夹通过 gdb 命令 set debug-file-directory directories GCC的gcc和g区别指定gcc/g&#xff0c;glibc的版本进行编译指定gcc/g的版本指定glibc的和l…

小电影网站上线之nginx配置不带www域名301重定向到www域名+接入腾讯云安全防护edgeone

背景 写了个电影网站&#xff08;纯粹搞着玩的&#xff09;&#xff0c;准备买个域名然后上线&#xff0c;但是看日志经常被一些恶意IP进行攻击&#xff0c;这里准备接入腾讯云的安全以及加速产品edgeone&#xff0c;记录下当时的步骤。 一、nginx配置重定向以及日志格式 ng…

【数据分析】Excel中使用VBA进行宏编程

目录 0 准备工作1 VBA简介1.1 Excel VBA应用程序的构成1.2 事件驱动1.3 宏1.3.1 创建宏1.3.2 宏安全 2 VBA基础2.1 注释2.2 数据类型2.2.1 基本数据类型2.2.2 枚举类型2.2.3 用户自定义数据类型 2.2 变量2.3 常量2.4 运算符2.5 程序结构2.6 过程2.7 函数 3 Excel应用程序开发流…

【Apollo CyberRT】源码分析之 “component” 模块

代码位置 apollo/cyber/component 功能 在自动驾驶系统中&#xff0c;模块&#xff08;如感知、定位、控制系统等&#xff09;在 Cyber ​​RT 下以 Component 的形式存在。不同 Component 之间通过 Channel 进行通信。Component 概念不仅解耦了模块&#xff0c;还为将模块拆…

实现图片分块化(使用einops库)

背景介绍 在进行机器学习的模型训练任务的时候&#xff0c;针对图像数据集的处理&#xff0c;常常会对数据集进行分块的操作&#xff1b;具体到模型结构中&#xff0c;在ViT框架中&#xff0c;把每一个图像看作是一个的patch&#xff0c;每一个patch可以当作是一个NLP领域的一…

mac上搭建hbase伪集群

1. 前言 之前我们已经搭建过了 hbase单点环境&#xff0c;(单机版搭建参见&#xff1a; https://blog.csdn.net/a15835774652/article/details/135569456) 但是 为了模拟一把集群环境 我们还是尝试搭建一个伪集群版 2. 环境准备 jdk环境 1.8hdfs &#xff08;hadoop环境 可选…

Android双指缩放ScaleGestureDetector检测放大因子大图移动到双指中心点ImageView区域中心,Kotlin(2)

Android双指缩放ScaleGestureDetector检测放大因子大图移动到双指中心点ImageView区域中心&#xff0c;Kotlin&#xff08;2&#xff09; 在 Android ScaleGestureDetector检测双指缩放Bitmap基于Matrix动画移动到双指捏合中心点ImageView区域中心&#xff0c;Kotlin-CSDN博客 …

程序员开发要素—Java篇

这是个预留板块&#xff0c;打算写一写作为程序员的基本要素和技术。主要包含基本工具&#xff0c;基础知识&#xff0c;基础插件应用&#xff0c;环境搭建等内容。 具体内容后续补充完整&#xff0c;Ps&#xff1a;请假申请已提交https://blog.csdn.net/qq_18237141/article/…

跟着小德学C++之TOTP

嗨&#xff0c;大家好&#xff0c;我是出生在达纳苏斯的一名德鲁伊&#xff0c;我是要立志成为海贼王&#xff0c;啊不&#xff0c;是立志成为科学家的德鲁伊。最近&#xff0c;我发现我们所处的世界是一个虚拟的世界&#xff0c;并由此开始&#xff0c;我展开了对我们这个世界…

网络安全B模块(笔记详解)- 越权与下载

1.使用渗透机场景kali中工具扫描服务器场景,将web端口号当作Flag提交; 2.使用渗透机场景windows7访问服务器场景mingling.php,将页面中的Flag提交; 3.使用渗透机场景windows7访问服务器场景mingling.php,分析页面内容,查看系统配置信息,并将产品id的最后5位数作为Flag提…

NLP自然语言处理的发展:从初创到人工智能的里程碑

自然语言处理&#xff08;Natural Language Processing&#xff0c;NLP&#xff09;人工智能领域中备受关注的重要分支之一。它使得计算机能够理解、解释和使用人类语言。随着技术的不断发展&#xff0c;NLP经历了从初创时期到深度学习时代的巨大演变&#xff0c;推动了互联网产…

链表相加---链表OJ---两数之和

https://leetcode.cn/problems/add-two-numbers/?envType=study-plan-v2&envId=top-100-liked 对于本题,可以选择用数组实现,那样比较简单;我们这里就用纯链表实现。 纯链表实现有许多细节,比如链表长度不一样,进位,尾结点如果是0我们就要删除尾结点。 首先…

线程调度(Java Android)

关于作者&#xff1a;CSDN内容合伙人、技术专家&#xff0c; 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 &#xff0c;擅长java后端、移动开发、商业变现、人工智能等&#xff0c;希望大家多多支持。 未经允许不得转载 目录 一、导读二、概览2.1、线程的属性 三、…