[flink 实时流基础] 输出算子(Sink)

学习笔记
Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。
image.png


文章目录

      • **连接到外部系统**
      • **输出到文件**
      • 输出到 Kafka
      • 输出到 mysql
      • 自定义 sink

连接到外部系统

Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));
addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。
Flink1.12开始,同样重构了Sink架构,
stream.sinkTo(…)
当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:

https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/connectors/datastream/overview/
image.png

我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。
除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方系统与Flink的连接器。
image.png
除此以外,就需要用户自定义实现sink连接器了。

输出到文件

Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。
FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:

  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。
public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中,都有 并行度个数的 文件在写入env.setParallelism(2);// 必须开启checkpoint,否则一直都是 .inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:" + value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING);DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");// 输出到文件系统FileSink<String> fieSink = FileSink// 输出行式存储的文件,指定路径、指定编码.<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))// 输出文件的一些配置: 文件名的前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("atguigu-").withPartSuffix(".log").build())// 按照目录分桶:如下,就是每个小时一个目录.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))// 文件滚动策略:  1分钟 或 1m.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(new MemorySize(1024*1024)).build()).build();dataGen.sinkTo(fieSink);env.execute();}
}

输出到 Kafka

(1)添加Kafka 连接器依赖
由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。
(2)启动Kafka集群
(3)编写输出到Kafka的示例代码

public class SinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是精准一次,必须开启checkpoint(后续章节介绍)env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("hadoop102", 7777);/*** Kafka Sink:* TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可* 1、开启checkpoint(后续介绍)* 2、设置事务前缀* 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")// 指定序列化器:指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// 写到kafka的一致性级别: 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("atguigu-")// 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"").build();sensorDS.sinkTo(kafkaSink);env.execute();}
}

自定义序列化器,实现带key的record:

public class SinkKafkaWithKey {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("hadoop102", 7777);/*** 如果要指定写入kafka的key,可以自定义序列化器:* 1、实现 一个接口,重写 序列化 方法* 2、指定key,转成 字节数组* 3、指定value,转成 字节数组* 4、返回一个 ProducerRecord对象,把key、value放进去*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092").setRecordSerializer(new KafkaRecordSerializationSchema<String>() {@Nullable@Overridepublic ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas = element.split(",");byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);byte[] value = element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord<>("ws", key, value);}}).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("atguigu-").setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();sensorDS.sinkTo(kafkaSink);env.execute();}
}

输出到 mysql

写入数据的MySQL的测试步骤如下。
(1)添加依赖
添加MySQL驱动:

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version>
</dependency>

官方还未提供flink-connector-jdbc的1.17.0的正式依赖,暂时从apache snapshot仓库下载,pom文件中指定仓库路径:

<repositories><repository><id>apache-snapshots</id><name>apache snapshots</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url></repository>
</repositories>

添加依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>1.17-SNAPSHOT</version>
</dependency>

如果不生效,还需要修改本地maven的配置文件,mirrorOf中添加如下标红内容:

<mirror><id>aliyunmaven</id><mirrorOf>*,!apache-snapshots</mirrorOf><name>阿里云公共仓库</name><url>https://maven.aliyun.com/repository/public</url>
</mirror>

(2)启动MySQL,在test库下建表ws

mysql>
CREATE TABLE ws (
id varchar(100) NOT NULL,
ts bigint(20) DEFAULT NULL,
vc int(11) DEFAULT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

(3)编写输出到MySQL的示例代码

public class SinkMySQL {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("hadoop102", 7777)
.map(new WaterSensorMapFunction());/*** TODO 写入mysql* 1、只能用老的sink写法: addsink* 2、JDBCSink的4个参数:*    第一个参数: 执行的sql,一般就是 insert into*    第二个参数: 预编译sql, 对占位符填充值*    第三个参数: 执行选项 ---》 攒批、重试*    第四个参数: 连接选项 ---》 url、用户名、密码*/
SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink("insert into ws values(?,?,?)",new JdbcStatementBuilder<WaterSensor>() {@Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {//每收到一条WaterSensor,如何去填充占位符preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());}},JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(100) // 批次的大小:条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8").withUsername("root").withPassword("000000").withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build()
);sensorDS.addSink(jdbcSink);env.execute();
}
}

(4)运行代码,用客户端连接MySQL,查看是否成功写入数据。

自定义 sink

如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。
stream.addSink(new MySinkFunction());
在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/785958.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HTTP,Servlet

HTTP 概念&#xff1a;HyperTextTransferProtocol&#xff0c;超文本传输协议&#xff0c;规定了浏览器和服务器之间数据传输的规则 HTTP协议特点&#xff1a; 1.基于TCP协议&#xff1a;面向连接&#xff0c;安全 2.基于请求-响应模型的&#xff1a;一次请求对应一次响应 …

【学习笔记】java项目—苍穹外卖day06

文章目录 苍穹外卖-day06课程内容1. HttpClient1.1 介绍1.2 入门案例1.2.1 GET方式请求1.2.2 POST方式请求 2. 微信小程序开发2.1 介绍2.2 准备工作2.3 入门案例2.3.1 小程序目录结构2.3.2 编写和编译小程序2.3.3 发布小程序 3. 微信登录3.1 导入小程序代码3.2 微信登录流程3.3…

基于Java的车辆出入校预约管理系统设计与实现(论文+源码)_kaic

摘 要 二十一世纪以来我国科技水平得到很大提升&#xff0c;人们对于生活的美好向往更加强烈&#xff0c;而目前的车辆出入校预约管理由于存在管理不规范等缺点&#xff0c;严重影响了校园的安全&#xff0c;因此&#xff0c;需要设计一个车辆出入校预约管理系统对人们出入校园…

帝国CMS十合一源码/字典/成语/古诗词/二十四节气/英语单词/百家姓/范文文库/词语等

帝国CMS十合一源码/字典/成语/古诗词/二十四节气/英语单词/百家姓/范文文库/词语等 功能包含: 成语大全 二十四节气 英语单词 古诗词 近反义词 词语造句 汉语字典 英文缩写 百家姓 范文文库 文件目录:1个数据库 1个系统源码 1个伪静态规则 安装方式:把1.2G的…

跑spark的yarn模式时RM连不上的情况

在linux控制台跑spark on yarn一个测试案例&#xff0c;日志中总显示RM连yarn服务的时候是&#xff1a;0.0.0.0:8032 具体情况如下图&#xff1a; 我问题出现的原因&#xff0c;总结如下&#xff1a; 1.防火墙没关闭&#xff0c;关闭 2.spark-env.sh这个文件的YARN_CONF_DIR…

T1 神奇苹果桶 (25分) - 小米前端笔试编程题解

考试平台&#xff1a; 赛码 题目类型&#xff1a; 20道选择 2道编程题 考试时间&#xff1a; 2024-03-23 &#xff08;两小时&#xff09; 题目描述 小希在森林冒险的时候发现一个神奇的木桶&#xff0c;某些时会凭空出现一些苹果&#xff0c;小希很解地大家分享了这一个神奇…

CCIE-01-VLAN-Trunk

目录 实验条件网络拓朴逻辑拓扑物理拓扑实验目的 开始配置配置SW1配置SW2检验证配置结果 实验条件 网络拓朴 逻辑拓扑 物理拓扑 实验目的 SW1和SW2之间的E2/0-3配置为trunk&#xff0c;使用802.1q协议&#xff0c;不需要配置捆绑根据逻辑图和物理图标识&#xff0c;使得R1~R7…

蓝桥杯(更新中)

递归与递推 递归 1.指数型枚举 解析&#xff1a;从 1 ∼ n 这 n 个整数中随机选取任意多个&#xff0c;输出所有可能的选择方案。 思路&#xff1a;枚举每一位对应的数字选与不选&#xff0c;例如&#xff1a;第一位对应的数字为1&#xff0c;有一种方案是选1&#xff0c;另…

Gitea的简单介绍

1、Gitea&#xff08;Gitea - 轻量级全功能 DevSecOps 平台&#xff09; Gitea 是一个基于 Go 语言编写的轻量级、开源、自托管的 Git 服务软件&#xff0c;它的设计目标是易于安装、快速运行并且提供出色的用户体验。Gitea 提供了一个类似于 GitHub 或 GitLab 的 web 界面&…

ZKFair 步入Dargon Slayer 新阶段,未来还有哪些财富效应?

在当前区块链技术的发展中&#xff0c;Layer 2&#xff08;L2&#xff09;解决方案已成为提高区块链扩容性、降低交易成本和提升交易速度的关键技术&#xff0c;但它仍面临一些关键问题和挑战&#xff0c;例如用户体验的改进、跨链互操作性、安全性以及去中心化程度。在这些背景…

Python 全栈体系【四阶】(十八)

第五章 深度学习 一、基本理论 4. 神经网络的改进 4.1 神经网络的局限 全连接神经网络的局限&#xff08;一&#xff09; 未考虑数据的“形状”&#xff0c;会破坏数据空间结构。例如&#xff0c;输入数据是图像时&#xff0c;图像通常是高长通道方向上的 3 维形状。但是&a…

皓学IT:WEB07_ JSP

一、Jsp基础语法 1.1. JSP模板元素 JSP页面中的HTML内容称之为JSP模版元素。 JSP模版元素定义了网页的基本骨架&#xff0c;即定义了页面的结构和外观。 1.2. JSP脚本片段 JSP脚本片断用于在JSP页面中编写多行Java代码&#xff08;在<%%>不能定义方法&#xff09;。…

VisionOS应用开发需要哪些工具

标题: VisionOS应用开发需要哪些工具 标签: [VisionOS, 空间计算] 分类: [VisionOS, 开发工具] 说下开发visionOS空间应用需要哪些准备&#xff0c;这里我找了下&#xff0c;列出来给大家。 xcode 15.22d: SwiftUI3d: RealityKit/Unity 3D实体空间: ARKitIntel Mac上可以运行X…

mysql8.0下载安装详细步骤 图文教程

下载mysql 保证电脑上之前没有安装过mysql&#xff0c;或者已经卸载完毕。 mysqk8.0 下载地址 解压 新建一个专门存放mysql文件夹&#xff0c;将下载的压缩包解压到这个文件夹里面。 配置 添加一个data文件夹&#xff0c;用来存放数据 新建一个my.txt文本&#xff0c;复制…

C++语言·入门

现在我们讲完了数据结构初阶部分的内容&#xff0c;数据结构剩下的内容会在C语言讲解的差不多的时候加入。 1. 什么是C C语言是结构化模块化的语言&#xff0c;适合处理较小规模的程序。对于复杂的问题&#xff0c;规模较大的程序&#xff0c;需要高度抽象和建模时&#xff0c…

软件测试-用例篇

目录 1 测试用例的基本要素2 测试用例给我们带来的好处3 测试用例的设计方法3.1 基于需求进行测试用例的设计3.1.1 功能需求测试分析3.1.2 非功能需求测试分析 4 具体的设计方法4.1 等价类4.2 边界值4.3 错误猜测法4.4 场景设计法4.5 因果图4.5.1 因果图需要掌握的基本知识4.5.…

用一个程序解决SQLite常见的各项操作(实用篇)

文章说明&#xff1a; 本篇文章是在之前的一篇文章SQLite3进行数据库各项常用操作基础上写的&#xff0c;将SQLite涉及到的常用的几种操作&#xff0c;以函数的形式处理成相互调用的形式。 因为之前的文章对基础操作已经解释过了&#xff0c;所以这里直接放置可执行代码和结果…

基于YOLOv8的绝缘子检测系统

&#x1f4a1;&#x1f4a1;&#x1f4a1;本文摘要&#xff1a;基于YOLOv8的绝缘子小目标检测&#xff0c;阐述了整个数据制作和训练可视化过程 1.YOLOv8介绍 Ultralytics YOLOv8是Ultralytics公司开发的YOLO目标检测和图像分割模型的最新版本。YOLOv8是一种尖端的、最先进的&a…

kali Linux上安装docker过程记录

安装情况&#xff1a; 直接安装提示错误&#xff01;&#xff01;&#xff01; 安装程序命令&#xff1a; apt install -y docker.io 安装结果提示安装失败&#xff01;&#xff01;&#xff01;看别人安装直接成功到我这怎么失败&#xff01;&#xff01;&#xff01;找原因…

引用,内联函数,auto函数,指针nullptr

一&#xff1a;引用 1.1 该文章的引用是对上一篇引用的进行补充和完善 按理来说&#xff0c;double可以隐式转换为int&#xff0c;那起别名的时候为什么不可以类型转换呢&#xff1f; 那是因为&#xff0c;在类型转换的时候&#xff0c;会创建一个临时变量&#xff0c;让后再…