flink sink kafka

接上文:一文说清flink从编码到部署上线
之前写了kafka source,现在补充kafka sink。完善kafka相关操作。

环境说明:MySQL:5.7;flink:1.14.0;hadoop:3.0.0;操作系统:CentOS 7.6;JDK:1.8.0_401;kafka_2.12-2.5.0。

1. kafka 创建 topic

topic:rv-test-sink。
在这里插入图片描述

2.添加依赖

<!--flink cdc kafka 相关依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency>

3.创建运行环境

package com.zl.utils;import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;/*** EnvUtil* @description:*/
public class EnvUtil {/*** 设置flink执行环境* @param parallelism 并行度*/public static StreamExecutionEnvironment setFlinkEnv(int parallelism) {// System.setProperty("HADOOP_USER_NAME", "用户名") 对应的是 hdfs文件系统目录下的路径:/user/用户名的文件夹名,本文为rootSystem.setProperty("HADOOP_USER_NAME", "root");Configuration conf = new Configuration();conf.setInteger("rest.port", 1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);if (parallelism >0 ){//设置并行度env.setParallelism(parallelism);} else {env.setParallelism(1);// 默认1}// 添加重启机制
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, Time.minutes(6)));// 没有这个配置,会导致“Flink 任务没报错,但是无法同步数据到doris”。// 启动checkpoint,设置模式为精确一次 (这是默认值),10*60*1000=60000env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);//rocksdb状态后端,启用增量checkpointenv.setStateBackend(new EmbeddedRocksDBStateBackend(true));//设置checkpoint路径CheckpointConfig checkpointConfig = env.getCheckpointConfig();// 同一时间只允许一个 checkpoint 进行(默认)checkpointConfig.setMaxConcurrentCheckpoints(1);//最小间隔,10*60*1000=60000checkpointConfig.setMinPauseBetweenCheckpoints(60000);// 取消任务后,checkpoint仍然保存checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//checkpoint容忍失败的次数checkpointConfig.setTolerableCheckpointFailureNumber(5);//checkpoint超时时间 默认10分钟checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));//禁用operator chain(方便排查反压)env.disableOperatorChaining();return env;}public static StreamTableEnvironment getFlinkTenv(StreamExecutionEnvironment env) {StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//设置时区 东八tenv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));Configuration configuration = tenv.getConfig().getConfiguration();// 开启miniBatchconfiguration.setString("table.exec.mini-batch.enabled", "true");// 批量输出的间隔时间configuration.setString("table.exec.mini-batch.allow-latency", "5 s");// 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条configuration.setString("table.exec.mini-batch.size", "20000");// 开启LocalGlobalconfiguration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");//设置TTL API指定tenv.getConfig().setIdleStateRetention(Duration.ofHours(25));return tenv;}}

4.核心代码

package com.zl.kafka;import com.alibaba.fastjson.JSONObject;
import com.zl.utils.EnvUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;public class KafkaExampleSink {public static void main(String[] args) throws Exception {// 配置运行环境,并行度1StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);// 程序间隔离,每个程序单独设置env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/KafkaExampleSink");/// ===== 构造kafka sink =====// 相关参数配置可以参考下面这两个文档:①https://cloud.tencent.com/developer/article/2089393// ②https://www.bilibili.com/opus/819228616166473783// kafka配置Properties prop = new Properties();prop.setProperty("bootstrap.servers", "10.86.97.21:9092,10.86.97.21:9093,10.86.97.21:9094");// 当设置为“true”时,生产者将确保流中只写入每条消息的一个副本。prop.setProperty("enable.idempotence", "true");// 指定了生产者在接收到服务器相应之前可以发送多个消息,值越高,占用的内存越大,// 当然也可以提升吞吐量,发生错误时,可能会造成数据的发送顺序改变,其默认值是5.prop.setProperty("max.in.flight.requests.per.connection", "5");prop.setProperty("acks", "all");// 在kafka中消息发送失败时,指定生产者可以重发消息的次数,默认情况下,// 生产者在每次重试之间默认等待100ms,可以通过参数retey.backoff.ms参数来改变这个时间间隔。retries的缺省值:0.prop.setProperty("retries", "5");// 事务超时时间prop.setProperty("transaction.timeout.ms", 15 * 60 * 1000 + "");String topic = "rv-test-sink";FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<String>(topic,// topicnew KafkaSerializationSchema<String>() {@Overridepublic ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {return new ProducerRecord<>(topic, s.getBytes(StandardCharsets.UTF_8));}},prop,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);/// ===== 构造模拟数据 =====JSONObject rvJsonObject = new JSONObject();rvJsonObject.put("dt","2024-12-20");// 日期取当天rvJsonObject.put("uuid","data-stream-1");rvJsonObject.put("report_time",1733881971621L);String mockJson = JSONObject.toJSONString(rvJsonObject);/// ===== sink kafka =====env.fromElements(mockJson).addSink(flinkKafkaProducer).setParallelism(3).name("kafka-sink").uid("kafka-sink");env.execute("kafka-sink-job");}// main}

5.运行

由于不是持续输入流,运行完会结束。
在这里插入图片描述
sink到kafka的数据如下:
在这里插入图片描述

6.完整代码

完整代码见:完整代码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/64978.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WebRTC搭建与应用(五)-Coturn踩坑记

WebRTC搭建与应用(五)-Coturn踩坑记 近期由于项目需要在研究前端WebGL渲染转为云渲染&#xff0c;借此机会对WebRTC等有了初步了解&#xff0c;在此记录一下&#xff0c;以防遗忘。 第五章 WebRTC搭建与应用(五)-Coturn踩坑记 文章目录 WebRTC搭建与应用(五)-Coturn踩坑记前…

@vue/cli启动异常:ENOENT: no such file or directory, scandir

参考:https://blog.csdn.net/qq_44355188/article/details/122239566 首先异常报错是&#xff1a;ENOENT: no such file or directory, scandir ‘D:\Data\Project\VueProject\hello\node_modulesvue\cli-plugin-eslint\locales’&#xff1b;我的vue/cli版本是 4.5.15 重点是…

Git进阶:本地或远程仓库如何回滚到之前的某个commit

在Git的使用过程中&#xff0c;我们经常会遇到需要回滚到之前某个commit的情况。无论是为了修复错误、撤销更改&#xff0c;还是为了重新组织代码&#xff0c;回滚到特定commit都是一个非常有用的技能。本文将介绍几种常用的回滚方法&#xff0c;帮助读者更好地掌握Git版本控制…

【java设计模式】1 - 软件设计原则

1&#xff0c;软件设计原则 在软件开发中&#xff0c;为了提高软件系统的可维护性和可复用性&#xff0c;增加软件的可扩展性和灵活性&#xff0c;程序员要尽量根据6条原则来开发程序&#xff0c;从而提高软件开发效率、节约软件开发成本和维护成本。 1.1 开闭原则 对扩展开…

如何安全获取股票实时数据API并在服务器运行?

以下是安全获取股票实时数据 API 并在服务器运行的方法&#xff1a; 选择合适的券商或交易平台 评估自身需求&#xff1a;明确自己的交易策略、交易品种、交易频率等需求&#xff0c;以及对 股票api 的功能、性能、稳定性等方面的要求。调研券商或平台&#xff1a;了解不同券商…

kali切换root用户显示su: Authentication failure解决方案

1.切换root用户显示su: Authentication failure 2.解决方式&#xff1a;使用sudo su命令 3.密码新版的应该都是kali

一篇文章学会HTML

目录 页面结构 网页基本标签 图像标签 超链接标签 文本链接 图像链接 锚链接 功能链接 列表 有序列表 无序列表 自定义列表 表格 跨列/跨行 表头 媒体元素 视频 音频 网站的嵌套 表单 表单元素 文本框 单选框 多选框 按钮 下拉框 文本域和文件域 表…

【开源】一款基于SpringBoot的智慧小区物业管理系统

一、下载项目文件 项目文件源码链接&#xff1a;https://pan.quark.cn/s/3998d958e182如出现网盘空间不够存的情况&#xff01;&#xff01;&#xff01;解决办法是先用夸克手机app注册&#xff0c;然后保存上方链接&#xff0c;就可以得到1TB空间了&#xff01;&#xff01;&…

北理工计算机考研难度分析

总体情况概述 北京理工大学计算机学院2024届考研呈现出学硕扩招、专硕稳定的特点。学硕实际录取27人(含非全统考)&#xff0c;复试线360分&#xff0c;复试录取率76%&#xff1b;计算机技术专硕(不含珠海)实际录取29人&#xff0c;复试线324分&#xff0c;复试录取率86%。两个…

细说STM32F407单片机轮询方式读写SPI FLASH W25Q16BV

目录 一、工程配置 1、时钟、DEBUG 2、GPIO 3、SPI2 4、USART6 5、NVIC 二、软件设计 1、FALSH &#xff08;1&#xff09;w25flash.h &#xff08;2&#xff09; w25flash.c 1&#xff09;W25Q16基本操作指令 2&#xff09;计算地址的辅助功能函数 3&#xff09;器…

Redis+注解实现限流机制(IP、自定义等)

简介 在项目的使用过程中&#xff0c;限流的场景是很多的&#xff0c;尤其是要提供接口给外部使用的时候&#xff0c;但是自己去封装的话&#xff0c;相对比较耗时。 本方式可以使用默认&#xff08;方法&#xff09;&#xff0c;ip、自定义参数进行限流&#xff0c;根据时间…

仿闲鱼的二手交易小程序软件开发闲置物品回收平台系统源码

市场前景 闲置物品交易软件的市场前景广阔&#xff0c;主要基于以下几个方面的因素&#xff1a; 环保意识提升&#xff1a;随着人们环保意识的增强&#xff0c;越来越多的人开始关注资源的循环利用&#xff0c;闲置物品交易因此受到了广泛的关注。消费升级与时尚节奏加快&…

FastJson读取resources下的json文件并且转成对象

读取resources下的json文件并且转成对象 json文件路径是: ​​ ‍ 读取代码 ‍ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import lombok.extern.slf4j.Slf4j; import org.apache.commons.…

深圳龙岗戴尔dell r730xd服务器故障维修

深圳龙岗一台DELL POWEREDGE R730XD服务器系统故障问题处理&#xff1a; 1&#xff1a;客户工厂年底产线整改&#xff0c;时不时的会意外断电&#xff0c;导致服务器也频繁停机&#xff0c; 2&#xff1a;多次异常停机后导致服务器开机后windows server系统无法正常启动了&…

绕组识别标签规范

有标签名称的要标记&#xff0c;没有的不用标记 需要标注的工具、器材 图像中文名称标签名称od脱模剂watering can2铁铲shovel1记号笔&#xff0c;白色着重标bluepen/whitepen6纸质标签label3钢尺scale5玻璃纤维带&#xff08;卷&#xff09;红色网格布red grid4白色网格布wh…

中国信通院致信感谢易保全:肯定贡献能力,期许未来合作

近日&#xff0c;中国信息通信研究院&#xff08;以下简称“中国信通院”&#xff09;向易保全发感谢信表达谢意&#xff0c;对其在中国信通院牵头的“铸基计划”——企业数字化转型高质量发展推进行动实施中展现出的重要贡献给予了高度评价和肯定&#xff0c;并展望了双方至20…

WebRTC服务质量(08)- 重传机制(05) RTX机制

一、前言&#xff1a; RTX协议&#xff08;Retransmission&#xff0c;即重传协议&#xff09;是 WebRTC 中用于处理丢包恢复的一部分。由于网络通信中的丢包不可避免&#xff0c;WebRTC RTP协议栈支持多种丢包恢复机制&#xff0c;其中之一便是通过RTX协议实现的重传机制。 …

国自然联合项目|影像组学智能分析理论与关键技术|基金申请·24-12-25

小罗碎碎念 该项目为国自然联合基金项目&#xff0c;执行年限为2019年1月至2022年12月&#xff0c;直接费用为204万元。 项目研究内容包括影像组学分析、智能计算、医疗风险评估等&#xff0c;旨在通过模拟医生诊断过程&#xff0c;推动人工智能在医疗领域的创新。 项目取得了…

轮播图带详情插件、uniApp插件

超级好用的轮播图 介绍访问地址参数介绍使用方法&#xff08;简单使用&#xff0c;参数结构点击链接查看详情&#xff09;图片展示 介绍 带有底部物品介绍以及价格的轮播图组件&#xff0c;持续维护&#xff0c;uniApp插件&#xff0c;直接下载填充数据就可以在项目里面使用 …

Java 本地缓存实现:Guava Cache、Caffeine、Ehcache 和 Spring Cache

文章目录 一、引言二、Guava Cache理论介绍实战演示 三、Caffeine理论介绍实战演示 四、Ehcache理论介绍实战演示 五、Spring Cache理论介绍实战演示 六、总结 一、引言 在现代应用程序开发中&#xff0c;缓存是提高性能和响应速度的关键技术之一。Java 提供了多种本地缓存解决…