flink写入到kafka 大坑解析。

1.kafka能不能发送null消息?

   能!

2 flink能不能发送null消息到kafka?

不能!

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"9.135.68.201:9092" );FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>( "cc_test",new SimpleStringSchema(),properties);env.fromCollection(Lists.newArrayList("111", "222", "333")).map(s->{return s.equals("222")?null:s;}).addSink(flinkKafkaProducer);env.execute("ContractLabelJob");
}

 

 

这里就报了java的最常见错误 空指针,原因就是flink要把kafka的消息getbytes。所以flink不能发送null到kafka。

这种问题会造成什么后果?

flink直接挂掉。

如果我们采取了失败重试机制会怎样?

env.setRestartStrategy(  RestartStrategies.fixedDelayRestart(3, Time.seconds(5))  );

数据重复或者丢失。

还有此时kafka的offset由flink在管理, 消费的offset 一直没有被commit,所以一直重复消费。

来个demo

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(6000);
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);Properties properties = new Properties();properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"9.135.68.201:9092" );FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>( "cc_test2",new SimpleStringSchema(),properties);KafkaSource<String> source = KafkaSource.<String>builder().setTopics("cc_test").setGroupId("cc_test1234").setBootstrapServers("9.135.68.201:9092").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.earliest()).build();DataStreamSource<String> stringDataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka");
//        stringDataStreamSource.print("kafka msg");stringDataStreamSource.addSink(sink);env.execute("test");

 从topic cc_test消费 然后写到cc_test2里面去

cc_test里的数据

 cc_test_2里写入的数据

 可以看到一个null 报错了,然后它分区的333就会一直被提交。

总之大家小心这个问题。

不加检查点 flink报错后就会直接停掉。。

加了检查点env.enableCheckpointing(6000); flink失败后会一直重试

加了重试机制 env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5000, TimeUnit.SECONDS),Time.of(5000,TimeUnit.SECONDS))); 失败的任务只会重试几次。

还是得熟悉源码呀。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/13461.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaWeb+jsp+Tomcat的叮当书城项目

点击以下链接获取源码&#xff1a; https://download.csdn.net/download/qq_64505944/88123111?spm1001.2014.3001.5503 技术&#xff1a;ssm jsp JDK1.8 MySQL5.7 Tomcat8.3 源码数据库课程设计 功能&#xff1a;管理员与普通用户和超级管理员三个角色&#xff0c;管理员可…

浅谈 Spring AOP 思想

Spring AOP AOP 切面编程普通代理类JDK动态代理Cglib动态代理AOPAOP术语AOP切面编程的优势Advice通知类型&#xff08;5种&#xff09;通知的执行顺序 Order切入点表达式表达式execution注解annotation Spring事务管理Transactional 及 Transactional 的两个属性Transactional …

常用直线模组的类型

目前&#xff0c;直线模组的应用非常广泛&#xff0c;而且直线模组的种类也有很多可以满足每个行业的应用要求&#xff0c;那么常见的直线模组类型有哪些&#xff0c;大家知道吗&#xff1f; 1、全封闭滚珠丝杆直线模组&#xff1a; 在半封闭式的基础上增加了不锈钢带防尘结构…

[Angular] Import TranslateModule in Angular 16

1.Background Angular 更新至V16版后&#xff0c;支援 standalone&#xff0c;故移除了 NgModule&#xff0c;而TranslateModule 又要在AppModule中 import&#xff0c;那该如何做呢&#xff1f; 2.NPM packages installation npm install ngx-translate/core npm install n…

BUUCTF——reverse3 适合新手的关于base64加密算法代码的分析

作为一个逆向小白&#xff0c;学了点加密算法就来BUU找点乐子&#xff0c;前7题蛮简单的&#xff0c;然后做到了reverse3&#xff0c;典型的base64加密算法&#xff0c;让我折腾了好久&#xff0c;写篇博客记录一下 顺便说下很多博客并没有对这里的加密算法进行代码上的分析&a…

分享VMware Workstation Pro ESXI7创建虚拟机和配置硬盘空间(分享自己的学习历程意在帮助有需要的小伙伴)

背景&#xff1a;因公司项目需求改用VMware Workstation Pro&#xff0c;已经使用1个月目前除了中途出现过一次问题被解决后一直稳定运行至今&#xff0c; 1:这里贴出拿出现的问题提示及解决方法的链接&#xff1a;解决vmWare ESXI 7.3报错; 2:如果你是第一次接触VMware Work…

Echarts 文字太长用省略号代替

xAxis: [{type: category,data: [materialUserEchartsDate.value[0] ? materialUserEchartsDate.value[0].name : ,materialUserEchartsDate.value[1] ? materialUserEchartsDate.value[1].name : ,materialUserEchartsDate.value[2] ? materialUserEchartsDate.value[2].na…

二、前端高德地图、渲染标记(Marker)引入自定义icon,手动设置zoom

要实现这个效果&#xff0c;我们先看一下目前的页面展示&#xff1a; 左边有一个图例&#xff0c;我们可以方法缩小地图&#xff0c;右边是动态的marker标记&#xff0c;到时候肯定时候是后端将对应的颜色标识、文字展示、坐标点给咱们返回、我们肯定可以拿到一个list&#xf…

【广州华锐互动】无人值守变电站AR虚拟测控平台

无人值守变电站AR虚拟测控平台是一种基于增强现实技术的电力设备巡检系统&#xff0c;它可以利用增强现实技术将虚拟信息叠加在真实场景中&#xff0c;帮助巡检人员更加高效地完成巡检任务。这种系统的出现&#xff0c;不仅提高了巡检效率和准确性&#xff0c;还降低了巡检成本…

企业级敏捷转型探索与实践︱极狐Gitlab战略运营部PMO郝韫

极狐Gitlab战略运营部PMO郝韫先生受邀为由PMO评论主办的2023第十二届中国PMO大会演讲嘉宾&#xff0c;演讲议题&#xff1a;企业级敏捷转型探索与实践。大会将于8月12-13日在北京举办&#xff0c;敬请关注&#xff01; 议题简要&#xff1a; 打造持续创新、快速成长的敏捷组织…

python调用百度ai将图片/pdf识别为表格excel

python调用百度ai将图片识别为表格excel 表格文字识别(异步接口)图片转excel 表格文字识别V2图片/pdf转excel通用 表格文字识别(异步接口) 图片转excel 百度ai官方文档&#xff1a;https://ai.baidu.com/ai-doc/OCR/Ik3h7y238 使用的是表格文字识别(异步接口)&#xff0c;同步…

【C#】微软的Roslyn 是个啥?

一、说明 Roslyn 是微软重写的C#编译器并开源。 Roslyn 是 C# 和 Visual Basic.NET 开源编译器的代号。以下是它如何在过去十年企业Microsoft的最黑暗中开始&#xff0c;并成为所有C#&#xff08;和VB&#xff09;的开源&#xff0c;跨平台&#xff0c;公共语言引擎&#xff0c…

springboot+mybatis-plus+vue+element+vant2实现短视频网站,模拟西瓜视频移动端

目录 一、前言 二、管理后台 1.登录 2.登录成功&#xff0c;进入欢迎页 ​编辑 3.视频分类管理 4. 视频标签管理 5.视频管理 6.评论管理 ​编辑 7.用户管理 8.字典管理 &#xff08;类似于后端的枚举&#xff09; 9.参数管理&#xff08;富文本录入&#xff09; 10.管…

Docker容器监控之 CAdvisor+InfluxDB+Granfana

通过docker stats命令可以很方便的看到当前宿主机上所有容器的CPU,内存以及网络流量等数据&#xff0c;一般小公司够用了。但是&#xff0c;docker stats统计结果只能是当前宿主机的全部容器&#xff0c;数据资料是实时的&#xff0c;没有地方存储、没有健康指标过线预警等功能…

VMware搭建Hadoop集群 for Windows(完整详细,实测可用)

目录 一、VMware 虚拟机安装 &#xff08;1&#xff09;虚拟机创建及配置 &#xff08;2&#xff09;创建工作文件夹 二、克隆虚拟机 三、配置虚拟机的网络 &#xff08;1&#xff09;虚拟网络配置 &#xff08;2&#xff09;配置虚拟机 主机名 &#xff08;3&#xf…

(树) 剑指 Offer 26. 树的子结构 ——【Leetcode每日一题】

❓剑指 Offer 26. 树的子结构 难度&#xff1a;中等 输入两棵二叉树 A 和 B&#xff0c;判断 B 是不是 A 的子结构。(约定空树不是任意一个树的子结构) B 是 A 的子结构&#xff0c; 即 A 中有出现和B相同的结构和节点值。 例如: 给定的树 A: 3/ \4 5/ \1 2给定的树 B&…

stable-diffusion-webui汉化教程

第一种方法 1.打开stable diffusion webui&#xff0c;进入"Extensions"选项卡 2.点击"Install from URL" 3、注意"URL for extension’s git repository"下方的输入框 4、填入地址&#xff1a;https://github.com/VinsonLaro/stable-diffus…

C++多线程编程(包含c++20内容)

C多线程编程(包含c20内容) 文章目录 C多线程编程(包含c20内容)线程通过函数指针创建线程通过函数对象创建线程通过lambda创建线程通过成员函数创建线程线程本地存储取消线程自动join线程从线程获得结果 原子操作库原子操作原子智能指针原子引用使用原子类型等待原子变量 互斥互…

[JAVAee]文件操作-IO

本文章讲述了通过java对文件进行IO操作 IO:input/output,输入/输出. 建议配合文章末尾实例食用 目录 文件 文件的管理 文件的路径 文件的分类 文件系统的操作 File类的构造方法 File的常用方法 文件内容的读写 FileInputStream读取文件 构造方法 常用方法 Scan…

平均列顺序对列排斥能的影响

( A, B )---3*30*2---( 1, 0 )( 0, 1 ) 让网络的输入只有3个节点&#xff0c;AB训练集各由5张二值化的图片组成&#xff0c;让A有6个1&#xff0c;B有4个1&#xff0c;并且让这10个1的位置没有重合。比较迭代次数的顺序。 其中有9组数据 差值结构 A-B 迭代次数 构造平均列 …