手写分布式配置中心(四)增加实时刷新功能(长轮询)

上一篇文章中实现了短轮询,不过短轮询的弊端也很明显,如果请求的频率较高,那么就会导致服务端压力大(并发高);如果请求的频率放低,那么客户端感知变更的及时性就会降低。所以我们来看另一种轮询方式,长轮询。
长轮询就是客户端发起请求,如果服务端的数据没有发生变更,那么就hold住请求,直到服务端的数据发生了变更,或者达到了一定的时间就会返回。这样就减少了客户端和服务端不断频繁连接和传递数据的过程,并且不会消耗服务端太多资源,而且客户端感知变更的及时性也会大大提高

代码在https://gitee.com/summer-cat001/config-center​​​​​​​

原理

要实现服务端长时间hold请求,就要利用到servlet异步的特性,因为web服务器会有一个线程池,每一个请求来了之后会提交给这个线程池去处理请求,如果一个任务很长时间都没完成的话就会一直占有这个线程,那么其他请求来了会发现线程池里没有可用的线程就会一直等,直到有空闲的线程,这样就会导致并发性大大的减少。所以需要采用异步响应的方式去实现,而比较方便实现异步http的方式就是Servlet3.0提供的AsyncContext 机制。asyncContext是为了把主线程返回给web服务器的线程池,不影响服务对其他客户端请求。会有线程专门处理这个长轮询,但并不是说每一个长轮询的http请求都要用一个线程阻塞在那。而是把长轮询的request的引用在一个集合中存起来,用一个或几个线程专门处理一批客户端的长轮询请求,这样就不需要为每一个长轮询单独分配线程阻塞在那了,从而大大降低了资源的消耗。注意,异步不是非阻塞,响应数据时还是要阻塞的。

服务端

服务端增加一个长轮询的接口

@PostMapping("/change/get/long")public Result<Void> getLongChangeConfig(@RequestBody Map<Long, Integer> configIdMap, HttpServletRequest request, HttpServletResponse response) {if (configIdMap == null || configIdMap.isEmpty()) {return Result.fail("配置参数错误");}response.setContentType("application/json;charset=UTF-8");AsyncContext asyncContext = request.startAsync();asyncContext.setTimeout(0);ConfigPolingTask configPolingTask = new ConfigPolingTask();configPolingTask.setAsyncContext(asyncContext);configPolingTask.setConfigPolingDataMap(configIdMap);configPolingTask.setEndTime(System.currentTimeMillis() + 28 * 1000);configService.configListener(configPolingTask);return null;}

主要就是把请求的配置id和版本的map、超时时间、asyncContext对象组装成一个任务,添加到任务池里,如有更新了配置,会去任务池里找是否有该配置id的任务,如果版本号大于任务的版本号,就将新配置返回给客户端。于此同时会有1个定时线程每1秒访问一下任务池,找到过期的任务,返回给客户端。客户端的请求过期时间是30秒,服务端过期时间定的是28秒,也就是配置没有改变的情况下,会hold请求28秒才返回,提前2秒返回是为了防止返回传输时间导致超过30秒,客户端断开链接。
 

@Slf4j
@Service
public class ConfigServiceImpl implements ConfigService {private ConfigDAO configDAO;private ConfigSyncService configSyncService;@Autowiredprivate LocalConfigDAO localConfigDAO;@Autowiredprivate LocalConfigSyncServiceImpl localConfigSyncService;@Value("${config.center.mode:0}")private int configCenterMode;private int respThreadNum;private final ExecutorService respExecutor;private final ConfigPolingTasksHolder configPolingTasksHolder;public ConfigServiceImpl() {configPolingTasksHolder = new ConfigPolingTasksHolder();//构建用于响应长轮询的线程池respExecutor = new ThreadPoolExecutor(100, 5000,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(102400),this::newRespThread,new ThreadPoolExecutor.CallerRunsPolicy());//每1秒轮询执行一次任务超时检测ScheduledExecutorService timeoutCheckExecutor = new ScheduledThreadPoolExecutor(1, this::newCheckThread);timeoutCheckExecutor.scheduleAtFixedRate(this::responseTimeoutTask, 0, 1, TimeUnit.SECONDS);}@PostConstructpublic void init() {ConfigCenterModeEnum configCenterModeEnum = ConfigCenterModeEnum.getEnum(configCenterMode);if (configCenterModeEnum == null) {throw new IllegalArgumentException("配置config.center.mode错误");}if (configCenterModeEnum == ConfigCenterModeEnum.STANDALONE) {this.configDAO = localConfigDAO;this.configSyncService = localConfigSyncService;}}@Overridepublic Result<Void> insertConfig(ConfigBO configBO) {List<ConfigDO> configList = configDAO.getAllValidConfig();if (configList.stream().anyMatch(c -> c.getName().equals(configBO.getName()))) {return Result.fail("配置名重复");}ConfigDO configDO = new ConfigDO();configDO.setName(configBO.getName());configDO.setConfigData(configBO.getConfigData().toJSONString());configDAO.insertConfigDO(configDO);return Result.success(null);}@Overridepublic Result<Void> updateConfig(ConfigBO configBO) {ConfigDO configDO = new ConfigDO();configDO.setId(configBO.getId());configDO.setName(configBO.getName());configDO.setConfigData(configBO.getConfigData().toJSONString());configDAO.updateConfig(configDO);configSyncService.publish(configBO.getId());return Result.success(null);}@Overridepublic Result<Void> delConfig(long id, long updateUid) {configDAO.delConfig(id, updateUid);return Result.success(null);}@Overridepublic Result<List<ConfigBO>> getAllValidConfig() {List<ConfigDO> configList = configDAO.getAllValidConfig();return Result.success(configList.stream().map(this::ConfigDO2BO).collect(Collectors.toList()));}@Overridepublic void configListener(ConfigPolingTask configPolingTask) {//先将任务加到待响应列表中,然后再判断账号是否有改变,防止并发问题//如先判断再加进去,加入前如有变动,任务里无法感知到,空等到超时configPolingTasksHolder.addConfigTask(configPolingTask);List<ConfigBO> allValidConfig = getAllValidConfig().getData();List<ConfigVO> changeConfigList = getChangeConfigList(configPolingTask, allValidConfig);if (!changeConfigList.isEmpty()) {List<ConfigPolingTask> todoTask = configPolingTasksHolder.getExecuteTaskList(configPolingTask::equals);if (!todoTask.isEmpty()) {doResponseTask(configPolingTask, Result.success(changeConfigList));}}}@Overridepublic void onChangeConfigEvent(long configId) {List<ConfigPolingTask> todoTasks = configPolingTasksHolder.getExecuteTaskList(configPolingTask -> configPolingTask.getConfigPolingDataMap().containsKey(configId));if (!todoTasks.isEmpty()) {List<ConfigBO> configList = Collections.singletonList(ConfigDO2BO(configDAO.getConfig(configId)));todoTasks.forEach(todoTask -> {List<ConfigVO> changeConfigList = getChangeConfigList(todoTask, configList);respExecutor.submit(() -> doResponseTask(todoTask, Result.success(changeConfigList)));});}}private List<ConfigVO> getChangeConfigList(ConfigPolingTask configPolingTask, List<ConfigBO> configList) {Map<Long, Integer> configPolingDataMap = configPolingTask.getConfigPolingDataMap();return configList.stream().filter(configBO -> configPolingDataMap.containsKey(configBO.getId())).filter(configBO -> configBO.getVersion() > configPolingDataMap.get(configBO.getId())).map(ConfigServiceImpl::configBO2ConfigVO).collect(Collectors.toList());}private ConfigBO ConfigDO2BO(ConfigDO configDO) {ConfigBO configBO = new ConfigBO();configBO.setId(configDO.getId());configBO.setName(configDO.getName());configBO.setVersion(configDO.getVersion());configBO.setCreateTime(configDO.getCreateTime());configBO.setConfigData(JSON.parseObject(configDO.getConfigData()));return configBO;}//响应超时未改变的任务private void responseTimeoutTask() {List<ConfigPolingTask> timeoutTasks = configPolingTasksHolder.getExecuteTaskList(configPolingTask -> System.currentTimeMillis() >= configPolingTask.getEndTime());timeoutTasks.forEach(timeoutTask -> respExecutor.submit(() ->doResponseTask(timeoutTask, Result.success(new ArrayList<>()))));}private void doResponseTask(ConfigPolingTask configPolingTask, Result<?> result) {AsyncContext asyncContext = configPolingTask.getAsyncContext();try (PrintWriter writer = asyncContext.getResponse().getWriter()) {writer.write(JSON.toJSONString(result));writer.flush();} catch (Exception e) {log.error("doResponseTimeoutTask error,task:{}", configPolingTask, e);} finally {asyncContext.complete();}}private Thread newCheckThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);t.setName("ConfigLongPollingTimeoutCheckExecutor");return t;}private Thread newRespThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);t.setName("ConfigLongPollingTimeoutRespExecutor-" + respThreadNum++);return t;}public static ConfigVO configBO2ConfigVO(ConfigBO configBO) {ConfigVO configVO = new ConfigVO();configVO.setId(configBO.getId());configVO.setName(configBO.getName());configVO.setVersion(configBO.getVersion());configVO.setConfigData(configBO.getConfigData());configVO.setCreateTime(DateUtil.date2str1(configBO.getCreateTime()));return configVO;}
}
public class ConfigPolingTasksHolder {private final List<ConfigPolingTask> configPolingTasks;public ConfigPolingTasksHolder() {configPolingTasks = new ArrayList<>();}public synchronized void addConfigTask(ConfigPolingTask configPolingTask) {configPolingTasks.add(configPolingTask);}//将要处理的任务在任务列表中删除,并将其放到外面执行,防止锁的时间太长public synchronized List<ConfigPolingTask> getExecuteTaskList(Predicate<ConfigPolingTask> predicate) {List<ConfigPolingTask> resultTasks = new ArrayList<>();configPolingTasks.removeIf(configPolingTask -> {boolean res = predicate.test(configPolingTask);if (res) {resultTasks.add(configPolingTask);}return res;});return resultTasks;}
}
@Data
public class ConfigPolingTask {/*** 截止时间*/private long endTime;/*** 异步请求*/private AsyncContext asyncContext;/*** 配置轮询数据(配置id,版本)*/private Map<Long, Integer> configPolingDataMap;
}

客户端

客户端就很简单了,只要循环发一个超时时间是30秒的http请求就行

public void startLongPolling() {polling("/config/change/get/long", null, 30000);}public void polling(String uri, Runnable runnable, int readTimeout) {Thread thread = new Thread(() -> {while (!Thread.interrupted()) {try {Optional.ofNullable(runnable).ifPresent(Runnable::run);Map<Long, List<ConfigDataBO>> refreshConfigMap = new HashMap<>();configMap.values().forEach(configBO -> {Optional.ofNullable(configBO.getConfigDataList()).ifPresent(cdList -> cdList.stream().filter(cd -> cd.getRefreshFieldList() != null && !cd.getRefreshFieldList().isEmpty()).forEach(refreshConfigMap.computeIfAbsent(configBO.getId(), k1 -> new ArrayList<>())::add));});if (refreshConfigMap.isEmpty()) {return;}Map<String, Integer> configIdMap = refreshConfigMap.keySet().stream().collect(Collectors.toMap(String::valueOf, configId -> configMap.get(configId).getVersion()));HttpRespBO httpRespBO = HttpUtil.httpPostJson(url + uri, JSON.toJSONString(configIdMap), readTimeout);List<ConfigVO> configList = httpResp2ConfigVOList(httpRespBO);if (configList.isEmpty()) {continue;}configList.forEach(configVO -> {Map<String, Object> result = new HashMap<>();DataTransUtil.buildFlattenedMap(result, configVO.getConfigData(), "");ConfigBO configBO = this.configMap.get(configVO.getId());configBO.setVersion(configVO.getVersion());List<ConfigDataBO> configDataList = configBO.getConfigDataList();Map<String, ConfigDataBO> configDataMap = configDataList.stream().collect(Collectors.toMap(ConfigDataBO::getKey, Function.identity()));result.forEach((key, value) -> {ConfigDataBO configDataBO = configDataMap.get(key);if (configDataBO == null) {configDataList.add(new ConfigDataBO(key, value.toString()));} else {configDataBO.setValue(value.toString());List<RefreshFieldBO> refreshFieldList = configDataBO.getRefreshFieldList();if (refreshFieldList == null) {refreshFieldList = new ArrayList<>();configDataBO.setRefreshFieldList(refreshFieldList);}refreshFieldList.forEach(refreshFieldBO -> {try {Field field = refreshFieldBO.getField();field.setAccessible(true);field.set(refreshFieldBO.getBean(), value.toString());} catch (Exception e) {log.error("startShortPolling set Field error", e);}});}});});} catch (Exception e) {log.error("startShortPolling error", e);}}});thread.setName("startShortPolling");thread.setDaemon(true);thread.start();}private List<ConfigVO> httpResp2ConfigVOList(HttpRespBO httpRespBO) {if (!httpRespBO.success()) {throw new IllegalArgumentException("获取配置失败:code:" + httpRespBO.getCode() + ",msg:" + httpRespBO.getMessage());}if (httpRespBO.getBody() == null) {throw new IllegalArgumentException("获取配置失败 body is null:code:" + httpRespBO.getCode() + ",msg:" + httpRespBO.getMessage());}Result<?> result = JSON.parseObject(new String(httpRespBO.getBody(), StandardCharsets.UTF_8), Result.class);if (result.failed()) {throw new IllegalArgumentException("获取配置失败 result:" + result);}return JSON.parseArray(JSON.toJSONString(result.getData()), ConfigVO.class);}
public class ClientTest {private String userName;private String userAge;private List<Object> education;public ClientTest() throws NoSuchFieldException {ConfigCenterClient configCenterClient = new ConfigCenterClient("http://localhost:8088");Map<String, String> configProperty = configCenterClient.getConfigProperty();this.userName = configProperty.get("user.name");this.userAge = configProperty.get("user.age");this.education = new ArrayList<>();int i = 0;while (configProperty.containsKey("user.education[" + i + "]")) {education.add(configProperty.get("user.education[" + (i++) + "]"));}configCenterClient.addRefreshField("user.name", new RefreshFieldBO(this, ClientTest.class.getDeclaredField("userName")));configCenterClient.startLongPolling();}public String toString() {return "姓名:" + userName + ",年龄:" + userAge + ",教育经历:" + education;}public static void main(String[] args) throws NoSuchFieldException, InterruptedException {ClientTest clientTest = new ClientTest();while (!Thread.interrupted()) {System.out.println(clientTest);Thread.sleep(1000);}}
}

效果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/725048.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mysql面试总结

基础 1. 数据库的三范式是什么&#xff1f; 第一范式&#xff1a;强调的是列的原子性&#xff0c;即数据库表的每一列都是不可分割的原子数据项。第二范式&#xff1a;要求实体的属性完全依赖于主关键字。所谓完全 依赖是指不能存在仅依赖主关键字一部分的属性。第三范式&…

牛客 子序列的权值最小值(贪心)

链接&#xff1a;https://ac.nowcoder.com/acm/contest/76803/A 来源&#xff1a;牛客网 题目描述 给定一个长度为 n n n 的数组 a a a&#xff0c;求数组所有非空子序列权值的最小值。 定义子序列 a i , a j , … , a k a_i,a_j,…,a_k ai​,aj​,…,ak​ 的权值为 a i a…

什么是reids缓存雪崩、缓存击穿、缓存穿透?

缓存雪崩、缓存击穿和缓存穿透是与redis缓存系统相关的三种常见问题&#xff0c;它们都可能导致缓存系统性能下降或失效。下面分别解释这三种情况并提供例子&#xff1a; 缓存雪崩&#xff08;Cache Avalanche&#xff09;&#xff1a; 定义&#xff1a; 缓存雪崩是指在某个时…

UDP通信发送和接收 || UDP实现全双工通信

recvfrom ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen); 功能: 从套接字中接收数据 参数: sockfd:套接字文件描述符 buf:存放数据空间首地址 …

基于springboot实现的幼儿园管理系统

一、系统架构 前端&#xff1a;html | layui | jquery | css 后端&#xff1a;springboot | mybatis 环境&#xff1a;jdk1.8 | mysql | maven 二、代码及数据库 三、功能介绍 01. 登录页 02. 系统管理-用户管理 03. 系统管理-页面管理 04. 系统管理-角色管…

Day 50[补档] |● 123.买卖股票的最佳时机III ● 188.买卖股票的最佳时机IV

123.买卖股票的最佳时机III class Solution { public:int maxProfit(vector<int>& prices) {if(prices.size() 0) return 0;vector<vector<int>> dp(prices.size(), vector<int>(5,0));//dp[i][j] 表示根据j的方法数下&#xff0c;第i天时候最大…

win11配置Mask DINO小白踩坑记录

win11配置Mask DINO踩坑记录 1 准备工作2 创建python环境和安装detectron22.1 安装前提2.2 安装流程2.2.1 cl.exe的错误2.2.2 SetuptoolsDeprecationWarning的错误 3 MaskDINO运行3.1 运行demo 前情提要&#xff1a;需要复现Mask DINO&#xff0c;但是实验室没有Linux的电脑&am…

keycloak18.0.0==本地源码启动

github下载源码&#xff0c; 版本18.0.0 java和maven的版本如下 E:\keycloak-18.0.0>java -version java version "21.0.1" 2023-10-17 LTS Java(TM) SE Runtime Environment (build 21.0.112-LTS-29) Java HotSpot(TM) 64-Bit Server VM (build 21.0.112-LTS-…

Wince NK.BIN文件格式

nk.bin文件格式 开始7个字节为 42 30 30 30 46 46 0A 即“B000FF\x0A”&#xff0c;以次来判别文件类型。 接下来4字节&#xff08;DWORD&#xff09;表示ImageStart, 4字节表示ImageLength 如7字节后的8字节分别为&#xff1a;00 00 00 60 9C FA 33 01 则表示ImageStart0x600…

【SpringBoot3.x教程05】SpringBoot与关系型数据库的整合

前言&#xff1a;常用的ORM框架有哪些 JdbcTemplate JdbcTemplate 是Spring框架提供的一个JDBC抽象库&#xff0c;旨在简化传统的JDBC操作&#xff0c;避免了繁琐的JDBC代码和数据库资源的手动处理。通过JdbcTemplate&#xff0c;开发者可以更加专注于业务逻辑而不是数据库的连…

【网站项目】308学生档案管理系统

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0c;帮助大学选题。赠送开题报告模板&#xff…

如何用Python3自撰一个简单的后端框架

不使用任何现有的后端框架来创建一个Python 3的后端框架是一个相当复杂的任务,因为它涉及到许多Web开发的基础知识,比如HTTP协议处理、路由、中间件、请求和响应处理等。然而,我们可以从最基本的概念开始,逐步构建一个简单的后端框架。 以下是一个非常基础的指南,用于创建…

Spring Webflux 详解

目录 0、组件对比 1、WebFlux 1、引入 2、Reactor Core 1、HttpHandler、HttpServer 3、DispatcherHandler 1、请求处理流程 4、注解开发 1、目标方法传参 2.返回值写法 5、文件上传 6、错误处理 7、RequestContext 8、自定义Flux配置 9、Filter WebFlux&am…

2024武汉国际氢能源与燃料电池展将在8月盛大开幕!

2024武汉国际氢能源及燃料电池产业博览会 同期举办&#xff1a;2024世界汽车制造技术暨智能装备博览会 时间&#xff1a;2024.8.14-16 地点&#xff1a;武汉国际博览中心 邀请函 主办单位&#xff1a; 湖北省汽车行业协会 湖北省机械行业联合会 湖北省汽车产业技术创新联…

查找数组元素相同的最后一组数组的第一个下标

题目 面试过程中&#xff0c;被问到这道面试题&#xff0c;做个记录&#xff1a; 有一组数组x&#xff1a; x{“11”,”aa”} 要比较的数组y是&#xff1a; y{“11”,”aa”,“11”,”aa”} y1{“11”,”aa”,“11”,”res”,”aa”} y2 {“11”,”11”,“11”,”aa”} 需…

为树莓派5编译Android14源码

为树莓派5编译Android14源码 1. 软硬件要求 官方推荐配置: https://source.android.google.cn/docs/setup/start/requirements?hl=zh-cn ubuntu22.04 虚拟机内存最低 16GB磁盘最低 360GB2. 安装依赖工具 官方文档:https://source.android.google.cn/docs/setup/start/init…

jieba模块中文分词应用场景案例

jieba 是一个在 Python 中广泛使用的中文分词库。由于其高效、准确和易用&#xff0c;jieba 在自然语言处理领域有着广泛的应用。下面我将通过一个简单的案例来展示 jieba 在中文分词中的应用场景。 案例&#xff1a;文本分类 假设我们有一个简单的文本分类任务&#xff0c;需…

Linux 之二:CentOS7 的 IP 常用命令和配置及 xshell 基本使用方法

1. 进入虚拟机 点击右键---进入终端--输入 ip adrr 或 ifconfig 查看ip地址 下面输入命令 ifconfig&#xff08;注意&#xff1a;不是 ipconfig &#xff09; 或 ip addr 来查看当前系统 IP 查看到IP 后&#xff0c;比如&#xff1a;上面是 192.168.184.137 1.1 IP 常用命令…

LeetCode142题:环形链表II(python3)

代码思路&#xff1a; 双指针的第一次相遇&#xff1a; 设两指针 fast&#xff0c;slow 指向链表头部 head 。 令 fast 每轮走 2 步&#xff0c;slow 每轮走 1 步。 fast 指针走过链表末端&#xff0c;说明链表无环&#xff0c;此时直接返回 null。 如果链表存在环&#xff0c;…

web学习笔记(二十六)

目录 1.JS执行队列 1.1JS是单线程 1.2Web Worker 1.3同步和异步 1.4JS执行机制 2.location对象 2.1什么是location对象 2.2url包含的信息 2.3location对象属性 2.4location对象的方法 3.navigator对象和history对象 3.1navigator对象 3.2history对象 1.JS执行队…