flink实时流遇到的问题排查——部分数据未落库redis问题

flink实时流遇到的问题排查

    • 1、技术和环境
    • 2、问题表述
    • 3、简化的代码
    • 4、问题排查思路
    • 5、结论
    • 6、后续补充

1、技术和环境

技术:kafka、zookeeper、DataStream、redis
环境表述:kafka生产者KafkaProducerTest类mock 3条日志后,FlinkDataRealTimeFlowDeal类有建消费者,消费日志数据进行实时流DataStream处理,进行日志清洗、数据落库redis。

2、问题表述

理论上:
KafkaProducerTest生产者每次执行x条日志,消费者实际读取x条日志,实际落库x条处理结果。

实际:
(有问题)KafkaProducerTest生产者第1次执行3条日志,消费者实际读取3条日志,实际落库2条处理结果。
(正常)KafkaProducerTest生产者第2次执行之前的3条日志,消费者实际读取3条日志,实际落库3条处理结果。
(有问题)KafkaProducerTest生产者第3次执行之前的3条日志,消费者实际读取3条日志,实际落库2条处理结果。
(正常)KafkaProducerTest生产者第4次执行之前的3条日志,消费者实际读取3条日志,实际落库3条处理结果。

问题总结:奇数次执行时数据漏掉了1条没有落库,偶数次全部落库成功。
注意:每次执行的是相同的日志数据(测试用)。

3、简化的代码

DataStream.flatMap(进行日志清洗-Collector收集).keyBy(0).countWindow(2).reduce(进行聚合).process(进行redis落库)

4、问题排查思路

首先查看日志,发现:
进行【日志清洗-Collector收集】的方法:内部的错误日志信息打印处理了3次,符合要求,说明3条日志均进行了日志清洗。

然后,结合看方法:flatMap(String value, Collector<Tuple2<String, List>> out),flatMap扁平化得到的Collector是一个Tuple2<String, List类型的收集。

接着,keyby(0)后(以Tuple2的第一个参数对数据进行平铺)得到:
String key1,List< UserEventAction>111

String key2,List< UserEventAction>11
String key2,List< UserEventAction>22

String key3,List< UserEventAction>1
String key3,List< UserEventAction>2
String key3,List< UserEventAction>3

第一条日志(未处理):(key1平铺结果:处理0条,未处理1条)
第二条日志(处理):(key2平铺结果:处理2条,未处理0条)
第三条日志(部分处理):(key3平铺结果:处理2条,未处理1条)
备注:本业务场景,一条日志对应一个key。

进行【聚合】的方法reduce:内部共处理了4条Tuple2<String, List< UserEventAction>里的平铺元素(日志清洗的对象结果)。缺少了2条未处理。

观察到:对应日志条数:未处理1条。处理了2条日志(1条处理,1条部分处理)。

聚合前一步是countWindow。
推断是countWindow(2)出现问题。

上述结果推测:
key1暂不处理:具备1条数据,在countWindow(2)时数据每满2个触发一次,会处理。暂时不处理List< UserEventAction>111。等下一次key1有新数据时候,满2处理。

key2处理:具备2条数据,在countWindow(2)时数据每满2个触发一次,会处理2条:List< UserEventAction>11,List< UserEventAction>22。

key3部分处理:具备3条数据,在countWindow(2)时数据每满2个触发一次,会处理2条:List< UserEventAction>1,List< UserEventAction>2。暂时不处理List< UserEventAction>3。等下一次key3有新数据时候,满2条处理。

备注:也可以从落库redis的数据反序列化后得到印证。

5、结论

把countWindow改成1就可以都落库了。

DataStream.flatMap(进行日志清洗-Collector收集).keyBy(0).countWindow(1).reduce(进行聚合).process(进行redis落库)

问题解决了,问题在countWindow(2),也就是当根据keyBy(0)分组之后,数据的数量每次达到2时进行输出。

日志清洗后的结果是一条日志对应一个key,一个key对应多个List< UserEventAction>或者单个List< UserEventAction>。

设置2的时候,对应的key的List< UserEventAction>如果只有一条就不会落库,kafka生产者执行两次时候就会累积到两条相同的key的数据,每满2条处理后续操作,所以之前有奇数次执行和偶数次执行区别。

6、后续补充

主要是和flatMap平铺后收集到的key的种类数量有关系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/549160.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CA0503:无法显示额外的代码分析警告或错误

项目团队正在使用VSS进行协同开发&#xff0c;由于人数较多&#xff0c;一开始为了保证开发工作正常进行&#xff0c;打开了代码分析&#xff0c;强制了签入策略&#xff0c;进行代码分析。当项目进行了一段时间后&#xff0c;每次编译都会产生一个错误“CA0503:无法显示额外的…

VS2008中对.Net 3.5 sp1程序打安装包的前提系统环境的配置

众所周知&#xff0c;.net 程序必须要有frame work才能进行运行&#xff0c;这就需要发布程序时一起进行发布&#xff0c;于是我们需要配置本地机器的环境&#xff0c;而不是让用户去网站上下载这些框架。这里只介绍了操作系统的环境设置&#xff0c;对于打包的过程这里不再说明…

封装自定义的redis切库工具类ByteArrayRedisTemplate,读取byte数组反序列化成List<Object>

封装自定义的redis切库工具类ByteArrayRedisTemplate&#xff0c;读取byte数组反序列化成List&#xff1c;Object&#xff1e;&#xff08;使用lettuce连接池&#xff09;代码环境框架&#xff1a;springboot依赖&#xff1a;spring-boot-starter-data-redis步骤1&#xff1a;注…

随便唠叨下 最近的事情

闵行交大 闵行公安 闵行建交委 闵行大联动 闵行人口办 闵行... 看来到上海后,一直跟闵行有缘啊. 某天早上起来,赶紧洗个衣服,其实也就是几件衣服. 在卫生间冲洗后,晾晒时发现:一只袜子跑了. 望着手中仅剩的一只白袜,我感到很蛋疼.马上找了一遍,无果 这件灵异事件一直困扰着我,导…

redis的zset使用(java)——存取List< Object>

1 需求 要往redis存取List< Object>。 2 条件 1&#xff09;Object&#xff1a;是一个UserEvent对象&#xff0c;对应3个字段&#xff1a; Integer productId; String eventCode; Long timestamp;2&#xff09;要求 每个key里存取对象个数不超过xx个。 超过xx个&…

ogre研究之第一个程序(一)

第一次发送超过字数了&#xff0c;被迫剪成两篇&#xff01; 上一篇我们介绍了如何搭建开发环境&#xff0c;并创建了一个空白的窗口程序。 这里我们主要是实现在程序中装载一个简单的模型并显示出来。 首先看一下效果吧&#xff0c;&#xff08;模型就是ogre例子中的robot.mes…

Executor源码解读

Executor源码解读〇、[源码版本] jdk 1.8一、不再显式创建线程[举例1]代码示例二、不严格要求执行是异步的[举例1]代码示例三、任务在调用者线程之外的某个线程中执行[举例1]代码示例四、施加了某种限制的复合执行器[举例1]代码示例五、concurrent包中提供的Executor的实现对象…

Javascript高级程序设计第二版第十二章--Event--笔记

今天给诸位分享一下 chapter 12 Events所谓事件就是页面与文档窗口发生交互的瞬间。当年事件发生时它可以被预定&#xff08;程序处理&#xff09;。事件有两个过程&#xff0c;冒泡过程&#xff0c;或捕获过程。冒泡是自上而下&#xff0c;捕获是自下而上。这个顺序是document…

ExecutorService源码解读

ExecutorService源码解读〇、[源码版本] jdk 1.8一、ExecutorService接口详解1、ExecutorService关闭方法概述[举例1]代码示例2、ExecutorService任务执行方法概述3、Executors工厂方法概述[举例1]代码示例4、内存一致性影响二、接口方法详解1、shutdown方法2、shutdownNow方法…

英语生词本

英语生词本 1、daemon [di:mən] 守护进程2、phase [英] [feiz] 阶段, 时期3、methodology [英] [ˌmeθəˈdɔlədʒi:] 方法学,方法论4、algorithmalgorithm [英] [ˈlɡəriəm] [美] [ˈlɡəˌrɪəm] 运算法则2. 算法&#xff1b;演算法&#xff1b;计算程序3. 演示5、…

Executors源码解读——创建ExecutorService线程池

Executors源码解读——创建ExecutorService线程池〇、[源码版本] jdk 1.8一、线程池概述二、线程池创建三、Executors源码解读newFixedThreadPool()newWorkStealingPool()newSingleThreadExecutor()newCachedThreadPool()newSingleThreadScheduledExecutor()〇、[源码版本] jdk…

《大话设计模式》读书笔记-索引

《大话设计模式》读书笔记-第1章 简单工厂模式 《大话设计模式》读书笔记-第2章 策略模式 《大话设计模式》读书笔记-第3章 单一职责原则 《大话设计模式》读书笔记-第4章 开放-封闭原则 《大话设计模式》读书笔记-第5章 依赖倒转原则 《大话设计模式》读书笔记-第6章 装饰模式…

Future源码解读

Future源码解读〇、[源码版本] jdk 1.8一、Future概述[举例1]示例代码[举例2]示例代码内存一致性影响二、Future接口的方法cancel方法isCancelled方法isDone方法get方法〇、[源码版本] jdk 1.8 一、Future概述 Future表示异步计算的结果。提供了检查计算是否完成、等待计算完…

RIP,EIGRP,OSPF融合网络互通实验(原创)

首先看拓扑&#xff1a; 一个面试考官问我一个这样的问题&#xff0c;拓扑如上&#xff0c;为什么R1上的lo0 PING 不通 R6 上的lo0,说是一切都按正常配置&#xff0c;说是考我EIGRP的特性。我当然很纳闷&#xff0c;如果都正常配置怎么会不通呢&#xff0c;最后他告诉我主要是考…

谷歌A/B实验——重叠实验基础设施解读

谷歌A/B实验——重叠实验基础设施解读〇、来源一、背景介绍二、如何划分参数三、谷歌设计的ab实验系统3.1 域和层的设计3.1.1 基础重叠域和层设计3.1.2 具备非重叠和重叠的域和层设计3.1.2 具备非重叠的域的嵌套设计优点3.1.3 具备非重叠的域的嵌套设计缺点3.1.4 启动层&#x…

oracle 备份

1.首先以 sysdba的身份登录数据库 SQL> conn sys/oracle as sysdba SQL> col name for a50 SQL> select * from v$controlfile; 找到控制文件所在目录 STATUS ------- NAME --------------------------------------- F:\ORACLE\ORADATA\AFIS40\CONTROL01.CTL F:\O…

Mysql 5.7 创建索引官方解读

一、环境 Mysql 5.7 二、Mysql索引创建解读 1.概述 通常我们在使用CREATE TABLE时会创建所有的索引。索引的创建对于 InnoDB 表尤其重要&#xff0c;其中主键决定了数据文件中行的物理布局。 CREATE INDEX是另一种添加索引的方式&#xff0c;针对已经创建的表添加索引。注…

ntp时间服务解析

网络时间协议NTP&#xff08;Network Time Protocol&#xff09;是用于互联网中时间同步的标准互联网协议。NTP的用途是把计算机的时间同步到某些时间标准。目前采用的时间标准是世界协调时UTC&#xff08;Universal Time Coordinated&#xff09;。NTP的主要开发者是美国特拉华…

java循环中list.add对象的坑——后加入元素覆盖早期数据成相同值问题

原因 list.add&#xff08;对象&#xff09;&#xff0c;放入的实质是对象的引用。当对象在循环外进行new后&#xff0c;第一次循环add对象1&#xff0c;第二次循环add对象2&#xff0c;由于放入实质是对象的引用&#xff0c;引用指向了对象2&#xff0c;故变成list里有两个对…

AspectJ切面自定义注解实现参数分组校验——基础概念(1)

AspectJ切面自定义注解实现参数分组校验——基础概念&#xff08;1&#xff09;一、环境二、validation-api源码解读2-1.Default源码解读2-2.valid源码解读2-3.Validation源码解读一、环境 maven 需要引入的依赖&#xff1a; <dependency><groupId>javax.valida…