Flink问题解决及性能调优-【Flink不同并行度引起sink2es报错问题】

最近需求,仅想提高sink2es的qps,所以仅调节了sink2es的并行度,但在调节不同算子并行度时遇到一些问题,找出问题的根本原因解决问题,并分析整理。

实例代码

--SET table.exec.state.ttl=86400s; --24 hour,默认: 0 ms
SET table.exec.state.ttl=2592000s; --30 days,默认: 0 msCREATE TABLE kafka_table (mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map<string,string>,cur map<string,string>,cus map<string,string>,account_id AS IF(cur['account_id'] IS NOT NULL , cur['account_id'], src ['account_id']),publish_time AS IF(cur['publish_time'] IS NOT NULL , cur['publish_time'], src ['publish_time']),msg_status AS IF(cur['msg_status'] IS NOT NULL , cur['msg_status'], src ['msg_status']),send_type AS IF(cur['send_type'] IS NOT NULL , cur['send_type'], src ['send_type'])--event_time as cast(IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)--WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE     --SECOND
) WITH ('connector' = 'kafka','topic' = 't1','properties.bootstrap.servers' = 'xx.xx.xx.xx:9092','properties.group.id' = 'g1','scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset--  'properties.enable.auto.commit',= 'true' -- default:false, 如果为false,则在发生checkpoint时触发offset提交'format' = 'json'
);CREATE TABLE es_sink(send_type      STRING,account_id     STRING,publish_time   STRING,grouping_id       INTEGER,init           INTEGER,init_cancel    INTEGER,push          INTEGER,succ           INTEGER,fail           INTEGER,init_delete    INTEGER,update_time    STRING,PRIMARY KEY (group_id,send_type,account_id,publish_time) NOT ENFORCED
)
with ('connector' = 'elasticsearch-6','index' = 'es_sink','document-type' = 'es_sink','hosts' = 'http://xxx:9200','format' = 'json','filter.null-value'='true','sink.bulk-flush.max-actions' = '1000','sink.bulk-flush.max-size' = '10mb'
);CREATE view  tmp as
selectsend_type,account_id,publish_time,msg_status,case when UPPER(opt) = 'INSERT' and msg_status='0'  then 1 else 0 end AS init,case when UPPER(opt) = 'UPDATE' and send_type='1' and msg_status='4' then 1 else 0 end AS init_cancel,case when UPPER(opt) = 'UPDATE' and msg_status='3' then 1 else 0 end AS push,case when UPPER(opt) = 'UPDATE' and (msg_status='1' or msg_status='5') then 1 else 0 end AS succ,case when UPPER(opt) = 'UPDATE' and (msg_status='2' or msg_status='6') then 1 else 0 end AS fail,case when UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0' then  1 else 0 end AS init_delete,event_time,opt,ts
FROM kafka_table
where (UPPER(opt) = 'INSERT' and msg_status='0' )
or        (UPPER(opt) = 'UPDATE' and msg_status in ('1','2','3','4','5','6'))
or        (UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0');--send_type=1          send_type=0
--初始化->0             初始化->0
--取消->4
--推送->3               推送->3
--成功->1               成功->5
--失败->2               失败->6CREATE view  tmp_groupby as
selectCOALESCE(send_type,'N') AS send_type
,COALESCE(account_id,'N') AS account_id
,COALESCE(publish_time,'N') AS publish_time
,case when send_type is null and account_id is null and publish_time is null then 1when send_type is not null and account_id is null and publish_time is null then 2when send_type is not null and account_id is not null and publish_time is null then 3when send_type is not null and account_id is not null and publish_time is not null then 4end grouping_id
,sum(init) as init
,sum(init_cancel) as init_cancel
,sum(push) as push
,sum(succ) as succ
,sum(fail) as fail
,sum(init_delete) as init_delete
from tmp
--GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id),(send_type), ())
GROUP BY ROLLUP (send_type,account_id,publish_time); --等同于以上INSERT INTO es_sink
selectsend_type,account_id,publish_time,grouping_id,init,init_cancel,push,succ,fail,init_delete,CAST(LOCALTIMESTAMP AS STRING) as update_time
from tmp_groupby

发现问题

由于groupby或join聚合等算子操作的并行度与sink2es算子操作的并行度不同,上游算子同一个key的数据可能会下发到下游多个不同算子中。
导致sink2es出现多个subtask同时操作同一个key(这里key作为主键id),报错如下:

...Caused by: [test1/cfPTBYhcRIaYTIh3oavCvg][[test1][2]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[test1][4_1_92_2024-01-15 16:30:00]: version conflict, required seqNo [963], primary term [1]. current document has seqNo [964] and primary term [1]]]at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)at org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)at org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)... 1 more[CIRCULAR REFERENCE:[test1/cfPTBYhcRIaYTIh3oavCvg][[test1][2]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[test1][4_1_92_2024-01-15 16:30:00]: version conflict, required seqNo [963], primary term [1]. current document has seqNo [964] and primary term [1]]]]

问题原因

Flink中存在八种分区策略,常用Operator Chain链接方式有三种分区器:

  • forward:上下游并行度相同,且不发生shuffle,直连的分区器
  • hash:将数据按照key的Hash值下发到下游的算子中
  • rebalance:数据会被循环或者随机下发到下游算子中,改变并行度若无keyby,默认使用RebalancePartitioner分区策略

rebalance分区器,可能会将上游算子的同一个key随机下发到下游不同算子中,因而引起报错,如下图:
在这里插入图片描述在这里插入图片描述模型如下:

在这里插入图片描述

解决方案

  • 分组聚合算子与sink2es算子配置成相同的并行度,即使用forward分区器,如下图:
    在这里插入图片描述在这里插入图片描述
    另外sink2es forward分区器上游operator chain已经通过hash分区器保证了同一个key只能下发到下游一个subtask实例中

模型如下:
![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/215b7ba208b142da803a78d85b0f474e.png

  • sink2es算子的并行度配置为1,如下图:

operator chain为forward分区器模型如下:
在这里插入图片描述

总结

归根结底就是需要保证:上游subtask中同一个key只能下发到下游一个subtask中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/653352.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL查询—联合查询、子查询

关于表格的创建&#xff0c;请看上一篇文章——MySQL查询—连接查询 1、联合查询&#xff1a;把多次查询的结果合并&#xff0c;形成一共新的查询集。 UNION&#xff0c;UNION ALL 语法&#xff1a; SELECT 字段列表 FROM 表&#xff21;&#xff0e;&#xff0e;&#…

qt-C++笔记之使用信号和槽实现跨类成员变量同步响应

qt-C笔记之使用信号和槽实现跨类成员变量同步响应 —— 杭州 2024-01-24 code review! 文章目录 qt-C笔记之使用信号和槽实现跨类成员变量同步响应1.运行2.main.cpp3.test.pro4.编译 1.运行 2.main.cpp 代码 #include <QCoreApplication> #include <QObject> #…

Linux下Docker搭建部署Typecho博客【详细版】

Linux下Docker搭建部署Typecho博客【详细版】 一、环境准备1.1.准备阿里云服务器【新用户免费使用三个月】1.2.准备远程工具【FinalShell】1.3.系统信息1.4.安装所需软件包1.5.设置docker镜像源1.6.更新yum软件包索引1.7.确认停用selinux 二、安装Docker2.1.安装Docker-Ce2.2.查…

【RTP】webrtc 学习3: webrtc对h264的rtp解包

rtp_rtcp\source\video_rtp_depacketizer_h264.cc【RTP】webrtc 学习2: webrtc对h264的rtp打包 中分析了打包过程的代码,这样再来看解析过程的源码就容易多了:本代码主要基于m79,m98类似。这里注明了jitterbuffer 会再次 做 解析stap-a 变为NAL units解析ParseFuaNalu 第一…

ACL、VLAN、NAT笔记

一、ACL ---访问控制列表 1.ACL的作用 1&#xff0c;访问控制&#xff1a;在路由器流量流入或流出的接口上&#xff0c;匹配流量&#xff0c;然后 执行设定好的动作。 ---- permit 允许 , deny 拒绝 2&#xff0c;抓取感兴趣流&#xff1a;ACL可以和其他服务结合使用。ACL只…

MyBatis 如何整合 Druid 连接池?

Mybatis 如何整合 Druid 数据连接池呢&#xff1f;首先打开创建的 Maven 工程&#xff0c;找到 pom.xml 文件&#xff0c;添加 Druid 依赖。 <!--druid连接池--> <dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId&…

【C语言】数组的应用:三子棋游戏

由于代码较长&#xff0c;为了增加可读性&#xff0c;我们把代码分别写到game.h&#xff0c;game.c&#xff0c;test.c&#xff0c;里面&#xff0c;其中game.h用来声明函数&#xff0c;实现函数功能的代码在game.c&#xff0c;测试游戏的代码在test.c 为了方便后续的更改&…

ThreadLocal学习笔记

ThreadLocal类图 ThreadLocal/InheritableThreadLocal/ \TransmittableThreadLocal(阿里巴巴) TransmissibleThreadLocal(阿里巴巴)ThreadLocal 这是Thread类的局部变量&#xff0c;每个线程私有。 它主要用于解决多线程中的数据共享问题&#xff0c;保…

k8s 版本发布与回滚

一、实验环境准备&#xff1a; kubectl get pods -o wide kubectl get nodes -o wide kubectl get svc 准备两个nginx镜像&#xff0c;版本号一个是V3&#xff0c;一个是V4 二、准备一个nginx.yaml文件 apiVersion: apps/v1 kind: Deployment metadata:name: nginx-deploylab…

翻译: GPT-4 Vision静态图表转换为动态数据可视化 升级Streamlit 三

GPT-4 Vision 系列: 翻译: GPT-4 with Vision 升级 Streamlit 应用程序的 7 种方式一翻译: GPT-4 with Vision 升级 Streamlit 应用程序的 7 种方式二 1. 将任何静态图表转换为动态数据可视化 ChatGPT Vision 不仅可以将涂鸦变成功能齐全的 Streamlit 应用程序&#xff0c;还…

SpringBoot整合nacos的入门Demo

Nacos介绍 Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service的首字母简称&#xff0c;一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。 Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集&#xff0c;帮助您快速…

操作系统(6)----线程相关

目录 1.线程与进程的关系 2.线程的属性 3.线程的实现方式 用户级线程 内核级线程 多线程模型 一对一模型: 多对一模型&#xff1a; 多对多模型&#xff1a; 4.线程的状态和转换 5.线程的组织与控制 1.线程与进程的关系 可以把线程理解为“轻量级进程”。线程是一个…

大创项目推荐 题目:基于卷积神经网络的手写字符识别 - 深度学习

文章目录 0 前言1 简介2 LeNet-5 模型的介绍2.1 结构解析2.2 C1层2.3 S2层S2层和C3层连接 2.4 F6与C5层 3 写数字识别算法模型的构建3.1 输入层设计3.2 激活函数的选取3.3 卷积层设计3.4 降采样层3.5 输出层设计 4 网络模型的总体结构5 部分实现代码6 在线手写识别7 最后 0 前言…

【Spring实战】31 Spring Boot3 集成 Gateway 微服务网关

文章目录 1. 定义2. 功能3. 示例代码1) 创建一个业务服务2&#xff09;创建一个网关服务3&#xff09;启动服务4&#xff09;验证 4. 代码参考结语 1. 定义 Spring Cloud Gateway 是一个基于 Spring Framework 的开源网关服务&#xff0c;用于构建微服务架构中的 API 网关。它…

C51 单片机学习(一):基础外设

参考 51单片机入门教程 1. 单片机简介 1.1 定义 单片机&#xff08;Micro Controller Unit&#xff0c;简称 MCU&#xff09; 内部集成了 CPU、RAM、ROM、定时器、中断系统、通讯接口等一系列电脑的常用硬件功能单片机的任务是信息采集&#xff08;依靠传感器&#xff09;、处…

休息日的思考与额外题——链表

文章目录 前言链表知识点 一、 92. 反转链表 II二、21. 合并两个有序链表总结 前言 一个本硕双非的小菜鸡&#xff0c;备战24年秋招&#xff0c;计划二刷完卡子哥的刷题计划&#xff0c;加油&#xff01; 二刷决定精刷了&#xff0c;于是参加了卡子哥的刷题班&#xff0c;训练…

富文本编辑器CKEditor4简单使用-01

富文本编辑器CKEditor4简单使用-01 1. 快速体验入门1.1 通过从 CDN 加载 CKEditor 来快速体验1.2 从官方网站下载软件包1.2.1 官网下载1.2.2 解压、简单使用&#xff08;自带index页面示例&#xff09;1.2.3 将 CKEditor 4 添加到自己的页面1.2.3.1 目录结构1.2.3.2 效果1.2.3.…

TensorFlow2实战-系列教程6:迁移学习实战

&#x1f9e1;&#x1f49b;&#x1f49a;TensorFlow2实战-系列教程 总目录 有任何问题欢迎在下面留言 本篇文章的代码运行界面均在Jupyter Notebook中进行 本篇文章配套的代码资源已经上传 1、迁移学习 用已经训练好模型的权重参数当做自己任务的模型权重初始化一般全连接层需…

【机器学习】工程实践问题概述

机器学习实际应用时的工程问题与面临的挑战 一、实现细节问题 1.1 训练样本 训练样本与标注对各类机器学习算法和模型的精度影响 训练样本的选择对各类机器学习算法和模型的影响 训练样本的优化 如何进行数据增强&#xff1f; 如何进行数据清洗&#xff1f; 样本的标注对各类机…

数据结构(二)------单链表

制作不易&#xff0c;三连支持一下呗&#xff01;&#xff01;&#xff01; 文章目录 前言一.什么是链表二.链表的分类三.单链表的实现总结 前言 上一节&#xff0c;我们介绍了顺序表的实现与一些经典算法。 但是顺序表这个数据结构依然有不少缺陷&#xff1a; 1.顺序表指定…