《开发实战》17 | 异步处理好用,但非常容易用错

大多数业务项目都是由同步处理、异步处理和定时任务处理三种模式相辅相成实现的。
区别于同步处理,异步处理无需同步等待流程处理完毕,因此适用场景主要包括:

  • 服务于主流程的分支流程。比如,在注册流程中,把数据写入数据库的操作是主流程,但注册后给用户发优惠券或欢迎短信的操作是分支流程,时效性不那么强,可以进行异步处理。
  • 用户不需要实时看到结果的流程。比如,下单后的配货、送货流程完全可以进行异步处理,每个阶段处理完成后,再给用户发推送或短信让用户知晓即可。

异步处理因为可以有 MQ 中间件的介入用于任务的缓冲的分发,所以相比于同步处理,在应对流量洪峰、实现模块解耦和消息广播方面有功能优势。
用三个代码案例结合目前常用的 MQ 系统 RabbitMQ讲解

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

异步处理需要消息补偿闭环

使用类似 RabbitMQ、RocketMQ 等 MQ 系统来做消息队列实现异步处理,虽然说消息可以落地到磁盘保存,即使 MQ 出现问题消息数据也不会丢失,但是异步流程在消息发送、传输、处理等环节,都可能发生消息丢失。此外,任何 MQ 中间件都无法确保 100% 可用,需要考虑不可用时异步流程如何继续进行。
对于异步处理流程,必须考虑补偿或者说建立主备双活流程
我们来看一个用户注册后异步发送欢迎消息的场景。用户注册落数据库的流程为同步流程,会员服务收到消息后发送欢迎消息的流程为异步流程。
image.png

  • 蓝色的线,使用 MQ 进行的异步处理,我们称作主线,可能存在消息丢失的情况(虚线代表异步调用);
  • 绿色的线,使用补偿 Job 定期进行消息补偿,我们称作备线,用来补偿主线丢失的消息;
  • 考虑到极端的 MQ 中间件失效的情况,我们要求备线的处理吞吐能力达到主线的能力水平。

实现代码
首先,定义 UserController 用于注册 + 发送异步消息。对于注册方法,我们一次性注册 10 个用户,用户注册消息不能发送出去的概率为 50%。

@RestController
@Slf4j
@RequestMapping("user")
public class UserController {@Autowiredprivate UserService userService;@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("register")public void register() {//模拟10个用户注册IntStream.rangeClosed(1, 10).forEach(i -> {//落库User user = userService.register();//模拟50%的消息可能发送失败if (ThreadLocalRandom.current().nextInt(10) % 2 == 0) {//通过RabbitMQ发送消息rabbitTemplate.convertAndSend(RabbitConfiguration.EXCHANGE, RabbitConfiguration.ROUTING_KEY, user);log.info("sent mq user {}", user.getId());}});}
}

然后,定义 MemberService 类用于模拟会员服务。会员服务监听用户注册成功的消息,并发送欢迎短信。
我们使用 ConcurrentHashMap 来存放那些发过短信的用户 ID 实现幂等,避免相同的用户进行补偿时重复发送短信:

@Component
@Slf4j
public class MemberService {// 发送欢迎消息的状态private Map<Long, Boolean> welcomeStatus = new ConcurrentHashMap<>();// 监听用户注册成功的消息,发送欢迎消息@RabbitListener(queues = RabbitConfiguration.QUEUE)public void listen(User user) {log.info("receive mq user {}", user.getId());welcome(user);}//发送欢迎消息public void welcome(User user) {//去重操作if (welcomeStatus.putIfAbsent(user.getId(), true) == null) {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {}log.info("memberService: welcome new user {}", user.getId());}}
}

对于 MQ 消费程序,处理逻辑务必考虑去重(支持幂等),原因有几个:

  • MQ 消息可能会因为中间件本身配置错误、稳定性等原因出现重复。
  • 自动补偿重复,比如本例,同一条消息可能既走 MQ 也走补偿,肯定会出现重复,而且考虑到高内聚,补偿 Job 本身不会做去重处理。
  • 人工补偿重复。出现消息堆积时,异步处理流程必然会延迟。如果我们提供了通过后台进行补偿的功能,那么在处理遇到延迟的时候,很可能会先进行人工补偿,过了一段时间后处理程序又收到消息了,重复处理。我之前就遇到过一次由 MQ 故障引发的事故,MQ 中堆积了几十万条发放资金的消息,导致业务无法及时处理,运营以为程序出错了就先通过后台进行了人工处理,结果 MQ 系统恢复后消息又被重复处理了一次,造成大量资金重复发放。

定义补偿 Job 也就是备线操作
我们在 CompensationJob 中定义一个 @Scheduled 定时任务,5 秒做一次补偿操作,因为 Job 并不知道哪些用户注册的消息可能丢失,所以是全量补偿,补偿逻辑是:每 5 秒补偿一次,按顺序一次补偿 5 个用户,下一次补偿操作从上一次补偿的最后一个用户 ID 开始;对于补偿任务我们提交到线程池进行“异步”处理,提高处理能力

@Component
@Slf4j
public class CompensationJob {// 补偿Job异步处理线程池private static ThreadPoolExecutor compensationThreadPool = new ThreadPoolExecutor(10, 10,1, TimeUnit.HOURS,new ArrayBlockingQueue<>(1000),new ThreadFactoryBuilder().setNameFormat("compensation-threadpool-%d").get());@Autowiredprivate UserService userService;@Autowiredprivate MemberService memberService;// 目前补偿到哪个用户IDprivate long offset = 0;// 10秒后开始补偿,5秒补偿一次@Scheduled(initialDelay = 10_000, fixedRate = 5_000)public void compensationJob() {log.info("开始从用户ID {} 补偿", offset);// 获取从offset开始的用户userService.getUsersAfterIdWithLimit(offset, 5).forEach(user -> {compensationThreadPool.execute(() -> memberService.welcome(user));offset = user.getId();});}
}

为了实现高内聚,主线和备线处理消息,最好使用同一个方法。
本例中 MemberService 监听到 MQ 消息和 CompensationJob 补偿,调用的都是 welcome 方法。
此外值得一说的是,Demo 中的补偿逻辑比较简单,生产级的代码应该在以下几个方面进行加强:

  • 考虑配置补偿的频次、每次处理数量,以及补偿线程池大小等参数为合适的值,以满足补偿的吞吐量。
  • 考虑备线补偿数据进行适当延迟。比如,对注册时间在 30 秒之前的用户再进行补偿,以方便和主线 MQ 实时流程错开,避免冲突。
  • 诸如当前补偿到哪个用户的 offset 数据,需要落地数据库。
  • 补偿 Job 本身需要高可用,可以使用类似 XXLJob 或 ElasticJob 等任务系统。

上面的补偿代码还是可以的,就是补偿有一点点延迟。

注意消息模式是广播还是工作队列

消息广播,和我们平时说的“广播”意思差不多,就是希望同一条消息,不同消费者都能分别消费;而队列模式,就是不同消费者共享消费同一个队列的数据,相同消息只能被某一个消费者消费一次。
同一个用户的注册消息,会员服务需要监听以发送欢迎短信,营销服务同样需要监听以发送新用户小礼物。但是,会员服务、营销服务都可能有多个实例,我们期望的是同一个用户的消息,可以同时广播给不同的服务(广播模式),但对于同一个服务的不同实例(比如会员服务 1 和会员服务 2),不管哪个实例来处理,处理一次即可(工作队列模式):
image.png

//为了代码简洁直观,我们把消息发布者、消费者、以及MQ的配置代码都放在了一起
@Slf4j
@Configuration
@RestController
@RequestMapping("workqueuewrong")
public class WorkQueueWrong {private static final String EXCHANGE = "newuserExchange";@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMappingpublic void sendMessage() {rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString());}//使用匿名队列作为消息队列@Beanpublic Queue queue() {return new AnonymousQueue();}//声明DirectExchange交换器,绑定队列到交换器@Beanpublic Declarables declarables() {DirectExchange exchange = new DirectExchange(EXCHANGE);return new Declarables(queue(), exchange,BindingBuilder.bind(queue()).to(exchange).with(""));}//监听队列,队列名称直接通过SpEL表达式引用Bean@RabbitListener(queues = "#{queue.name}")public void memberService(String userName) {log.info("memberService: welcome message sent to new user {} from {}", userName, System.getProperty("server.port"));}
}   

目前这样,一个服务的两个实例都接收到了消息
image.png

private static final String QUEUE = "newuserQueue";
@Bean
public Queue queue() {return new Queue(QUEUE);
}

第二步,进一步完整实现用户服务需要广播消息给会员服务和营销服务的逻辑。
我们希望会员服务和营销服务都可以收到广播消息,但会员服务或营销服务中的每个实例只需要收到一次消息。
代码如下,我们声明了一个队列和一个广播交换器 FanoutExchange,然后模拟两个用户服务和两个营销服务:

@Slf4j
@Configuration
@RestController
@RequestMapping("fanoutwrong")
public class FanoutQueueWrong {private static final String QUEUE = "newuser";private static final String EXCHANGE = "newuser";@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMappingpublic void sendMessage() {rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString());}//声明FanoutExchange,然后绑定到队列,FanoutExchange绑定队列的时候不需要routingKey@Beanpublic Declarables declarables() {Queue queue = new Queue(QUEUE);FanoutExchange exchange = new FanoutExchange(EXCHANGE);return new Declarables(queue, exchange,BindingBuilder.bind(queue).to(exchange));}// 会员服务实例1@RabbitListener(queues = QUEUE)public void memberService1(String userName) {log.info("memberService1: welcome message sent to new user {}", userName);}//会员服务实例2@RabbitListener(queues = QUEUE)public void memberService2(String userName) {log.info("memberService2: welcome message sent to new user {}", userName);}//营销服务实例1@RabbitListener(queues = QUEUE)public void promotionService1(String userName) {log.info("promotionService1: gift sent to new user {}", userName);}//营销服务实例2@RabbitListener(queues = QUEUE)public void promotionService2(String userName) {log.info("promotionService2: gift sent to new user {}", userName);}
}

通过日志可以发现,一条用户注册的消息,要么被会员服务收到,要么被营销服务收到,显然这不是广播
其实,广播交换器非常简单,它会忽略 routingKey,广播消息到所有绑定的队列。在这个案例中,两个会员服务和两个营销服务都绑定了同一个队列,所以这四个服务只能收到一次消息:

修改方式很简单,我们把队列进行拆分,会员和营销两组服务分别使用一条独立队列绑定到广播交换器即可

@Slf4j
@Configuration
@RestController
@RequestMapping("fanoutright")
public class FanoutQueueRight {private static final String MEMBER_QUEUE = "newusermember";private static final String PROMOTION_QUEUE = "newuserpromotion";private static final String EXCHANGE = "newuser";@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMappingpublic void sendMessage() {rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString());}@Beanpublic Declarables declarables() {//会员服务队列Queue memberQueue = new Queue(MEMBER_QUEUE);//营销服务队列Queue promotionQueue = new Queue(PROMOTION_QUEUE);//广播交换器FanoutExchange exchange = new FanoutExchange(EXCHANGE);//两个队列绑定到同一个交换器return new Declarables(memberQueue, promotionQueue, exchange,BindingBuilder.bind(memberQueue).to(exchange),BindingBuilder.bind(promotionQueue).to(exchange));}@RabbitListener(queues = MEMBER_QUEUE)public void memberService1(String userName) {log.info("memberService1: welcome message sent to new user {}", userName);}@RabbitListener(queues = MEMBER_QUEUE)public void memberService2(String userName) {log.info("memberService2: welcome message sent to new user {}", userName);}@RabbitListener(queues = PROMOTION_QUEUE)public void promotionService1(String userName) {log.info("promotionService1: gift sent to new user {}", userName);}@RabbitListener(queues = PROMOTION_QUEUE)public void promotionService2(String userName) {log.info("promotionService2: gift sent to new user {}", userName);}
}

对于每一条 MQ 消息,会员服务和营销服务分别都会收到一次,一条消息广播到两个服务的同时,在每一个服务的两个实例中通过轮询接收:

别让死信堵塞了消息队列

在很多时候,消息队列的堆积堵塞,是因为有大量始终无法处理的消息。
用户服务在用户注册后发出一条消息,会员服务监听到消息后给用户派发优惠券,但因为用户并没有保存成功,会员服务处理消息始终失败,消息重新进入队列,然后还是处理失败。这种在 MQ 中像幽灵一样回荡的同一条消息,就是死信。
随着 MQ 被越来越多的死信填满,消费者需要花费大量时间反复处理死信,导致正常消息的消费受阻,最终 MQ 可能因为数据量过大而崩溃。
我们更希望的逻辑是,对于同一条消息,能够先进行几次重试,解决因为网络问题导致的偶发消息处理失败,如果还是不行的话,再把消息投递到专门的一个死信队列。对于来自死信队列的数据,我们可能只是记录日志发送报警,即使出现异常也不会再重复投递。整个逻辑如下图所示:
image.png

//定义死信交换器和队列,并且进行绑定
@Bean
public Declarables declarablesForDead() {Queue queue = new Queue(Consts.DEAD_QUEUE);DirectExchange directExchange = new DirectExchange(Consts.DEAD_EXCHANGE);return new Declarables(queue, directExchange,BindingBuilder.bind(queue).to(directExchange).with(Consts.DEAD_ROUTING_KEY));
}
//定义重试操作拦截器
@Bean
public RetryOperationsInterceptor interceptor() {return RetryInterceptorBuilder.stateless().maxAttempts(5) //最多尝试(不是重试)5次.backOffOptions(1000, 2.0, 10000) //指数退避重试.recoverer(new RepublishMessageRecoverer(rabbitTemplate, Consts.DEAD_EXCHANGE, Consts.DEAD_ROUTING_KEY)) //重新投递重试达到上限的消息.build();
}
//通过定义SimpleRabbitListenerContainerFactory,设置其adviceChain属性为之前定义的RetryOperationsInterceptor来启用重试拦截器
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAdviceChain(interceptor());return factory;
}
//死信队列处理程序
@RabbitListener(queues = Consts.DEAD_QUEUE)
public void deadHandler(String data) {log.error("got dead message {}", data);
}

可以通过增加消费线程来避免性能问题,如下我们直接设置 concurrentConsumers 参数为 10,来增加到 10 个工作线程:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAdviceChain(interceptor());factory.setConcurrentConsumers(10);return factory;
}	


我之前做过一个demo 是基于canal做mysql数据同步,需要将解析好的数据发到kafka里面,再进行处理。在使用的时候发现这么一个问题,就是kafka多partition消费时不能保证消息的顺序消费,进而导致mysql数据同步异常。由于kafka可以保证在同一个partition内消息有序,于是我自定义了一个分区器,将数据的id取hashcode然后根据partition的数量取余作为分区号,保证同一条数据的binlog能投递到同一个partition中,从而达到消息顺序消费的目的。

在用户注册后发送消息到 MQ,然后会员服务监听消息进行异步处理的场景下,有些时候会发现,虽然用户服务先保存数据再发送 MQ,但会员服务收到消息后去查询数据库,却发现数据库中还没有新用户的信息。
当时倒不是因为主从的问题,而是因为业务代码把保存数据和发MQ消息放在了一个事务中,有概率收到消息的时候事务还没有提交完成,当时开发同学的处理方式是收MQ消息的时候sleep 1秒,或许应该是先提交事务,完成后再发MQ消息,但是这又出来一个问题MQ消息发送失败怎么办?所以后来演化为建立本地消息表来确保MQ消息可补偿,把业务处理和保存MQ消息到本地消息表操作在相同事务内处理,然后异步发送和补偿发送消息表中的消息到MQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/86603.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringAOP入门案例

package com.elf.spring.aop.aspectj; /*** author 45* version 1.0*/ public interface UsbInterface {public void work(); }package com.elf.spring.aop.aspectj; import org.springframework.stereotype.Component; /*** author 45* version 1.0*/ Component //把Phone对象…

什么是WhatsApp群发,WhatsApp协议,WhatsApp云控

那么WhatsApp群控云控可以做什么呢&#xff1f; 1、获客 自动化引流&#xff0c;强大的可控性&#xff0c;产品快速拓客 2、导流 一键式傻瓜化自动加好友&#xff0c;群发&#xff0c;朋友圈营销 3、群控 一键式拉群好友&#xff0c;建群&#xff0c;进群 …

力扣236 补9.14

做不来&#xff0c;我做中等题基本上都是没有思路&#xff0c;这里需要先遍历祖先节点&#xff0c;那必然用先序遍历&#xff0c;这题还是官方题解容易理解&#xff0c;第二火的题解反而把我弄得脑袋昏昏的。 class Solution { TreeNode ans; public TreeNode lowestCommonAnce…

公众号迁移多久可以完成?

公众号账号迁移的作用是什么&#xff1f;只能变更主体吗&#xff1f;长期以来&#xff0c;由于部分公众号在注册时&#xff0c;主体不准确的历史原因&#xff0c;或者公众号主体发生合并、分立或业务调整等现实状况&#xff0c;在公众号登记主体不能对应实际运营人的情况下&…

Django之视图

一&#xff09;文件与文件夹 当我们设定好一个Djiango项目时&#xff0c;里面会有着view.py等文件&#xff0c;也就是文件的方式&#xff1a; 那么我们在后续增加app等时&#xff0c;view.py等文件会显得较为臃肿&#xff0c;当然也根据个人习惯&#xff0c;这时我们可以使用…

2023-9-23 最大不相交区间数量

题目链接&#xff1a;最大不相交区间数量 #include <iostream> #include <algorithm>using namespace std;const int N 100010;int n;struct Range {int l, r;bool operator< (const Range &W) const {return r < W.r;} }range[N];int main() {cin >…

IDEA最新激 20活23码

人狠话不多 大家好&#xff0c;最近Intelli Idea官方的校验规则进行了更新&#xff0c;之前已经成功激20活23的Idea可能突然无法使用了。 特地从网上整理了最新、最稳定的激20活23码分享给大家&#xff0c;希望可以帮助那些苦苦为寻找Idea激20活23码而劳累的朋友们。 本激23…

前端框架之争:Vue.js vs. React.js vs. Angular

文章目录 Vue.js - 渐进式框架的魅力简单易用组件化开发生态系统和工具适用场景 React.js - 高性能的虚拟DOM虚拟DOM单向数据流社区和生态系统适用场景 Angular - 一站式框架完整的框架双向数据绑定类型安全适用场景 如何选择&#xff1f;项目规模生态系统技能和经验性能需求 结…

MyBatis基础之SqlSession

SqlSession 线程安全问题 当你翻看 SqlSession 的源码时&#xff0c;你会发现它只是一个接口。我们通过 MyBatis 操作数据库&#xff0c;实际上就是通过 SqlSession 获取一个 JDBC 链接&#xff0c;然后操作数据库。 SqlSession 接口有 3 个实现类&#xff1a; #实现类1Defa…

【Java 基础篇】Java函数式接口详解

Java是一门强类型、面向对象的编程语言&#xff0c;但在Java 8引入了函数式编程的概念&#xff0c;这为我们提供了更多灵活的编程方式。函数式接口是函数式编程的核心概念之一&#xff0c;本文将详细介绍Java函数式接口的概念、用法以及一些实际应用。 什么是函数式接口&#…

第一个Servlet程序

目录 一、Servlet是什么 二、第一个Servlet项目 2.1 创建Maven项目 2.2 引入Servlet依赖 2.3 创建目录 三、Servlet启动 3.1 编写代码 3.2 打包程序 3.3 部署程序 四、更便捷的部署方式 4.1 安装Smart Tomcat插件 一、Servlet是什么 Servlet 是一种实现动态页面的技术。是一组…

LeetCode 75-02:字符串的最大公因子

前置知识&#xff1a;使用欧几里得算法求出最大公约数 func gcdOfStrings(str1 string, str2 string) string {if str1str2 ! str2str1 {return ""}return str1[:gcd(len(str1), len(str2))] }func gcd(a, b int)int{if b 0{return a}return gcd(b, a%b) }

【C语言精髓 之 指针】指针*、取地址、解引用*、引用

/*** file * author jUicE_g2R(qq:3406291309)————彬(bin-必应)* 一个某双流一大学通信与信息专业大二在读 * copyright 2023.9* COPYRIGHT 原创技术笔记&#xff1a;转载需获得博主本人同意&#xff0c;且需标明转载源* language …

【postgresql 】 ERROR: “name“ is not supported as an alias

org.postgresql.util.PSQLException: ERROR: "name" is not supported as an alias 错误&#xff1a;不支持将“name”作为别名 SELECT real_name name FROM doc_user 加上 在关键词上加上 “” 示例&#xff1a; SELECT real_name "name" FROM do…

“Vue进阶:深入理解插值、指令、过滤器、计算属性和监听器“

目录 引言&#xff1a;Vue的插值Vue的指令Vue的过滤器Vue的计算属性和监听器vue购物车案例总结&#xff1a; 引言&#xff1a; Vue.js是一款流行的JavaScript框架&#xff0c;它提供了许多强大的功能来简化前端开发。在本篇博客中&#xff0c;我们将深入探讨Vue的一些高级特性…

Java项目:SSM的网上书城系统

作者主页&#xff1a;Java毕设网 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文末获取源码 一、相关文档 1、关于雅博书城在线系统的基本要求 &#xff08;1&#xff09;功能要求&#xff1a;可以管理个人中心、用户管理、图书分类管理、图书信息管理、…

Java 函数式编程思考 —— 授人以渔

引言 最近在使用函数式编程时&#xff0c;突然有了一点心得体会&#xff0c;简单说&#xff0c;用好了函数式编程&#xff0c;可以极大的实现方法调用的解耦&#xff0c;业务逻辑高度内聚&#xff0c;同时减少不必要的分支语句&#xff08;if-else&#xff09;。 一、函数式编…

clickhouse学习之路----clickhouse的特点及安装

clickhouse学习笔记 反正都有学不完的技术&#xff0c;不如就学一学clickhouse吧 文章目录 clickhouse学习笔记clickhouse的特点1.列式存储2. DBMS 的功能3.多样化引擎4.高吞吐写入能力5.数据分区与线程级并行 clickhouse安装1.关闭防火墙2.CentOS 取消打开文件数限制3.安装依…

Python 运行代码

一、Python运行代码 可以使用三种方式运行Python&#xff0c;如下&#xff1a; 1、交互式 通过命令行窗口进入 Python 并开始在交互式解释器中开始编写 Python 代码 2、命令行脚本 可以把代码放到文件中&#xff0c;通过python 文件名.py命令执行代码&#xff0c;如下&#xff…

机器学习——pca降维/交叉验证/网格交叉验证

1、pca降维&#xff1a;目的是提升模型训练速度 定义&#xff1a; 使用方法&#xff1a;给训练数据或者测试数据进行降维处理 给训练数据降维 给测试数据降维&#xff1a;这里1就要用transform&#xff0c;而不是fit_transform&#xff0c;因为之前训练数据降维时特征已经确定…