59、Flink 的异步 IO 算子使用线程池查询 MySQL

1、概述

-----------Test1-----------
非静态 dataSource 和 executorService【一个并行度(Task 线程)一个实例】
分区1
dataSource=>915342614
executorService=>2120731873
分区2
dataSource=>1271767714
executorService=>844411403
并行度=2,固定线程池数量=2,即查询并发度 2线程2实例=4

静态 dataSource 和 executorService【一个TM(JVM 进程)一个实例】 分区1 dataSource=>1153938359 dataSource=>1153938359 分区2 executorService=>1974212788 executorService=>1974212788 并行度=2,固定线程池数量=2,即查询并发度 2线程1实例=2

-----------Test2-----------
timeout() 方法中设置 resultFuture.completeExceptionally(new Exception(input + “=获取数据超时”));
Caused by: java.lang.Exception: 1=获取数据超时

timeout() 方法中设置 resultFuture.complete(Collections.singleton(Tuple2.of(input, null))); 1> (1,null)

timeout() 方法中设置 resultFuture.complete(Collections.EMPTY_LIST); 无结果发出

-----------Test3-----------
AsyncDataStream.unorderedWait 模式下,并行度=2,固定线程池数量=2
设置 if(id1 || id2){TimeUnit.SECONDS.sleep(5);}
发送数据 1,2,3,4
输出结果
1> (3,c)
2> (4,d)
1> (1,a)
2> (2,b)

AsyncDataStream.orderedWait 模式下,并行度=2,固定线程池数量=2 设置 if(id==1 || id==2){TimeUnit.SECONDS.sleep(5);} 发送数据 1,2,3,4 输出结果 1> (1,a) 1> (3,c) 2> (2,b) 2> (4,d)

-----------Test4-----------
capacity=1,并行度=2,固定线程池数量=2,即查询并发度 2线程2实例=4
设置 TimeUnit.SECONDS.sleep(5);
发送数据 1,2,3,4,
间隔5秒输出
1> (1,a)
2> (2,b)
间隔5秒输出
1> (3,c)
2> (4,d)

capacity=2,并行度=2,固定线程池数量=2,即查询并发度 2线程2实例=4 设置 TimeUnit.SECONDS.sleep(5); 间隔5秒输出 2> (1,a) 1> (2,b) 2> (3,c) 1> (4,d)

结论:capacity=等待响应队列的容量=吞吐量

-----------Test5-----------
模拟查询超时异常=TimeUnit.SECONDS.sleep(10);,AsyncRetryStrategy=FixedDelayRetryStrategy

没有增加重试策略=>Caused by: java.lang.Exception: 1=获取数据超时 增加重试策略=>正常产出 1> (1,a) 2> (2,b)

2、代码示例

package com.xu.flink.datastream.day11;import com.alibaba.druid.pool.DruidDataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
import org.apache.flink.streaming.util.retryable.RetryPredicates;import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.concurrent.*;
import java.util.function.Supplier;public class _05_AsyncMySqlQueryWithThreadPool {public static void main(String[] args) throws Exception {// 创建 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 模拟source输出数据SingleOutputStreamOperator<Integer> line = env.socketTextStream("localhost", 8888).map(Integer::parseInt);// 创建异步算子-无重试// 无序
//        AsyncDataStream.unorderedWait(line,
//                        new MySqlAsyncFunction(2),
//                        8,
//                        TimeUnit.SECONDS,
//                        2)
//                .print();// 有序
//        AsyncDataStream.orderedWait(line,
//                        new MySqlAsyncFunction(2),
//                        8,
//                        TimeUnit.SECONDS,
//                        20)
//                .print();// 创建异步算子-重试// 创建重试策略AsyncRetryStrategies.FixedDelayRetryStrategy fixedDelayRetryStrategy = new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 10000L).ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE).ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE).build();//        AsyncRetryStrategies.ExponentialBackoffDelayRetryStrategy backoffDelayRetryStrategy = new AsyncRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder<>(3, 2000L, 10, 2000L)
//                // 基于执行异常触发重试
//                .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
//                // 基于返回结果为空触发重试
//                .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
//                .build();// 无序AsyncDataStream.unorderedWaitWithRetry(line,new MySqlAsyncFunction(2),8,TimeUnit.SECONDS,2,fixedDelayRetryStrategy).print();// 有序
//        AsyncDataStream.orderedWaitWithRetry(line,
//                        new MySqlAsyncFunction(2),
//                        8,
//                        TimeUnit.SECONDS,
//                        2,
//                        backoffDelayRetryStrategy)
//                .print();// 触发执行env.execute();}
}/*** -----------Test1-----------* 非静态 dataSource 和 executorService【一个并行度(Task 线程)一个实例】* 分区1* dataSource=>915342614* executorService=>2120731873* 分区2* dataSource=>1271767714* executorService=>844411403* 并行度=2,固定线程池数量=2,即查询并发度 2线程*2实例=4* <p>* 静态 dataSource 和 executorService【一个TM(JVM 进程)一个实例】* 分区1* dataSource=>1153938359* dataSource=>1153938359* 分区2* executorService=>1974212788* executorService=>1974212788* 并行度=2,固定线程池数量=2,即查询并发度 2线程*1实例=2* -----------Test2-----------* timeout() 方法中设置 resultFuture.completeExceptionally(new Exception(input + "=获取数据超时"));* Caused by: java.lang.Exception: 1=获取数据超时* <p>* timeout() 方法中设置 resultFuture.complete(Collections.singleton(Tuple2.of(input, null)));* 1> (1,null)* <p>* timeout() 方法中设置 resultFuture.complete(Collections.EMPTY_LIST);* 无结果发出* -----------Test3-----------* AsyncDataStream.unorderedWait 模式下,并行度=2,固定线程池数量=2* 设置 if(id==1 || id==2){TimeUnit.SECONDS.sleep(5);}* 发送数据 1,2,3,4* 输出结果* 1> (3,c)* 2> (4,d)* 1> (1,a)* 2> (2,b)* <p>* AsyncDataStream.orderedWait 模式下,并行度=2,固定线程池数量=2* 设置 if(id==1 || id==2){TimeUnit.SECONDS.sleep(5);}* 发送数据 1,2,3,4* 输出结果* 1> (1,a)* 1> (3,c)* 2> (2,b)* 2> (4,d)* -----------Test4-----------* capacity=1,并行度=2,固定线程池数量=2,即查询并发度 2线程*2实例=4* 设置 TimeUnit.SECONDS.sleep(5);* 发送数据 1,2,3,4,* 间隔5秒输出* 1> (1,a)* 2> (2,b)* 间隔5秒输出* 1> (3,c)* 2> (4,d)* <p>* capacity=2,并行度=2,固定线程池数量=2,即查询并发度 2线程*2实例=4* 设置 TimeUnit.SECONDS.sleep(5);* 间隔5秒输出* 2> (1,a)* 1> (2,b)* 2> (3,c)* 1> (4,d)* <p>* 结论:capacity=等待响应队列的容量=吞吐量* -----------Test5-----------* 模拟查询超时异常=TimeUnit.SECONDS.sleep(10);,AsyncRetryStrategy=FixedDelayRetryStrategy* <p>* 没有增加重试策略=>Caused by: java.lang.Exception: 1=获取数据超时* 增加重试策略=>正常产出 1> (1,a) 2> (2,b)*/
class MySqlAsyncFunction extends RichAsyncFunction<Integer, Tuple2<Integer, String>> {private int maxConnTotal;private static ExecutorService executorService;private static DruidDataSource dataSource;// 传入最大链接数public MySqlAsyncFunction(int maxConnTotal) {this.maxConnTotal = maxConnTotal;}@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("open 方法被调用");//创建一个线程池,实现并发提交请求executorService = Executors.newFixedThreadPool(maxConnTotal);//创建链接池(异步IO 一个请求对应一个线程,一个请求对应一个链接)dataSource = new DruidDataSource();dataSource.setDriverClassName("com.mysql.jdbc.Driver");dataSource.setUsername("root");dataSource.setPassword("root");dataSource.setUrl("jdbc:mysql://localhost:3306/xlink?characterEncoding=UTF-8&useSSL=false");dataSource.setMaxActive(maxConnTotal);System.out.println("dataSource=>" + dataSource.hashCode());System.out.println("executorService=>" + executorService.hashCode());}@Overridepublic void close() throws Exception {System.out.println("close 方法被调用");executorService.shutdown();dataSource.close();}@Overridepublic void asyncInvoke(Integer input, ResultFuture<Tuple2<Integer, String>> resultFuture) throws Exception {//使用线程池提交请求Future<String> future = executorService.submit(new Callable<String>() {@Overridepublic String call() throws Exception {return queryFromMySql(input);}});//获取请求结果CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {return future.get();} catch (Exception e) {return null;}}}).thenAccept((String result) -> {resultFuture.complete(Collections.singleton(Tuple2.of(input, result)));});}@Overridepublic void timeout(Integer input, ResultFuture<Tuple2<Integer, String>> resultFuture) throws Exception {// 超时-发出异常resultFuture.completeExceptionally(new Exception(input + "=获取数据超时"));// 超时-发出空的 Join 结果
//        resultFuture.complete(Collections.singleton(Tuple2.of(input, null)));// 超时-不发出元素
//        resultFuture.complete(Collections.EMPTY_LIST);}/*** SQL 查询代码实现*/private String queryFromMySql(Integer id) throws Exception {String sql = "SELECT deptno,db_source FROM dept WHERE deptno = ?";String result = null;Connection connection = null;PreparedStatement stmt = null;ResultSet rs = null;try {connection = dataSource.getConnection();stmt = connection.prepareStatement(sql);stmt.setInt(1, id);rs = stmt.executeQuery();// 模拟超时异常TimeUnit.SECONDS.sleep(10);while (rs.next()) {result = rs.getString("db_source");}} finally {if (rs != null) {rs.close();}if (stmt != null) {stmt.close();}if (connection != null) {connection.close();}}return result;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/36735.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数智化赋能水务行业高质量发展

数智化赋能水务行业高质量发展是指通过数字化和智能化技术的应用&#xff0c;提升水务行业的管理效率、服务质量和运营安全&#xff0c;实现可持续发展。以下是数智化赋能水务行业高质量发展的几个关键方面&#xff1a; 1. 智能水务管理平台 集成数据中心&#xff1a;建立统一…

C#——命名空间详情

命名空间 在 C# 中&#xff0c;可以将命名空间看作是一个范围&#xff0c;用来标注命名空间中成员的归属&#xff0c;一个命名空间中类与另一个命名空间中同名的类互不冲突&#xff0c;但在同一个命名空间中类的名称必须是唯一的。 定义命名空间 定义命名空间需要使用 namesp…

JS在线加密简述

JS在线加密&#xff0c;是指&#xff1a;在线进行JS代码混淆加密。通过混淆、压缩、加密等手段&#xff0c;使得JS源代码难以阅读和理解。从而可以有效防止代码被盗用或抄袭&#xff0c;保护开发者的知识产权和劳动成果。常用的JS在线加密网站有&#xff1a;JShaman、JS-Obfusc…

怎么在vite项目中全局导入一个scss文件

怎么在vite项目中全局导入一个scss文件 &#x1f389;&#x1f389;&#x1f389;欢迎来到我的博客,我是一名自学了2年半前端的大一学生,熟悉的技术是JavaScript与Vue.目前正在往全栈方向前进, 如果我的博客给您带来了帮助欢迎您关注我,我将会持续不断的更新文章!!!&#x1f64…

如何快速找到CAN总线的故障节点?

如何快速找到CAN总线的故障节点&#xff1f; 1、节点&#xff08;数量不多的情况&#xff09;依次接入总线&#xff1a;将CAN节点一个一个往总线上接&#xff0c;每接一个节点后观察、测试总线通信状况。2、使用CAN转换器或接口类产品辅助排查。&#xff08;快速定位&#xff…

【Redis】Java操作Redis(Jedis客户端使用)

Redis不仅支持简单的键值存储&#xff0c;还提供了丰富的数据结构&#xff08;如列表、哈希表、集合等&#xff09;和强大的原子操作&#xff0c;使得它在存储和处理数据时非常高效。关于这些数据结构的学习可以学习下面的博客&#xff1a; 【Redis】String的常用命令及图解St…

OBD诊断(ISO15031) 01服务

文章目录 功能简介PID 的功能请求和响应1、read-supported PIDs1.1、请求1.2、肯定响应 2、read PID value1.1、请求1.2、肯定响应 3、同时请求多个PID3、同时读取多个PID数据 Parameter definition报文示例1、单个PID请求和读取2、多个PID请求和读取 功能简介 01服务&#xf…

XML 技术

XML 技术 XML(可扩展标记语言)是一种用于存储和传输数据的标记语言。它由万维网联盟(W3C)开发,并在1998年成为正式标准。XML的设计目标是既易于人类阅读,也易于机器解析。它是一种自描述的语言,允许用户定义自己的标签和文档结构。XML被广泛应用于各种领域,包括网络服…

【Ubuntu24.04无显示器远控】【Todesk远程桌面黑屏】【Linux虚拟显示器】解决方案

1️⃣版本 Ubuntu 24.04Todesk 4.7.2.0xserver-xorg-video-dummy 1:0.4.0-1build1 2️⃣安装配置虚拟显示器 sudo apt install xserver-xorg-video-dummy编辑/etc/gdm3/custom.conf&#xff0c;关闭Ubuntu24.04Wayland切换为X11 WaylandEnablefalse /usr/share/X11/xorg.con…

NDT(基于正态分布变换的配准算法)

NDT是将单个扫描的离散点集转换为空间上定义的分段连续可微概率密度&#xff0c;该概率密度由一组易于计算的正态分布组成的算法。采用NDT连续化后&#xff0c;传统硬离散优化问题能够潜在地转化为更易于处理的连续优化问题。 NDT原理 NDT将根据点云中点所处的位置&#xff0…

网络治理新模式:Web3时代的社会价值重构

随着Web3技术的崛起&#xff0c;传统的网络治理模式正在经历革新&#xff0c;这不仅仅是技术的进步&#xff0c;更是对社会价值观念的挑战和重构。本文将深入探讨Web3时代的网络治理新模式&#xff0c;其背后的技术基础、社会影响以及未来的发展方向。 1. 引言 Web3时代&#…

Java中的函数式编程入门

Java中的函数式编程入门 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天我来为大家介绍一下Java中的函数式编程。随着Java 8的发布&#xff0c;函数式编程成…

使用Spring Boot实现与ActiveMQ的消息队列集成

使用Spring Boot实现与ActiveMQ的消息队列集成 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 消息队列在现代分布式系统中扮演着至关重要的角色&#xff0c;…

智慧营区人员管理系统|DW-S406系统特点

1、项目背景 当前我国军队正处于加紧完成机械化和信息化建设双重历史任务的阶段&#xff0c;现阶段我国对军事通信领域强有力的支持性产业政策&#xff0c;将为行业的未来发展提供有力保障。随着经济实力的不断增长&#xff0c;以及国际、周边政治局势的日趋复杂&#xff0c;我…

# 音频处理4_傅里叶变换

1.离散傅里叶变换 对于离散时域信号 x[n]使用离散傅里叶变换&#xff08;Discrete Fourier Transform, DFT&#xff09;进行频域分析。 DFT 将离散信号 x[n] 变换为其频谱表示 X[k]&#xff0c;定义如下&#xff1a; X [ k ] ∑ n 0 N − 1 x [ n ] e − j 2 π k n N X[k]…

详细学习es6扩展运算符

ES6中的扩展运算符&#xff08;Spread Operator&#xff09;是一种非常方便的语法&#xff0c;主要用于将可迭代对象&#xff08;比如数组、字符串等&#xff09;展开成多个参数。以下是关于ES6扩展运算符的详细内容&#xff1a; 用法&#xff1a; 在数组字面量中展开数组&am…

C# 超简单的离线人脸识别库 - ViewFaceCore

项目介绍 ViewFaceCore是一个基于 SeetaFace6 的 .NET 人脸识别解决方案。 项目特点 开源&#xff08;MIT license&#xff09;、免费、简单的离线人脸识别库。 跨平台&#xff08;适用于 Windows、MacOS 和 Linux &#xff09;。 .NET 框架 和 操作系统 封装完善的NuGet包…

《昇思25天学习打卡营第1天 | 快速入门 mindspore》

1. 背景&#xff1a; 今天开始参与 CSDN 的活动&#xff0c;使用 mindspore 学习神经网络&#xff0c;先学习的是第二小节&#xff1b; 2. 训练的内容&#xff1a; 第二小节内容&#xff1a; 使用 mindspore &#xff0c;构建 DenseNet&#xff0c;识别手写字符&#xff1b;…

[CAN] 通讯协议手动解析与手动打包 [手撕编码格式]

手动解析与手动打包 一、Intel格式编码1.1 报文解析。1.2 报文打包二、Motorola格式通讯协议2.1 报文解析。2.2 报文打包🙋 前言 CAN有两种编码格式:Intel编码格式 和 Motorola编码格式,本教程将分别对两种格式进行手动解析与手动打包。 一、Intel格式编码 假设已知雷达CAN…

医疗数据分析

我待过2家大公司做医疗的&#xff0c;发现了他们的共性&#xff0c;有很多通用的方法&#xff0c;先说数据&#xff0c;医疗数据中最麻烦的&#xff0c;我觉得就是检验数据。 为什么&#xff1f;因为检查的指标项多&#xff0c;占所有数据的百分之50以上。 指标歧义: 血红蛋…