Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)

在这里插入图片描述
                       星光下的赶路人star的个人主页

                      世间真正温煦的春色,都熨帖着大地,潜伏在深谷

文章目录

  • 1、输出算子(Sink)
    • 1.1 连接到外部系统
    • 1.2 输出到文件
    • 1.3 输出到Kafka
    • 1.4 输出到MySQL(JDBC)
    • 1.4 自定义Sink输出

1、输出算子(Sink)

Flink作为数据处理框架,最终还是要把计算处理的结果写入外部储存,为外部应用提供支持。
在这里插入图片描述

1.1 连接到外部系统

Flink的DataStream API专门提供了向外部提供写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的。Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。

stream.addSink(new SinkFunction());

addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。

Flink1.12开始,同样重构了Sink架构,

stream.sinkTo()

当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:

在这里插入图片描述
我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。

除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方系统与Flink的连接器。
在这里插入图片描述
除此以外,就需要用户自定义实现sink连接器了。

1.2 输出到文件

Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。
FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:

  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。
public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中,都有 并行度个数的 文件在写入env.setParallelism(2);// 必须开启checkpoint,否则一直都是 .inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:" + value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING);DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");// 输出到文件系统FileSink<String> fieSink = FileSink// 输出行式存储的文件,指定路径、指定编码.<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))// 输出文件的一些配置: 文件名的前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("atguigu-").withPartSuffix(".log").build())// 按照目录分桶:如下,就是每个小时一个目录.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))// 文件滚动策略:  1分钟 或 1m.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(new MemorySize(1024*1024)).build()).build();dataGen.sinkTo(fieSink);env.execute();}
}

1.3 输出到Kafka

(1)添加Kafka 连接器依赖
由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。
(2)启动Kafka集群
(3)编写输出到Kafka的示例代码

输出无key的record:

public class SinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是精准一次,必须开启checkpoint(后续章节介绍)env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("hadoop102", 7777);/*** Kafka Sink:* TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可* 1、开启checkpoint(后续介绍)* 2、设置事务前缀* 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")// 指定序列化器:指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// 写到kafka的一致性级别: 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("atguigu-")// 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"").build();sensorDS.sinkTo(kafkaSink);env.execute();}
}

自定义序列化器,实现带key的record:

public class SinkKafkaWithKey {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("hadoop102", 7777);/*** 如果要指定写入kafka的key,可以自定义序列化器:* 1、实现 一个接口,重写 序列化 方法* 2、指定key,转成 字节数组* 3、指定value,转成 字节数组* 4、返回一个 ProducerRecord对象,把key、value放进去*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092").setRecordSerializer(new KafkaRecordSerializationSchema<String>() {@Nullable@Overridepublic ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas = element.split(",");byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);byte[] value = element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord<>("ws", key, value);}}).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("atguigu-").setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();sensorDS.sinkTo(kafkaSink);env.execute();}
}

运行代码,在Linux主机启动一个消费者,查看是否收到数据

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws

1.4 输出到MySQL(JDBC)

(1)添加依赖

<!--mysql驱动 -->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version>
</dependency>

(2)启动MySQL,在test库下建表

CREATE TABLE `ws` (`id` varchar(100) NOT NULL,`ts` bigint(20) DEFAULT NULL,`vc` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

(3)输出到MySQL的示例代码

public class SinkMySQL {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());/*** TODO 写入mysql* 1、只能用老的sink写法: addsink* 2、JDBCSink的4个参数:*    第一个参数: 执行的sql,一般就是 insert into*    第二个参数: 预编译sql, 对占位符填充值*    第三个参数: 执行选项 ---》 攒批、重试*    第四个参数: 连接选项 ---》 url、用户名、密码*/SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink("insert into ws values(?,?,?)",new JdbcStatementBuilder<WaterSensor>() {@Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {//每收到一条WaterSensor,如何去填充占位符preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());}},JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(100) // 批次的大小:条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8").withUsername("root").withPassword("000000").withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build());sensorDS.addSink(jdbcSink);env.execute();}
}

(4)运行代码,用客户端连接MySQL,查看是否成功写入数据。

1.4 自定义Sink输出

如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。

stream.addSink(new MySinkFunction<String>());

在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。
在这里插入图片描述
                      您的支持是我创作的无限动力

在这里插入图片描述
                      希望我能为您的未来尽绵薄之力

在这里插入图片描述
                      如有错误,谢谢指正;若有收获,谢谢赞美

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/88625.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【数学建模】2023华为杯研究生数学建模F题思路详解

强对流降水临近预报 我国地域辽阔&#xff0c;自然条件复杂&#xff0c;因此灾害性天气种类繁多&#xff0c;地区差异大。其中&#xff0c;雷雨大风、冰雹、龙卷、短时强降水等强对流天气是造成经济损失、危害生命安全最严重的一类灾害性天气[1]。以2022年为例&#xff0c;我国…

vue_Delete `␍`eslint(prettier/prettier)

Delete ␍eslint(prettier/prettier) 错误的解决方案 问题背景 在Windows笔记本上新拉完代码&#xff0c;在执行pre-commit时&#xff0c;出现如下错误&#xff1a; Delete ␍eslint(prettier/prettier)问题根源 罪魁祸首是git的一个配置属性&#xff1a;core.autocrlf 由于…

MATLAB实战 | 粮食储仓的通风控制问题

粮食储仓的通风控制问题 01、应用实战 【例1】粮食储仓的通风控制问题。在粮食储备中&#xff0c;合适的湿度是保证粮食质量的前提。一般来说&#xff0c;若粮食水分的吸收和蒸发量相等&#xff0c;这个湿度称为平衡点湿度。只有实际湿度处于平衡点湿度以下&#xff0c;粮食质…

CSS笔记——基本语法及相关知识

CSS层叠样式表是用于定义 HTML 或 XML 文档的样式和布局的语言。它可以让开发者更加灵活地控制页面元素的样式和排版&#xff0c;从而提高页面的可读性和用户体验 一、css样式书写顺序和规范 CSS样式的书写顺序和规范是为了让代码更易读、易维护和易扩展。下面是一些常见的规…

【空间-光谱联合注意网络:多时相遥感图像】

A Spatial–Spectral Joint Attention Network for Change Detection in Multispectral Imagery &#xff08;一种用于多光谱图像变化检测的空间-光谱联合注意网络&#xff09; 变化检测是通过比较双时相图像来确定和评估变化&#xff0c;这是遥感领域的一项具有挑战性的任务…

MySQL强制使用索引的两种方式及优化索引,使用MySQL存储过程创建测试数据。

一、MySQL强制使用索引的两种方式 1、使用 FORCE INDEX 语句&#xff1a; explainselect*fromtbl_test force index (index_item_code)where(item_code between 1 and 1000) and (random between 50000 and 1000000)order byrandomlimit 1; 使用 FORCE INDEX&#xff08;索引…

链表(单链表、双链表)

前言&#xff1a;链表是算法中比较难理解的部分&#xff0c;本博客记录单链表、双链表学习&#xff0c;理解节点和指针的使用&#xff0c;主要内容包括&#xff1a;使用python创建链表、实现链表常见的操作。 目录 单链表 双链表 单链表 引入链表的背景&#xff1a; 先来看…

使用ElementUI结合Vue完善主页的导航菜单和书籍管理以及后台数据分页查询

目录 动态树 数据表 案列 书籍管理 动态树 动态树&#xff08;Dynamic tree&#xff09;是一种数据结构&#xff0c;它可以在树中动态地插入、删除和修改节点。与静态树不同&#xff0c;静态树的节点是固定的&#xff0c;一旦构建完成就无法再进行修改。而动态树可以在运行时…

任意文件的上传和下载

1.任意文件下载&#xff08;高危&#xff09; 定义 一些网站由于业务需求&#xff0c;往往需要提供文件查看或文件下载功能&#xff0c;但若对用户查看或下载的文件不做限制&#xff0c;则恶意用户就能够查看或下载任意敏感文件&#xff0c;这就是文件查看与下载漏洞。 可以下载…

OpenCV显示10bit Raw数据

参考&#xff1a;10 12 14bit图像存储格式&#xff0c;利用Opencv显示10bit Raw数据,并根据鼠标的移动显示对应位置的灰度值。其他bit位数的Raw数据方法类似。 代码实现&#xff1a; #include<opencv2/opencv.hpp> #include<iostream> #include<opencv/highgu…

【Vue.js】使用Element入门搭建登入注册界面axios中GET请求与POST请求跨域问题

一&#xff0c;ElementUI是什么&#xff1f; Element UI 是一个基于 Vue.js 的桌面端组件库&#xff0c;它提供了一套丰富的 UI 组件&#xff0c;用于构建用户界面。Element UI 的目标是提供简洁、易用、美观的组件&#xff0c;同时保持灵活性和可定制性 二&#xff0c;Element…

一创聚宽的实盘就要关闭了,有没有好用的实盘平台推荐

挺多的&#xff0c;比较普遍的是QMT和Ptrade&#xff0c;python语言&#xff0c;易上手&#xff0c;通用性好&#xff0c;要说适用性可以考虑Ptrade&#xff0c;问一下你的客户经理有没有&#xff0c;用Ptrade的券商也多&#xff0c;如果之前用一创聚宽你可以无缝切换&#xff…

【工具使用】Audition软件导入.sesx文件报错问题

一&#xff0c;简介 本文主要介绍了在使用Audition导入新的wav文件后&#xff0c;保存&#xff0c;然后再打开.sesx文件时报错&#xff1a;“ 错误: 文件已损坏或使用了不受支持的格式 XML FATAL ERROR: (line: 2835, col: 69) [ D:\Project\AE_Y2311\16channel_test\16_chann…

【趣味JavaScript】5年前端开发都没有搞懂toString和valueOf这两个方法!

&#x1f680; 个人主页 极客小俊 ✍&#x1f3fb; 作者简介&#xff1a;web开发者、设计师、技术分享博主 &#x1f40b; 希望大家多多支持一下, 我们一起进步&#xff01;&#x1f604; &#x1f3c5; 如果文章对你有帮助的话&#xff0c;欢迎评论 &#x1f4ac;点赞&#x1…

基于微信小程序的语言课学习系统设计与实现(源码+lw+部署文档+讲解等)

前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌&#x1f497; &#x1f447;&#x1f3fb;…

备受以太坊基金会青睐的 Hexlink,构建亿级用户涌入 Web3的入口

早在 2021 年 9 月&#xff0c;以太坊创始人 Vitalik Buterin 就曾提出了 EIP-4337&#xff08;账户抽象&#xff09;提案&#xff0c;并在去年 10 月对该提案进一步更新&#xff0c;引发行业的进一步关注。在今年 3 月&#xff0c;EIP-4337 提案正式通过审计&#xff0c;并成为…

解决电脑桌面软件图标变白的问题

文章目录 前言一、软件图标变白的原因二、解决方法1、显示隐藏项目2、清除图标缓存 前言 桌面软件太多了&#xff0c;导致有些杂乱&#xff0c;换了个显示器后&#xff0c;想着将桌面的软件分类&#xff0c;将其放到不同的目录下&#xff0c;结果有些软件放入文件夹后图标变成…

BERT: 面向语言理解的深度双向Transformer预训练

参考视频&#xff1a; BERT 论文逐段精读【论文精读】_哔哩哔哩_bilibili 背景 BERT算是NLP里程碑式工作&#xff01;让语言模型预训练出圈&#xff01; 使用预训练模型做特征表示的时候一般有两类策略&#xff1a; 1. 基于特征 feature based &#xff08;Elmo&#xff09;…

SQLAlchemy关联表删除策略设置

目录 SQLAlchemy关联表 常用的级联选项 外键 SQLAlchemy关联表 SQLAlchemy 是一个 Python 的 ORM&#xff08;对象关系映射&#xff09;库&#xff0c;它允许你在 Python 中使用类来表示数据库中的表&#xff0c;从而更方便地进行数据库操作。在 SQLAlchemy 中&#xff0c;可…

idea没有maven工具栏解决方法

背景&#xff1a;接手的一些旧项目&#xff0c;有pom文件&#xff0c;但是用idea打开的时候&#xff0c;没有认为是maven文件&#xff0c;所以没有maven工具栏&#xff0c;不能进行重新加载pom文件中的依赖。 解决方法&#xff1a;选中pom.xml文件&#xff0c;右键 选择添加为…