线程中如何使用对象_在 Flink 算子中使用多线程如何保证不丢数据?

1da4d12da968aeb22342c682f85c785c.png

简介: 本人通过分析痛点、同步批量请求优化为异步请求、多线程 Client 模式、Flink 算子内多线程实现以及总结四部分帮助大家理解 Flink 中使用多线程的优化及在 Flink 算子中使用多线程如何保证不丢数据。

分析痛点


笔者线上有一个 Flink 任务消费 Kafka 数据,将数据转换后,在 Flink 的 Sink 算子内部调用第三方 api 将数据上报到第三方的数据分析平台。这里使用批量同步 api,即:每 50 条数据请求一次第三方接口,可以通过批量 api 来提高请求效率。由于调用的外网接口,所以每次调用 api 比较耗时。假如批次大小为 50,且请求接口的平均响应时间为 50ms,使用同步 api,因此第一次请求响应以后才会发起第二次请求。请求示意图如下所示:

29c0dec312bc14f8465403c54f594b6c.png


平均下来,每 50 ms 向第三方服务器发送 50 条数据,也就是每个并行度 1 秒钟处理 1000 条数据。假设当前业务数据量为每秒 10 万条数据,那么 Flink Sink 算子的并行度需要设置为 100 才能正常处理线上数据。从 Flink 资源分配来讲,100 个并行度需要申请 100 颗 CPU,因此当前 Flink 任务需要占用集群中 100 颗 CPU 以及不少的内存资源。请问此时 Flink Sink 算子的 CPU 或者内存压力大吗?


上述请求示意图可以看出 Flink 任务发出请求到响应这 50ms 期间,Flink Sink 算子只是在 wait,并没有实质性的工作。因此,CPU 使用率肯定很低,当前任务的瓶颈明显在网络 IO。最后结论是 Flink 任务申请了 100 颗 CPU,导致 yarn 或其他资源调度框架没有资源了,但是这 100 颗 CPU 的使用率并不高,这里能不能优化通过提高 CPU 的使用率,从而少申请一些 CPU 呢?

同步批量请求优化为异步请求


首先可以想到的是将同步请求改为异步请求,使得任务不会阻塞在网络请求这一环节,请求示意图如下所示。

2b4a75937da34e9cd41473d48a0c354f.png


异步请求相比同步请求而言,优化点在于每次发出请求时,不需要等待请求响应后再发送下一次请求,而是当下一批次的 50 条数据准备好之后,直接向第三方服务器发送请求。每次发送请求后,Flink Sink 算子的客户端需要注册监听器来等待响应,当响应失败时需要做重试或者回滚策略。


通过异步请求的方式,可以优化网络瓶颈,假如 Flink Sink 算子的单个并行度平均 10ms 接收到 50 条数据,那么使用异步 api 的方式平均 1 秒可以处理 5000 条数据,整个 Flink 任务的性能提高了 5 倍。对于每秒 10 万数据量的业务,这里仅需要申请 20 颗 CPU 资源即可。关于异步 api 的具体使用,可以根据场景具体设计,这里不详细讨论。

多线程 Client 模式


对于一些不支持异步 api 的场景,可能并不能使用上述优化方案,同样,为了提高 CPU 使用率,可以在 Flink Sink 端使用多线程的方案。如下图所示,可以在 Flink Sink 端开启 5 个请求第三方服务器的 Client 线程:Client1、Client2、Client3、Client4、Client5。


这五个线程内分别使用同步批量请求的 Client,单个 Client 还是保持 50 条记录为一个批次,即 50 条记录请求一次第三方 api。请求第三方 api 耗时主要在于网络 IO(性能瓶颈在于网络请求延迟),因此如果变成 5 个 Client 线程,每个 Client 的单次请求平均耗时还能保持在 50ms,除非网络请求已经达到了带宽上限或整个任务又遇到其他瓶颈。所以,多线程模式下使用同步批量 api 也能将请求效率提升 5 倍。

3cf9d80e0c1688e1d4549ac4f9e2f1af.png

说明:多线程的方案,不仅限于请求第三方接口,对于非 CPU 密集型的任务也可以使用该方案,在降低 CPU 数量的同时,单个 CPU 承担多个线程的工作,从而提高 CPU 利用率。例如:请求 HBase 的任务或磁盘 IO 是瓶颈的任务,可以降低任务的并行度,使得每个并行度内处理多个线程。

Flink 算子内多线程实现


Sink 算子的单个并行度内现在有 5 个 Client 用于消费数据,但 Sink 算子的数据都来自于上游算子。如下图所示,一个简单的实现方式是 Sink 算子接收到上游数据后通过轮循或随机的策略将数据分发给 5 个 Client 线程。

f35f2747b9feeef3107da9d3c3ec1a4f.png


但是轮循或者随机策略会存在问题,假如 5 个 Client 中 Client3 线程消费较慢,会导致给 Client3 分发数据时被阻塞,从而使得其他正常消费的线程 Client1、2、4、5 也被分发不到数据。


为了解决上述问题,可以在 Sink 算子内申请一个数据缓冲队列,队列有先进先出(FIFO)的特性。Sink 算子接收到的数据直接插入到队列尾部,五个 Client 线程不断地从队首取数据并消费,即:Sink 算子先接收的数据 Client 先消费,后接收的数据 Client 后消费。

599ea1bfd26bb787f937a0be53d983b7.png
  • 若队列一直是满的,说明 Client 线程消费较慢、Sink 算子上游生产数据较快。
  • 若队列一直为空,说明 Client 线程消费较快、Sink 算子的上游生产数据较慢。

五个线程共用同一个队列完美地解决了单个线程消费慢的问题,当 Client3 线程阻塞时,不影响其他线程从队列中消费数据。这里使用队列还起到了削峰填谷的作用。

4f61f51a9c3b8b09f460b581033897ba.png

代码实现


原理明白了,具体代码如下所示,首先是消费数据的 Client 线程代码,代码逻辑很简单,一直从 bufferQueue 中 poll 数据,取出数据后,执行相应的消费逻辑即可,在本案例中消费逻辑便是 Client 积攒批次并调用第三方 api。

public class MultiThreadConsumerClient implements Runnable {private LinkedBlockingQueue<String> bufferQueue;public MultiThreadConsumerClient(LinkedBlockingQueue<String> bufferQueue) {this.bufferQueue = bufferQueue;}@Overridepublic void run() {String entity;while (true){// 从 bufferQueue 的队首消费数据entity = bufferQueue.poll();// 执行 client 消费数据的逻辑doSomething(entity);}}// client 消费数据的逻辑private void doSomething(String entity) {// client 积攒批次并调用第三方 api}
}

Sink 算子代码如下所示,在 open 方法中需要初始化线程池、数据缓冲队列并创建开启消费者线程,在 invoke 方法中只需要往 bufferQueue 的队尾添加数据即可。

public class MultiThreadConsumerSink extends RichSinkFunction<String> {// Client 线程的默认数量private final int DEFAULT_CLIENT_THREAD_NUM = 5;// 数据缓冲队列的默认容量private final int DEFAULT_QUEUE_CAPACITY = 5000;private LinkedBlockingQueue<String> bufferQueue;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// new 一个容量为 DEFAULT_CLIENT_THREAD_NUM 的线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(DEFAULT_CLIENT_THREAD_NUM, DEFAULT_CLIENT_THREAD_NUM,0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());// new 一个容量为 DEFAULT_QUEUE_CAPACITY 的数据缓冲队列this.bufferQueue = Queues.newLinkedBlockingQueue(DEFAULT_QUEUE_CAPACITY);// 创建并开启消费者线程MultiThreadConsumerClient consumerClient = new MultiThreadConsumerClient(bufferQueue);for (int i=0; i < DEFAULT_CLIENT_THREAD_NUM; i++) {threadPoolExecutor.execute(consumerClient);}}@Overridepublic void invoke(String value, Context context) throws Exception {// 往 bufferQueue 的队尾添加数据bufferQueue.put(value);}
}

代码逻辑相对比较简单,请问上述 Sink 能保证 Exactly Once 吗?


答:不能保证 Exactly Once,Flink 要想端对端保证 Exactly Once,必须要求外部组件支持事务,这里第三方接口明显不支持事务。


那么上述 Sink 能保证 At Lease Once 吗?言外之意,上述 Sink 会丢数据吗?


答:会丢数据。因为上述案例中使用的批量 api 来消费数据,假如批量 api 是每积攒 50 条数据请求一次第三方接口,当做 Checkpoint 时可能只积攒了 30 条数据,所以做 Checkpoint 时内存中可能还有数据未发送到外部系统。而且数据缓冲队列中可能还有缓存的数据,因此上述 Sink 在做 Checkpoint 时会出现 Checkpoint 之前的数据未完全消费的情况。


例如,Flink 任务消费的 Kafka 数据,当做 Checkpoint 时,Flink 任务消费到 offset 为 10000 的位置,但实际上 offset 10000 之前的一小部分数据可能还在数据缓冲队列中尚未完全消费,或者因为没积攒够一定批次所以数据缓存在 client 中,并未请求到第三方。当任务失败后,Flink 任务从 Checkpoint 处恢复,会从 offset 为 10000 的位置开始消费,此时 offset 10000 之前的一小部分缓存在内存缓冲队列中的数据不会再被消费,于是就出现了丢数据情况。

e6d5fa910aea0a31881f4eec6414122e.png

处理丢数据情况


如何保证数据不丢失呢?很简单,可以在 Checkpoint 时强制将数据缓冲区的数据全部消费完,并对 client 执行 flush 操作,保证 client 端不会缓存数据。
实现思路:Sink 算子可以实现 CheckpointedFunction 接口,当做 Checkpoint 时,会调用 snapshotState 方法,方法内可以触发 client 的 flush 操作。但 client 在 MultiThreadConsumerClient 对应的五个线程中,需要考虑线程同步的问题,即:Sink 算子的 snapshotState 方法中做一个操作,要使得五个 Client 线程感知到当前正在执行 Checkpoint,此时应该把数据缓冲区的数据全部消费完,并对 client 执行过 flush 操作。
如何实现呢?需要借助 CyclicBarrier。CyclicBarrier 会让所有线程都等待某个操作完成后才会继续下一步行动。在这里可以使用 CyclicBarrier,让 Checkpoint 等待所有的 client 将数据缓冲区的数据全部消费完并对 client 执行过 flush 操作,言外之意,offset 10000 之前的数据必须全部消费完成才允许 Checkpoint 执行完成。这样就可以保证 Checkpoint 时不会有数据被缓存在内存,可以保证数据源 offset 10000 之前的数据都消费完成。
MultiThreadConsumerSink 具体代码如下所示:

public class MultiThreadConsumerSink extends RichSinkFunction<String> {// Client 线程的默认数量private final int DEFAULT_CLIENT_THREAD_NUM = 5;// 数据缓冲队列的默认容量private final int DEFAULT_QUEUE_CAPACITY = 5000;private LinkedBlockingQueue<String> bufferQueue;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// new 一个容量为 DEFAULT_CLIENT_THREAD_NUM 的线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(DEFAULT_CLIENT_THREAD_NUM, DEFAULT_CLIENT_THREAD_NUM,0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());// new 一个容量为 DEFAULT_QUEUE_CAPACITY 的数据缓冲队列this.bufferQueue = Queues.newLinkedBlockingQueue(DEFAULT_QUEUE_CAPACITY);// 创建并开启消费者线程MultiThreadConsumerClient consumerClient = new MultiThreadConsumerClient(bufferQueue);for (int i=0; i < DEFAULT_CLIENT_THREAD_NUM; i++) {threadPoolExecutor.execute(consumerClient);}}@Overridepublic void invoke(String value, Context context) throws Exception {// 往 bufferQueue 的队尾添加数据bufferQueue.put(value);}
}


MultiThreadConsumerSink 实现了 CheckpointedFunction 接口,在 open 方法中增加了 CyclicBarrier 的初始化,CyclicBarrier 预期容量设置为 client 线程数加一,表示当 client 线程数加一个线程都执行了 await 操作时,所有的线程的 await 方法才会执行完成。这里为什么要加一呢?因为除了 client 线程外, snapshotState 方法中也需要执行过 await。


当做 Checkpoint 时 snapshotState 方法中执行 clientBarrier.await(),等待所有的 client 线程将缓冲区数据消费完。snapshotState 方法执行过程中 invoke 方法不会被执行,即:

Checkpoint 过程中数据缓冲队列不会增加数据,所以 client 线程很快就可以将缓冲队列中的数据消费完。


MultiThreadConsumerClient 具体代码如下所示:

public class MultiThreadConsumerSink extends RichSinkFunction<String> implements CheckpointedFunction {private Logger LOG = LoggerFactory.getLogger(MultiThreadConsumerSink.class);// Client 线程的默认数量private final int DEFAULT_CLIENT_THREAD_NUM = 5;// 数据缓冲队列的默认容量private final int DEFAULT_QUEUE_CAPACITY = 5000;private LinkedBlockingQueue<String> bufferQueue;private CyclicBarrier clientBarrier;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// new 一个容量为 DEFAULT_CLIENT_THREAD_NUM 的线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(DEFAULT_CLIENT_THREAD_NUM, DEFAULT_CLIENT_THREAD_NUM,0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());// new 一个容量为 DEFAULT_QUEUE_CAPACITY 的数据缓冲队列this.bufferQueue = Queues.newLinkedBlockingQueue(DEFAULT_QUEUE_CAPACITY);// barrier 需要拦截 (DEFAULT_CLIENT_THREAD_NUM + 1) 个线程this.clientBarrier = new CyclicBarrier(DEFAULT_CLIENT_THREAD_NUM + 1);// 创建并开启消费者线程MultiThreadConsumerClient consumerClient = new MultiThreadConsumerClient(bufferQueue, clientBarrier);for (int i=0; i < DEFAULT_CLIENT_THREAD_NUM; i++) {threadPoolExecutor.execute(consumerClient);}}@Overridepublic void invoke(String value, Context context) throws Exception {// 往 bufferQueue 的队尾添加数据bufferQueue.put(value);}@Overridepublic void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {LOG.info("snapshotState : 所有的 client 准备 flush !!!");// barrier 开始等待clientBarrier.await();}@Overridepublic void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {}}


从数据缓冲队列中 poll 数据时,增加了 timeout 时间为 50ms。如果从队列中拿到数据,则执行消费数据的逻辑,若拿不到数据说明数据缓冲队列中数据消费完了。此时需要判断是否有等待的 CyclicBarrier,如果有等待的 CyclicBarrier 说明此时正在执行 Checkpoint,所以 client 需要执行 flush 操作。flush 完成后,Client 线程执行 barrier.await() 操作。当所有的 Client 线程都执行到 await 时,所有的 barrier.await() 都会被执行完。此时 Sink 算子的 snapshotState 方法就会执行完。通过这种策略可以保证 Checkpoint 时将数据缓冲区中的数据消费完,client 执行 flush 操作可以保证 client 端不会缓存数据。

总结


分析到这里,我们设计的 Sink 终于可以保证不丢失数据了。对 CyclicBarrier 不了解的同学请 Google 或百度查询。再次强调这里多线程的方案,不仅限于请求第三方接口,对于非 CPU 密集型的任务都可以使用该方案来提高 CPU 利用率,且该方案不仅限于 Sink 算子,各种算子都适用。本文主要希望帮助大家理解 Flink 中使用多线程的优化及在 Flink 算子中使用多线程如何保证不丢数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/552098.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux回到桌面的命令符_三 基本的base shell 命令

1、启动shell进入 /etc/passwd 看到知道默认的是base shell ctrlaltt 或者双击终端&#xff0c;就进入如下图界面&#xff1b;如果你还是CLI新手&#xff0c;请记住&#xff0c;在输入shell命令之后&#xff0c;需要按回车键才能让shell执行你输入的命令。2、bash 手册man 命令…

java单纯形法_单纯形法 - fjzzq2002 - 博客园

看了集训队答辩&#xff0c;感觉要学习的有杜教筛高级版、线性规划、FFT、仙人掌、高级版线段树不出意外的话一个月内博客内都不会有别的东西了QAQ首先是喜闻乐见的单纯形法解线性规划。今年(2016年)和线性规划有关的集训队论文有两篇&#xff0c;大家可以自行翻一下集训队论文…

python调用js获取异步返回的数据_Python怎么获取js动态加载的数据

展开全部 import selenium from selenium import webdriver from selenium.common.exceptions import NoSuchElementException from selenium.webdriver.common.keys import Keys import time browser webdriver.Firefox() # Get local session of firefox browser.get("…

一个柱状图里两种数据_分享一些数据分析常用的统计图图表

无论是报表分析还是可视化分析中&#xff0c;最直观传达数据走向趋势的就是各式各样的统计图&#xff0c;比如想要比较分析两种不同的变量数据&#xff0c;可以用柱状图&#xff1b;想要查看某一数据在整体数据中所占的比例&#xff0c;可以用饼图来展示&#xff1b;想要查看某…

java 单例 读写锁_你用对锁了吗?浅谈 Java “锁” 事

每个时代&#xff0c;都不会亏待会学习的人大家好&#xff0c;我是yes。本来打算继续写消息队列的东西的&#xff0c;但是最近在带新同事&#xff0c;发现新同事对于锁这方面有一些误解&#xff0c;所以今天就来谈谈“锁”事和 Java 中的并发安全容器使用有哪些注意点。不过在这…

word无法启动转换器recovr32_迅捷PDF转换器3.0.1Mod会员版

特别声明所有软件皆来源于网上收集整理&#xff0c;仅供学习与交流技术,不得用作其它用途&#xff0c;如有侵犯你的权益&#xff0c;请联系我们,我们将于24小时内进行删除&#xff0c;谢谢你的配合&#xff01;1 迅捷PDF转换器作为一款专业实用的文件格式转换器&#xff0c;不仅…

项目管理知识体系指南_MP考前冲刺丨项目管理知识体系指南(PMBOK)串讲(11)...

第一单元&#xff1a;必考知识点08 项目质量管理(下)根本原因分析因果图因果图 Cause and Effect Diagram根本原因分析在被视为特殊偏差的不良结果与飞随机原因之间建立联系&#xff0c;基于这种联系&#xff0c;采取纠正措施&#xff0c;小区在控制图中呈现的特殊偏差。直方图…

nfa状态转换图正规式_0x02 从NFA到DFA

书接上文&#xff0c;上回说道NFA已经可以完全描述正则语言的全部内容。那么&#xff0c;我们在这一章探索一下一个比较复杂的正则表达式在用NFA做匹配的时候会有什么“不足“。NFA匹配的"不足"为了言之有物&#xff0c;不妨设要讨论的模式为d?(c(a|b)*)*(b|c)图1-1…

java filter教程_Java Web Filter 过滤器学习教程(推荐)

一、Filter简介Filter也称之为过滤器&#xff0c;它是Servlet技术中最激动人心的技术&#xff0c;WEB开发人员通过Filter技术&#xff0c;对web服务器管理的所有web资源&#xff1a;例如Jsp, Servlet, 静态图片文件或静态 html 文件等进行拦截&#xff0c;从而实现一些特殊的功…

vue修改计算属性的值_「Vue学习记录五」计算属性和侦听器

1&#xff1a; 计算属性&#xff1a; &#xff08;内置缓存机制&#xff09;当更改age的时候&#xff0c; fullName 函数不执行&#xff1b;当更改fristName的时候&#xff0c; fullName 函数才执行<div id "app"><span>{{fullName}}</span> <…

git为私有仓库设置密码_真香!在局域网下行云流水般使用git

最近公司要开发一个新的项目&#xff0c;开发一个新的项目就要有一个好的代码版本管理工具。对于代码开发版本控制工具&#xff0c;我们之前是使用svn这个代码版本控制工具&#xff0c;但是项目经理说统一使用git开发版本控制工具&#xff0c;来到这里我们一般会选择gitee或者g…

xss img onerror java_java后台防止XSS的脚本攻击

import java.util.regex.Pattern;//具体过滤关键字符public class XSSUtil {private static Pattern[] patterns new Pattern[]{// Script fragmentsPattern.compile("", Pattern.CASE_INSENSITIVE),// src...Pattern.compile("src[\r\n]*[\r\n]*\\\(.*?)\\\&…

网口监视报文工具_真是神器!这款网络排查工具!

常用的 ping&#xff0c;tracert&#xff0c;nslookup 一般用来判断主机的网络连通性&#xff0c;其实 Linux 下有一个更好用的网络联通性判断工具&#xff0c;它可以结合ping nslookup tracert 来判断网络的相关特性&#xff0c;这个命令就是 mtr。mtr 全称 my traceroute&…

snmp服务 2003 镜像_美国掌握全球70%根服务器,一旦对中国关闭,我们将无法上网?...

“如果在上网和男朋友(女朋友)之间只能选一个&#xff0c;你会选哪个&#xff1f;”曾经有媒体在街头做这样的调查&#xff0c;出人意料的是很多人都选择了“上网”&#xff1b;因为在现代年轻人看来&#xff0c;如果进入一个没有“不能上网”的生活实在太恐怖了&#xff0c;那…

java写入carbondata_Carbondata使用过程中遇到的几个问题及解决办法

本文总结了几个本人在使用 Carbondata 的时候遇到的几个问题及其解决办法。这里使用的环境是&#xff1a;Spark 2.1.0、Carbondata 1.2.0。必须指定 HDFS nameservices在初始化 CarbonSession 的时候&#xff0c;如果不指定 HDFS nameservices&#xff0c;在数据导入是没啥问题…

商品审核网页界面_Shopee新手指南:Shopee卖家中心用户界面介绍

1.Shopee各站点前台网页链接&#xff1a;2.Shopee各站点后台网页链接3.Shopee APP下载&#xff1a;安卓版下载链接&#xff1a;https://pan.baidu.com/s/1eSp8M1k#list/path%2Fios版&#xff1a;可在App Store中直接搜索下载使用。台湾站点ios版本请搜索关键字“虾皮”下载&…

pdf exe如何提取pdf文件_python应用:如何用python提取pdf文件中的文字

从pdf中提取文字&#xff0c;相信很多人都干过这事&#xff0c;怎么在python中实现呢&#xff0c;今天带大家看看。第一步导入库import PyPDF2第二步导入pdf文件pdf_file open(dataset/laban.1027.pdf, rb)第三步读取pdf并检查加密情况read_pdf PyPDF2.PdfFileReader(pdf_file…

java轻量分布式框架_5个强大的Java分布式缓存框架推荐

在开发中大型Java软件项目时&#xff0c;很多Java架构师都会遇到数据库读写瓶颈&#xff0c;如果你在系统架构时并没有将缓存策略考虑进去&#xff0c;或者并没有选择更优的 缓存策略&#xff0c;那么到时候重构起来将会是一个噩梦。本文主要是分享了5个常用的Java分布式缓存框…

python三次样条插值拟合的树行线_数学建模笔记——插值拟合模型(一)

啊好像距离上次写作又过了七天&#xff0c;啊好像我之前计划的一周两三篇&#xff0c;啊辣鸡小说毁我青春&#xff0c;啊我是一只可怜的鸽子。不管怎样&#xff0c;我又回来了&#xff0c;并坚定地更新着hhh。再过两三天就是我们学校数学建模选拔&#xff0c;再过八九天就是期末…

密度图的密度估计_不同类型的二维密度图小教程

R相关小教程链接&#xff1a;用R构建气泡图案例小教程【小教程】散点图、饼图怎么在我的文章中完美展示小教程热图在论文发表中完美呈现小教程R与密度、函数、变量的微妙关系北京市计算中心医用数据库建设解决方案更多内容&#xff0c;请关注“生信会议”公众号Different types…