记一次异步转同步的经历

         工作中会经常遇到一些对数据进行加工的场景,这些数据来自很多地方,一般通过HTTP、RPC等方式去调用,数据源返回的报文也一般都是JSON、XML等格式。其中大部分数据源是同步返回,但有些数据源是异步返回的(也就是说数据源的数据是数据源提供方回调给你的)。

        此时如果你的加工逻辑是必须要等到所有的数据都加载完毕才能开始加工的话,就会比较麻烦,因为有些数据是异步数据源回调给你的(需要你的程序有回调接口),但是处理程序并不知道异步数据源何时回调给你数据。那么如何处理这种情况呢?

方式一:利用循环:

  1. 调用外部数据源的结束后,如果调用成功,则 设置缓存(Map、Redis、Mysql),缓存key:为 当前流水号+数据源编码,缓存Value:Null
  2. 设置完缓存Key,程序开始循环查询该Key下是否有数据,如果有数据,则跳出循环执行后续程序,如果没有数据则继续执行循环。
  3. 异步数据源回调接口接收到异步数据源的请求,将请求报文设置到第一步的缓存中

       虽然上面的方案容易实现也容易理解,但是实现效果并不优雅,循环会比较耗费性能,即使可以在循环的过程中设置一定的休眠时间,但在时效性比较高的场景下,也并不合适。

      除了方式一之外,还有其他更优雅的实现方式吗?答案是肯定的,那么回到问题本身,程序无非是希望可以同步获取到数据,以方便后续对数据进行加工,但是异步数据源调用时同步返回的并不是你想要的数据,而是该数据源调用是否成功,真正的数据是数据源回调给你的,数据入口也随之改变了,因此问题就变成了:如果异步数据源调用成功,如何在当前线程等待异步数据源把程序需要的数据给回调(注意程序的回调接口是另外的线程了)回来,并由当前线程来获取呢?  其中方式一的解决方案就是循环等待取缓存。所以这里面重点关注的其实就是:等待和消息  

    等待也即阻塞一般第一时间想到的是锁,也就是说调用完异步数据源是否可以马上进行加锁等待,答案是可以的,但是随之的问题就是异步数据源回调之后怎么解开这个锁,以目前Java来说并没有较好的实现形式,可能需要借助一些工具才能实现。同时需要考虑的是即使数据源的回调接口可以解锁,那么回调接口又怎么把数据传递给加锁的线程呢?

    因此这个场景的最优解就是找到既有阻塞功能又有消息获取的一种形式,此时答案就已经呼之欲出了,消息队列

方式二:消息队列

  1. 调用外部数据源的结束后,如果调用成功,则创建队列:队列名:当前流水号+数据源编码
  2. 当前线程开启监听该队列(阻塞)
  3. 异步数据源回调接口接收到异步数据源的请求,将报文推送到步骤1的队列中去
  4. 当前线程获取到该队列的消息,结束阻塞继续执行

    以上就是使用队列实现异步转同步的形式,简单来说就是利用队列来实现阻塞和获取消息的目的,当然一提到队列,可能首先想到的是RabbitMq、Kafka之类的消息中间件

    但我们一般使用RabbitMq、Kafka都是用来监听一些预设的队列,这些队列用来处理固定的几种业务,通常会有多个生产者和多个消费者,消费者和生产者的数量一般都是固定的,消费的逻辑也大致相同,但该场景的情况是,消费者都是临时的、一次性的(线程维度,消费者获取到消息后,就会停止监听),比如线程A调用完异步接口,接着线程A就开始监听队列,接收到属于自己的消息后(可以判断消息是否是自己的消息,比如通过判断消息头里的属性(生产者设置)),断开监听,继续执行代码。因此该场景的消费者、生产者的数量是由请求频率决定的。不过在使用Mq时还会遇到一个问题那就是,假如有多个线程都在监听同一个队列,由于每个线程需要的信息时不同的(因为有可能是多条请求),那么此时就会造成很多消费失败的情况(消费者获取到的消息不是本线程需要的消息),消息可能会重新入队。最终导致mq的性能下降。因此使用预设队列的形式是有些问题的。

    上面描述的是使用预设队列的形式,还有一种形式就是动态队列:也即每笔请求,都会有相应的流水号,比如线程A调用完异步接口,根据流水号和数据源编码动态创建队列,接着线程A就开始监听该队列,然后回调接口接收到外部数据的回调信息,往相应的队列中插入数据,此时线程A接收到消息,断开监听,继续执行代码。

   如何实现动态队列呢?

   首先队列的两个主要功能要实现,即监听队列和推送消息。

/*** 消息管道接口*/
public interface MessageChannel {/*** 发布消息* @param channelName 管道名* @param message  消息内容*/void publishMessage(String channelName, String message);/*** 订阅消息* @param channelName 管道名* @return*/String subscribeMessage(String channelName);/*** 生成管道名称* @param serialNumber 请求流水号(该流水号应该是透传的,即异步请求时,该流水号传递给数据源,数据源回调时,该流水号再回传回来)* @param dataSourceCode  数据源编码* @return*/default String generateChannelName(String serialNumber,String dataSourceCode) {return serialNumber + ":" + dataSourceCode;}
}

实现一:

import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;/*** Jvm级别的消息管道*/
@Component
public class LocalMessageChannel implements MessageChannel{/*** 队列集合,一个时间段内的请求可能会比较大*/private static HashMap<String, BlockingQueue<String>> CHANNEL_COLLECTION = new HashMap<>();@Overridepublic void publishMessage(String channelName, String message) {CHANNEL_COLLECTION.get(channelName).add(message);}@Overridepublic String subscribeMessage(String channelName) {createChannel(channelName);BlockingQueue<String> blockingQueue = CHANNEL_COLLECTION.get(channelName);
//        阻塞获取消息String message = null;try {
//              可设置阻塞时间,避免回调时间过长
//            message = blockingQueue.poll(60, TimeUnit.SECONDS);message = blockingQueue.take();} catch (InterruptedException e) {e.printStackTrace();}
//        获取到消息后,移除该队列CHANNEL_COLLECTION.remove(channelName);return message;}/*** 创建管道* @param channelName*/private void createChannel(String channelName) {
//        SynchronousQueue队列内部仅允许容纳一个元素。当一个线程插入一个元素后会被阻塞,除非这个元素被另一个线程消费
//        由于业务的特殊性,采用临时队列(消费一次,队列即可删除),该队列只需容纳一条消息即可SynchronousQueue synchronousQueue = new SynchronousQueue();CHANNEL_COLLECTION.put(channelName, synchronousQueue);}
}

伪代码:

订阅消息:

import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;import java.net.URI;@RequestMapping("dataSource")
@RestController
public class UserInfoController {@Autowiredprivate RestTemplate restTemplate;@Autowired@Qualifier("localMessageChannel")private MessageChannel messageChannel;/*** 请求异步数据源,并等待异步数据源的回调,等待机制使用队列* 注意流水号需要透传* @param serialNumber 流水号* @return*/@RequestMapping("/userInfo")public String userInfo(String serialNumber) {
//        注意该请求是异步请求(流水号透传)String forObject = restTemplate.getForObject(URI.create("http://xxxx?serialNumber=" + serialNumber), String.class);JSONObject jsonObject = JSONObject.parseObject(forObject);
//        todo 判断异步请求是否成功,省略
//        如果成功 (数据源编码是常量)String channelName = messageChannel.generateChannelName(serialNumber, "USERINFO");
//        阻塞获取消息(直到管道中被推送了消息)String message = messageChannel.subscribeMessage(channelName);return message;}
}

推送消息:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;@RequestMapping("callback")
@RestController
public class CallBackController {@Autowiredprivate RestTemplate restTemplate;@Autowired@Qualifier("localMessageChannel")private MessageChannel messageChannel;/*** 异步数据源回调* @param serialNumber  流水号(透传)* @param dataSourceCode 数据源编码* @param request 回调内容* @return*/@RequestMapping("/dataSource")public String userInfo(String serialNumber,String dataSourceCode,String request) {
//        根据回调的请求体,生成需要推送的管道名称String channelName = messageChannel.generateChannelName(serialNumber, dataSourceCode);
//        推送消息messageChannel.publishMessage(channelName,request);return "SUCCESS";}
}

     上面的代码是实现动态队列的方式之一,但是有一个问题那就是他是JVM级别的,也就是说不能跨进程,实际生产环境中,每个服务可能不止一个实例,因此如果发起请求的服务实例,与接收回调的服务实例不是同一个时,就会造成队列永远订阅不到消息的情况,那有没有全局的订阅消息的方案呢?答案是有的,这里可以借助一下Redis

实现二:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.*;import java.util.HashSet;
import java.util.Set;@Configuration
public class JedisConfig {@Beanpublic JedisPoolConfig jedisPoolConfig() {JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();//最大连接数,控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取,-1不限制jedisPoolConfig.setMaxTotal(-1);//最大空闲连接jedisPoolConfig.setMaxIdle(10);//最小空闲连接jedisPoolConfig.setMinIdle(2);//在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的jedisPoolConfig.setTestOnBorrow(true);return jedisPoolConfig;}@Beanpublic JedisPool jedisPool(JedisPoolConfig jedisPoolConfig) {JedisPool jedisPool = new JedisPool(jedisPoolConfig, "127.0.0.1", 6379, 1000, null);return jedisPool;}/*** 这里为什么不把Jedis配置为一个Bean呢?* 这里因为将Jedis配置为Bean后,Jedis对象就是单例的,此时Jedis对象先监听队列* 然后再使用Jedis去发布队列消息时会报错:redis.clients.jedis.exceptions.JedisDataException: ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context* 因此实际使用时,注入JedisPool即可* @param jedisPool* @return*/
//    @Beanpublic Jedis jedis(JedisPool jedisPool) {return jedisPool.getResource();}/*** 需要确保你的redis服务支持集群模式,否则报错* @param jedisPoolConfig* @return*/
//    @Beanpublic JedisCluster jedisCluster(JedisPoolConfig jedisPoolConfig) {Set<HostAndPort> hostAndPortSet = new HashSet<HostAndPort>();hostAndPortSet.add(new HostAndPort("127.0.0.1", 6379));JedisCluster jedisCluster = new JedisCluster(hostAndPortSet, jedisPoolConfig);return jedisCluster;}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;@Component
public class RedisMessageChannel implements MessageChannel {/*** 注入JedisPool,使用时 jedisPoll.getResource()* 保证可以获取到不同的jedis对象*/@Autowiredprivate JedisPool jedisPool;@Overridepublic void publishMessage(String channelName, String message) {jedisPool.getResource().publish(channelName, message);}@Overridepublic String subscribeMessage(String channelName) {StringBuffer msgBuffer = new StringBuffer();Jedis jedis = jedisPool.getResource();
//        监听队列(阻塞操作)jedis.subscribe(new JedisPubSub() {Thread thread;@Overridepublic void onMessage(String channel, String message) {msgBuffer.append(message);
//                取消订阅,不取消的话,会一直阻塞unsubscribe(channelName);if (thread != null) {
//                    停止线程thread.interrupt();}}@Overridepublic void onSubscribe(String channel, int subscribedChannels) {System.out.println("已开始监听队列");
//                由于Jedis监听没有超时时间设置,因此可以在开始监听时开启一个线程设置休眠时间,时间到了之后取消订阅thread = new Thread(() -> {try {Thread.sleep(10 * 1000);if (isSubscribed()) {unsubscribe(channelName);}} catch (InterruptedException e) {
//                        如果在超时时间之前,已经接收到了消息,就会停止线程,此处会报错,不影响throw new RuntimeException(e);}});thread.start();}}, channelName);System.out.println("阻塞结束");return msgBuffer.toString();}
}

     以上便是使用Reids来实现的动态队列,它是跨进程的,同时由于Redis的PubSub队列实现并不存储数据,因此是一个比较轻量且优雅的实现方式。

    需要注意的是,这里我使用的Redis客户端是Jedis,如果使用RedisTemplate、Redisson作为客户端则默认情况下,监听队列并不会产生阻塞效果。

代码如下:

import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;@Component
public class RedisMessageChannel implements MessageChannel {@Autowiredprivate RedisTemplate redisTemplate;@Autowiredprivate RedissonClient redissonClient;@Overridepublic void publishMessage(String channelName, String message) {redisTemplate.convertAndSend(channelName, message);}@Overridepublic String subscribeMessage(String channelName) {StringBuffer msgBuffer = new StringBuffer();RedisConnectionFactory connectionFactory = redisTemplate.getConnectionFactory();
//        获取Redis连接
//        RedisClusterConnection clusterConnection = connectionFactory.getClusterConnection();
//        RedisSentinelConnection sentinelConnection = connectionFactory.getSentinelConnection();RedisConnection connection = connectionFactory.getConnection();//        监听队列(使用redisTemplate默认情况下此步骤不产生阻塞效果)connection.subscribe(new MessageListener() {@Overridepublic void onMessage(Message message, byte[] pattern) {msgBuffer.append(new String(message.getBody()));
//                取消订阅(将跳出该方法)connection.getSubscription().unsubscribe(channelName.getBytes(StandardCharsets.UTF_8));}}, channelName.getBytes(StandardCharsets.UTF_8));RTopic topic = redissonClient.getTopic(channelName);
//        监听队列(使用Redisson默认情况下此步骤不产生阻塞效果)topic.addListener(String.class, new org.redisson.api.listener.MessageListener<String>() {@Overridepublic void onMessage(CharSequence channel, String msg) {msgBuffer.append(msg);topic.removeAllListeners();}});System.out.println("###############");return msgBuffer.toString();}
}

   最后的最后,需要注意的是,那些数据源的异步接口,回调的时间要尽可能短毕竟服务线程都在那里卡着,如果回调时间很长(虽然可以设置监听的超时时间)就会导致等待的线程过多,从而造成服务器资源耗尽。

  另外对于存在依赖关系的数据加工来说,如果数据加工需要依赖一些异步数据源,那么对于上游系统来说,最好的方式是调用本系统的异步计算接口。这种情况下,上游系统调用了本系统的异步接口,可以等待本系统的回调,而本系统调用了外部的异步数据源之后,也可以不用等待异步回调了,而是在回调中再进行处理,那么资源问题就可以得到解决了。不过这样就不符合文章的标题了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/720216.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android 音乐播放器(暂停、下一首、上一首)

1.编写主页面&#xff0c;使用listview组件放置音乐列表信息 <?xml version"1.0" encoding"utf-8"?> <RelativeLayout xmlns:android"http://schemas.android.com/apk/res/android"xmlns:app"http://schemas.android.com/apk/r…

使用postman测试若依其他业务接口API—3

请求方式 如上&#xff0c;使用Get请求来获取练习题库中的所有习题数据。 请求地址 在请求路径栏输入请求地址&#xff0c;以下图为例&#xff1a; 参数体与鉴权 在Parms键入所需参数&#xff0c;其中key为键,value为键的值&#xff1a;如下图所示&#xff1a; 认证成功与失…

持续集成(CICD)- gogs仓库的部署和使用

文章目录 一、gogs的介绍二、部署gog仓库三、首次启动gogs四、登录五、创建一个非空仓库六、从仓库拉取代码到本地七、把本地编辑的代码上传到仓库 一、gogs的介绍 Gogs作为一个轻量级、易于部署和使用的自托管Git服务&#xff0c;为小型团队和个人开发者提供了一个简单而强大…

【AIGC】如何提高Prompt准确度

前言 随着人工智能的迅猛进展&#xff0c;AIGC&#xff08;通用人工智能聊天工具&#xff09;已成为多个行业中不可或缺的自然语言处理技术。Prompt作为AIGC系统的一项关键功能&#xff0c;在工具的有效运作中发挥了举足轻重的作用。本篇文章将深入探讨Prompt与AIGC之间的紧密…

python笔记_程序流程控制2

C&#xff0c;循环控制 1&#xff0c;for循环 功能&#xff1a;让代码循环运行 语法&#xff1a; for <变量> in <范围、序列>&#xff1a; <循环操作语句> 例 nums &#xff08;1,2,3,4&#xff09; <class list> for i in nums&#xff1a; print&…

Java中文件的相关知识及文件IO操作

在我们日常生活中&#xff0c;会把许多东西都称之为文件。比如&#xff0c;一份纸质报告&#xff0c;或u盘中的一些文档&#xff0c;都会把它们称为文件。那么&#xff0c;这里说的文件是以操作系统的角度出发的。在操作系统中&#xff0c;会把许多硬件设备和软件资源都抽象成“…

ubuntu20.04安装nvidia驱动真实有效(被折磨了一天一夜的肝文!!!)

ubuntu20.04安装nvidia驱动真实有效 安装前后需要注意的安装nvidia驱动的教程 安装前后需要注意的 能找到这篇帖子说明你之前肯定有过无数次方法的尝试&#xff0c;这些尝试可能会影响下面教程的有效 1.下面这个指令可能会导致ubuntu内核的更新。内核更新可能会导致你的nvidia…

机器学习:主成分分析笔记

主成分分析&#xff08;Principal Component Analysis&#xff0c;PCA&#xff09;是一种无监督的机器学习算法&#xff0c;通常用于高维数据的降维、提取主要特征、数据降噪和可视化。PCA的基本思想是将原始数据的多个变量转换为少数几个相互独立的变量&#xff08;即主成分&a…

shadertoy 游戏《来自星尘》摇杆复刻

正确的做法应该是上 noise 而不是叠加 sin 波&#xff0c;不过如果不想麻烦的话叠波还是一个不错的选择&#xff1a;整体效果如下&#xff0c;已经非常形似 直接上链接&#xff1a;Shader - Shadertoy BETA float radiusScale 0.9; float variation(vec2 v1, vec2 v2, float …

node.js 封装分页查询

node.js封装sql分页查询 方法&#xff1a; /*** 生成分页查询sql* param {string} table 表名* param {number} pageNum 分页页数 * param {number} pageSize 分页条数 * param {object} query 查询对象 例&#xff1a;{id:1,name:小明}* returns sql语句*/ const limit (ta…

Java零基础-反射

哈喽&#xff0c;各位小伙伴们&#xff0c;你们好呀&#xff0c;我是喵手。 今天我要给大家分享一些自己日常学习到的一些知识点&#xff0c;并以文字的形式跟大家一起交流&#xff0c;互相学习&#xff0c;一个人虽可以走的更快&#xff0c;但一群人可以走的更远。 我是一名后…

springboot/ssm供应商管理系统Java货物进销存管理系统web

springboot/ssm供应商管理系统Java货物进销存管理系统web 基于springboot(可改ssm)vue项目 开发语言&#xff1a;Java 框架&#xff1a;springboot/可改ssm vue JDK版本&#xff1a;JDK1.8&#xff08;或11&#xff09; 服务器&#xff1a;tomcat 数据库&#xff1a;mysq…

传感器---触摸传感器

一、模块选型概述 芯片型号&#xff1a;TTP223B 供电电压&#xff1a;3-5V 控制接口&#xff1a;共三个引脚&#xff08;GND、VCC、SIG&#xff09;&#xff0c;GND为地&#xff0c;VCC为供电电源&#xff0c;SIG为数字信号输出脚&#xff1b; PCB尺寸&#xff1a;24 x 24 mm 触…

图像传感器与信号处理——SFR算法/ISO 12233解读

图像传感器与信号处理——SFR算法/ISO 12233解读 图像传感器与信号处理——SFR算法/ISO 12233解读 1. 前言 2. 基于视觉的分辨率测量方法 3. 基于边界的SFR算法 4. 基于正弦波的SFR算法 图像传感器与信号处理——SFR算法/ISO 12233解读 SFR的全称是Spatial Frequency Response&…

蓝桥集训之统计子矩阵

统计子矩阵 核心思想&#xff1a;矩阵前缀和 双指针 用i和j双指针 遍历所有子矩阵的列用s和t双指针 遍历所有子矩阵的行求其子矩阵的和 若>k 将s向下移动 矩阵和必定减小(元素个数减少)直到满足<k 因为列一定 行数即为方案数(从t行往上数到s行 共t-s1个区间[t,t][t-1,t]…

PESTEL分析

PESTEL分析是一种用于评估宏观环境对组织或企业的影响的工具。PESTEL是对政治、经济、社会、技术、环境和法律六个方面进行分析的首字母缩写。 - 政治因素&#xff08;Political&#xff09;&#xff1a;涉及政府政策、政治稳定性、法律和法规等因素对企业的影响。 - 经济因素&…

Linux中服务端开发

1 创建socket,返回一个文件描述符lfd---socket(); 2 将lfd和IP&#xff0c;PROT进行绑定---bind(); 3 将lfd由主动变成被动监听---listen(); 4 接收一个新的连接&#xff0c;得到一个的文件描述符cfd--accept() --该文件描述符用于与客户端通信 5 while(1) { 接受数据&a…

MySQL——存储引擎

存储引擎 InnoDB 是 MySQL 默认的存储引擎&#xff0c;只有在需要它不支持的特性时&#xff0c;才会考虑其他存储引擎 实现了 4 个标准的隔离级别&#xff0c;默认级别可重复度。在可重复度隔离级别下&#xff0c;通过 MVCC 间隙锁防止幻读 主索引是聚簇索引 内部做了很多…

Docker 日志存储大小限制,默认会充爆磁盘

背景 在容器化部署的时候&#xff0c;因为没有指定日志的最大存储时间&#xff0c;导致磁盘被充爆。查看日志 一般使用docker logs -f --tail 行数 容器名称 来查看容器的运行日志&#xff0c;但是在容器被初始化的时候&#xff0c;需要指定日志的最大存储时间&#xff0c;因为…

React学习笔记

1、React初步认识 React构建Web和原生交互界面的库,相较于其它前端框架的优势,具有丰富的生态跨平台支持。 1.1、React的开发依赖 开发React必须依赖三个库: react:包含react所必须的核心代码;react-dom:react渲染在不同平台所需要的核心代码 ;babel:将jsx转换成Rea…