CompletableFuture、ListenableFuture高级用列

CompletableFuture
链式

 public static void main(String[] args) throws Exception {CompletableFuture<Integer> thenCompose = T1().thenCompose(Compress::T2).thenCompose(Compress::T3);Integer result = thenCompose.get();System.out.println(result);}// 假设这些是异步操作,并返回CompletableFuture<Integer>public static CompletableFuture<Integer> T1() {return CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return 1;});}public static CompletableFuture<Integer> T2(int valueFromT1) {return CompletableFuture.supplyAsync(() -> {// 使用上一步的结果进行计算int result = valueFromT1 * 2;try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}return result;});}public static CompletableFuture<Integer> T3(int valueFromT2) {return CompletableFuture.supplyAsync(() -> {// 使用上一步的结果进行计算int finalResult = valueFromT2 + 10;return finalResult;});}

异步操作集合对象入库

public void saveUsers(List<User> users) throws InterruptedException, ExecutionException {// 将大集合分成若干个小集合,每个大小为100(具体分片大小根据实际需求调整)List<List<User>> partitions = users.stream().collect(Collectors.groupingBy(it -> users.indexOf(it) / 100)).values().stream().collect(Collectors.toList());List<CompletableFuture<Void>> futures = new ArrayList<>();for (List<User> partition : partitions) {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {partition.forEach(user -> {userRepository.save(user); // 假设这是你的保存方法});});futures.add(future);}// 等待所有任务完成CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();}

CompletableFuture
异常

public static void saveTut(List<User> users) throws InterruptedException, ExecutionException{// 将大集合分成若干个小集合ThreadLocal<AtomicInteger> threadLocal = ThreadLocal.withInitial(() -> new AtomicInteger(0));List<List<User>> partitions = Lists.partition(users, 10);List<CompletableFuture<Void>> futures = new ArrayList<>();for (List<User> partition : partitions) {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {partition.forEach(user -> {AtomicInteger value = threadLocal.get();value.set(user.getId());value.incrementAndGet();});});// 添加异常处理器future.exceptionally(ex -> {// 记录异常信息//System.out.println("===="+threadLocal.get().intValue());return null;});futures.add(future);}// 等待所有任务完成,并处理整体完成时的异常CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));allFutures.thenAccept(v -> {System.out.println("All save tasks completed.");}).exceptionally(ex -> {// 记录整体完成时的异常//System.err.println(ex.getMessage());return null;});// 确保阻塞直到所有异步操作完成allFutures.get();
} 

ListenableFuture

public class ListProcessingExample  {private static final ListeningExecutorService EXECUTOR_SERVICE = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));public static void main(String[] args) throws InterruptedException {List<User> users = Arrays.asList(new User(1), new User(2), new User(3));//processUsersAsync001(users);}public static void processUsersAsync001(List<User> users) throws InterruptedException {// 将用户列表分为3个分区(根据实际情况调整分区数量)int partitionSize = (int) Math.ceil((double) users.size() / 3);//线程数List<List<User>> partitions = Lists.partition(users, partitionSize);List<ListenableFuture<Void>> futures = new ArrayList<>();for (List<User> partition : partitions) {ListenableFuture<Void> future = EXECUTOR_SERVICE.submit(() -> {partition.forEach(user -> {System.out.println("Processing user: " + user.getId());});return null;});Futures.addCallback(future, new FutureCallback<Void>() {@Overridepublic void onSuccess(Void result) {System.out.println("Successfully processed a batch of users.");}@Overridepublic void onFailure(Throwable t) {System.err.println("Error processing a batch of users, error: " + t.getMessage());}}, MoreExecutors.directExecutor());futures.add(future);}// 等待所有任务完成(这里为了演示阻塞等待,实际应用中可能不需要这一步,因为有回调处理结果)EXECUTOR_SERVICE.shutdown();EXECUTOR_SERVICE.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);}/*** 增加成功回调获取数据*/public static void processUsersAsync002(List<User> users) throws InterruptedException {// 将用户列表分为3个分区(根据实际情况调整分区数量)int partitionSize = (int) Math.ceil((double) users.size() / 3);List<List<User>> partitions = Lists.partition(users, partitionSize);List<ListenableFuture<ProcessedUserResult>> futures = new ArrayList<>();for (List<User> partition : partitions) {ListenableFuture<ProcessedUserResult> future = EXECUTOR_SERVICE.submit(() -> {ProcessedUserResult result = new ProcessedUserResult();partition.forEach(user -> {System.out.println("Processing user: " + user.getId());result.successfulIds.add(user.getId());});return result;});Futures.addCallback(future, new FutureCallback<ProcessedUserResult>() {@Overridepublic void onSuccess(ProcessedUserResult result) {System.out.println("Successfully processed users with IDs: " + result.successfulIds);}@Overridepublic void onFailure(Throwable t) {System.err.println("Error processing a batch of users, error: " + t.getMessage());}}, MoreExecutors.directExecutor());futures.add(future);}// 等待所有任务完成(这里为了演示阻塞等待,实际应用中可能不需要这一步,因为有回调处理结果)EXECUTOR_SERVICE.shutdown();EXECUTOR_SERVICE.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);}static class ProcessedUserResult {// 用于存储成功处理的用户ID列表private List<Integer> successfulIds = new ArrayList<>();// 用于存储失败处理的用户ID列表private List<Integer> failedIds = new ArrayList<>();public void addSuccessfulId(int userId) {this.successfulIds.add(userId);}public List<Integer> getSuccessfulIds() {return successfulIds;}public void addFailedId(int userId) {this.failedIds.add(userId);}public List<Integer> getFailedIds() {return failedIds;}}static class User {private int id;public User(int id) {this.id = id;}public int getId() {return id;}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/617307.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Oracle】Oracle编程PLSQL

Oracle编程 一、PL/SQL 1、PL/SQL概述 PL/SQL&#xff08;Procedure Language/SQL&#xff09;是 Oracle 对 sql 语言的过程化扩展&#xff0c;使 SQL 语言具有过程处理能力。 基本语法结构 [declare -- 声明变量 ]begin-- 代码逻辑 [exception-- 异常处理 ]end;2、变量 …

centos7下升级openssh9.4p1及openssl1.1.1v版本

背景&#xff1a;客户服务器扫描出一些漏洞&#xff0c;发现和版本有关&#xff0c;漏洞最高的版本是9.3p2&#xff0c;所以我们安装一个openssh9.4p1版本及openssl1.1.1v版本 虽然我们进行了镜像备份&#xff0c;为了安全先安装telnet以防止升级失败无法通过ssh连接服务器 一…

【会议征稿通知】第二届数字化经济与管理科学国际学术会议(CDEMS 2024)

第二届数字化经济与管理科学国际学术会议&#xff08;CDEMS 2024&#xff09; 2024 2nd International Conference on Digital Economy and Management Science&#xff08;CDEMS 2024&#xff09; 2024年第二届数字经济与管理科学国际会议(CDEMS 2024) 定于2023年4月26-28日…

如何使用统计鸟网站统计分析网站流量来源?

统计鸟官网地址&#xff1a;https://www.tongjiniao.com/ 站长必备&#xff01;网站数据统计&#xff0c;流量监测平台 提供网站数据统计分析、搜索关键词、流量访问来源等服务 深入分析用户点击习惯&#xff0c;为智能化运营网站提供更好的用户体验 目录 一、注册账号信息 二…

基于博弈树的开源五子棋AI教程[3 极大极小搜索]

基于博弈树的开源五子棋AI教程[3 极大极小搜索] 引子极大极小搜索原理alpha-beta剪枝负极大搜索尾记 引子 极大极小搜索是博弈树搜索中最常用的算法&#xff0c;广泛应用于各类零和游戏中&#xff0c;例如象棋&#xff0c;围棋等棋类游戏。算法思想也是合乎人类的思考逻辑的&a…

Flask+ Dependency-injecter+pytest 写测试类

最近在使用这几个在做项目&#xff0c;因为第一次用这个&#xff0c;所以不免有些问题。总结下踩的坑 1.测试类位置 首先测试类约定会放在tests里面&#xff0c;不然有可能发生引入包的问题&#xff0c;会报错某些包找不到。 2. 测试类依赖注入 这里我就用的真实的数据库操作…

Js--数组(三)

1.什么是数组&#xff1f; 数组&#xff1a;(Array)是一种可以按顺序保存数据的数据类型 2.为什么要数组&#xff1f; 思考&#xff1a;如果我想保存一个班里所有同学的姓名怎么办&#xff1f; 场景&#xff1a;如果有多个数据可以用数组保存起来&#xff0c;然后放到一个变量…

MacBook安装Docker

MacBook安装Docker dmg包安装 官方下载dmg安装包: https://docs.docker.com/desktop/mac/install/ 点击安装&#xff0c;然后启动 二进制安装 官方下载二进制包: https://download.docker.com/mac/static/stable/x86_64/ tar -zxvf docker-20.10.0.tgz#将解压出来的docke…

Apppium driver的一些比较重要操作,原生APP和H5 APP(WEBVIEW)

1.reset() //重置app 这时候driver会重置&#xff0c;相当于卸载重装应用。所以本地缓存会失效 driver.reset() 2.start_activity(包名,activity名) //启动app的某一个activity 例如&#xff1a;driver.start_activity("com.wuba.zhuanzhuan","./presentatio…

9个Linux网络命令

这些命令用于监控连接、排除网络故障、路由选择、DNS 查询和接口配置。 1. ping – 向网络主机发送 ICMP ECHO_REQUEST ping 是用于测试网络连接的最流行的网络终端工具。ping 有很多选项&#xff0c;但在大多数情况下&#xff0c;您将使用它来请求域或IP地址&#xff1a; p…

【AI视野·今日CV 计算机视觉论文速览 第285期】Mon, 8 Jan 2024

AI视野今日CS.CV 计算机视觉论文速览 Mon, 8 Jan 2024 Totally 66 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Computer Vision Papers Denoising Vision Transformers Authors Jiawei Yang, Katie Z Luo, Jiefeng Li, Kilian Q Weinberger, Yonglong Tian, Yue…

【漏洞复现】Apache Tomcat AJP文件包含漏洞(CVE-2020-1938)

Nx01 产品简介 Apache Tomcat 是一个免费的开源 Web 应用服务器&#xff0c;在中小型企业和个人开发用户中有着广泛的应用。 Nx02 漏洞描述 默认情况下&#xff0c;Apache Tomcat会开启AJP连接器&#xff0c;由于AJP服务&#xff08;8009端口&#xff09;存在文件包含缺陷&…

打造高性价比小程序,轻松降低成本

随着移动互联网的普及&#xff0c;小程序已经成为一个热门的应用开发方向。然而&#xff0c;对于许多企业和个人而言&#xff0c;制作一个小程序的费用却让人望而却步。那么&#xff0c;如何以最低的成本制作一款高性价比的小程序呢&#xff1f; 答案很简单&#xff0c;只需要找…

Spark SQL基础

SparkSQL基本介绍 什么是Spark SQL Spark SQL是Spark多种组件中其中一个,主要是用于处理大规模的结构化数据 什么是结构化数据: 一份数据, 每一行都有固定的列, 每一列的类型都是一致的 我们将这样的数据称为结构化的数据 例如: mysql的表数据 1 张三 20 2 李四 15 3 王五 1…

支付宝异步验签踩的坑

最近公司要做支付宝小程序 我作为服务端就要给小程序配置下单啊&#xff0c;异步回调同步支付状态等功能 就不可避免的使用到了支付宝异步验签 首先背景是我是PHP语言&#xff0c;然后验签方式是RSA2 一开始写原生验签方法&#xff0c;验签失败&#xff0c;后面又搞sdk 验签…

Java进阶十—JDBC

Java进阶十—JDBC 一.说明 用Java语言操作Mysql&#xff0c;首先需要学习Mysql MySQL入门教程-CSDN博客 二.JDBC的由来以及定义 JDBC是什么&#xff1f; Java数据库连接(Java Database Connectivity)简称JDBCJDBC是Java操作各数据库的一种规范&#xff0c;是Java语言中用来…

ChatGPT:人工智能与人类交流的桥梁

在人工智能的浪潮中&#xff0c;ChatGPT以其独特的交流能力成为了一个亮点。作为一个基于强大的GPT-4模型的聊天机器人&#xff0c;ChatGPT不仅仅是技术的展示&#xff0c;它更是人工智能与人类交流的桥梁。 人工智能的语言理解革命 ChatGPT的出现标志着人工智能在语言理解和…

攻防实战-手把手带你打穿内网

六朝何事&#xff0c;只成门户私计&#xff01; 目录 环境配置 网络配置 本次实战绘制出来的网络拓扑图如下&#xff1a; 第一层&#xff1a;12server-web1 信息搜集 网站url&#xff1a; 目录扫描 扫到后台地址&#xff1a; 发现有注册功能&#xff0c; 先注册一下尝试能…

浅谈Vue中的NextTick。

一、NextTick() 等待下一次 DOM 更新刷新的工具方法。 当你在 Vue 中更改响应式状态时&#xff0c;最终的 DOM 更新并不是同步生效的&#xff0c;而是由 Vue 将它们缓存在一个队列中&#xff0c;直到下一个“tick”才一起执行。这样是为了确保每个组件无论发生多少状态改变&…

预约上门按摩系统目前面临的挑战有哪些

按摩预约上门服务系统上线之后在运营的过程中主要面临的挑战主要有以下几个方面&#xff1a; 1.技师管理和培训&#xff1a;为了保证服务的质量&#xff0c;需要对技师进行管理和培训。这包括确保技师具备必要的技能和资格&#xff0c;以及提供必要的培训&#xff0c;以确保他们…