Java在SpringCloud中自定义Gateway负载均衡策略

Java在SpringCloud中自定义Gateway负载均衡策略

一、前言

spring-cloud-starter-netflix-ribbon已经不再更新了,最新版本是2.2.10.RELEASE,最后更新时间是2021年11月18日,详细信息可以看maven官方仓库:org.springframework.cloud/spring-cloud-starter-netflix-ribbon,SpringCloud官方推荐使用spring-cloud-starter-loadbalancer进行负载均衡。

背景:大文件上传做切片文件上传;

流程:将切片文件上传到服务器,然后进行合并任务,合并完成之后上传到对象存储;现在服务搞成多节点以后,网关默认走轮循,但是相同的服务在不同的机器上,这样就会导致切片文件散落在不同的服务器上,会导致文件合并失败;所以根据一个标识去自定义gateway对应服务的负载均衡策略,可以解决这个问题;

我的版本如下:

<spring-boot.version>2.7.3</spring-boot.version>
        <spring-cloud.version>2021.0.4</spring-cloud.version>
        <spring-cloud-alibaba.version>2021.0.4.0</spring-cloud-alibaba.version>

二、参考默认实现

springCloud原生默认的负载均衡策略是这个类:

org.springframework.cloud.loadbalancer.core.RoundRobinLoadBalancer

我们参考这个类实现自己的负载均衡策略即可,RoundRobinLoadBalancer实现了ReactorServiceInstanceLoadBalancer这个接口,实现了choose这个方法,如下图:

在choose方法中调用了processInstanceResponse方法,processInstanceResponse方法中调用了getInstanceResponse方法,所以我们我们可以复制RoundRobinLoadBalancer整个类,只修改getInstanceResponse这个方法里的内容就可以实现自定义负载均衡策略。

三、实现代码

原理:根据请求头当中设备的唯一标识传递到下游,唯一标识做哈希取余,可以指定对应的服务器节点,需要的服务设置自定义负载策略,不需要的服务设置默认的轮循机制即可

package com.wondertek.gateway.loadBalancer;import cn.hutool.core.util.ObjectUtil;
import com.wondertek.web.exception.enums.HttpRequestHeaderEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
@Slf4j
@Component
public class RequestFilter implements GlobalFilter, Ordered {@Overridepublic int getOrder() {// 应该小于LoadBalancerClientFilter的顺序值return Ordered.HIGHEST_PRECEDENCE;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {ServerHttpRequest request = exchange.getRequest();String clientDeviceUniqueCode = request.getHeaders().getFirst(HttpRequestHeaderEnum.CLIENT_DEVICE_UNIQUE_CODE.getCode());// 存入Reactor上下文String resultCode = clientDeviceUniqueCode;return chain.filter(exchange).contextWrite(context -> {if (ObjectUtil.isNotEmpty(resultCode)) {log.info("开始将request中的唯一标识封装到上下游中:{}", resultCode);return context.put("identification", resultCode);} else {// 或者根据需求进行其他处理return context;}});}
}
package com.wondertek.gateway.loadBalancer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import reactor.core.publisher.Mono;import java.util.*;
import java.util.concurrent.ThreadLocalRandom;@Slf4j
public class ClientDeviceUniqueCodeInstanceLoadBalancer implements ReactorServiceInstanceLoadBalancer {private final String serviceId;private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;public ClientDeviceUniqueCodeInstanceLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {this.serviceId = serviceId;this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;}@Overridepublic Mono<Response<ServiceInstance>> choose(Request request) {//在 choose 方法中,使用 deferContextual 方法来访问上下文并提取客户端标识。这里的 getOrDefault 方法尝试从上下文中获取一个键为 "identification" 的值,如果不存在则返回 "default-identification"return Mono.deferContextual(contextView -> {String identification = contextView.getOrDefault("identification", "14d58a1ba286f087d9736249ec785314");log.info("上下游获取到的identification的值为:{}", identification);ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);return supplier.get(request).next().map(serviceInstances -> processInstanceResponse(supplier, serviceInstances, identification));});}private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances, String identification) {Response<ServiceInstance> serviceInstanceResponse;if (Objects.isNull(identification)) {serviceInstanceResponse = this.getInstanceResponse(serviceInstances, null);} else {serviceInstanceResponse = this.getIpInstanceResponse(serviceInstances, identification);}if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {((SelectedInstanceCallback) supplier).selectedServiceInstance((ServiceInstance) serviceInstanceResponse.getServer());}return serviceInstanceResponse;}private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, String identification) {if (instances.isEmpty()) {if (log.isWarnEnabled()) {log.warn("No servers available for service: " + this.serviceId);}return new EmptyResponse();} else {int index = ThreadLocalRandom.current().nextInt(instances.size());ServiceInstance instance = (ServiceInstance) instances.get(index);return new DefaultResponse(instance);}}private Response<ServiceInstance> getIpInstanceResponse(List<ServiceInstance> instances, String identification) {if (instances.isEmpty()) {log.warn("No servers available for service: " + this.serviceId);return new EmptyResponse();} else if (instances.size() == 1) {log.info("只有一个服务实例,直接返回这个实例");return new DefaultResponse(instances.get(0));} else {//创建一个新的列表以避免在原始列表上排序,避免了修改共享状态可能带来的线程安全问题List<ServiceInstance> sortedInstances = new ArrayList<>(instances);// 现在对新列表进行排序,保持原始列表的顺序不变Collections.sort(sortedInstances, Comparator.comparing(ServiceInstance::getHost));//log.info("获取到的实例个数的值为:{}", sortedInstances.size());sortedInstances.forEach(instance -> log.info("排序后的实例: {},{}", instance.getHost(), instance.getPort()));//log.info("多个服务实例,使用客户端 identification 地址的哈希值来选择服务实例");// 使用排序后的列表来找到实例int ipHashCode = Math.abs(identification.hashCode());//log.info("identificationHashCode的值为:{}", ipHashCode);int instanceIndex = ipHashCode % sortedInstances.size();//log.info("instanceIndex的值为:{}", instanceIndex);ServiceInstance instanceToReturn = sortedInstances.get(instanceIndex);//log.info("instanceToReturn.getUri()的值为:{}", instanceToReturn.getUri());log.info("自定义identification负载机制,Client identification: {} is routed to instance: {}:{}", identification, instanceToReturn.getHost(), instanceToReturn.getPort());return new DefaultResponse(instanceToReturn);}}}
package com.wondertek.gateway.loadBalancer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import reactor.core.publisher.Mono;import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;@Slf4j
public class DefaultInstanceLoadBalancer implements ReactorServiceInstanceLoadBalancer {private final String serviceId;private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;final AtomicInteger position;public DefaultInstanceLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, AtomicInteger position) {this.serviceId = serviceId;this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;this.position = position;}@Overridepublic Mono<Response<ServiceInstance>> choose(Request request) {ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);return supplier.get(request).next().map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));}private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,List<ServiceInstance> serviceInstances) {Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());}return serviceInstanceResponse;}private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {if (instances.isEmpty()) {if (log.isWarnEnabled()) {log.warn("No servers available for service: " + serviceId);}return new EmptyResponse();}//创建一个新的列表以避免在原始列表上排序,避免了修改共享状态可能带来的线程安全问题List<ServiceInstance> sortedInstances = new ArrayList<>(instances);// 现在对新列表进行排序,保持原始列表的顺序不变Collections.sort(sortedInstances, Comparator.comparing(ServiceInstance::getHost));//log.info("获取到的实例个数的值为:{}", sortedInstances.size());sortedInstances.forEach(instance -> log.info("排序后的实例: {},{}", instance.getHost(), instance.getPort()));int pos = Math.abs(this.position.incrementAndGet());//log.info("默认轮循机制,pos递加后的值为:{}", pos);int positionIndex = pos % instances.size();//log.info("取余后的positionIndex的值为:{}", positionIndex);ServiceInstance instance = instances.get(positionIndex);//log.info("instance.getUri()的值为:{}", instance.getUri());log.info("默认轮循机制,routed to instance: {}:{}",instance.getHost(), instance.getPort());return new DefaultResponse(instance);}}
package com.wondertek.gateway.loadBalancer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClient;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;import java.util.concurrent.atomic.AtomicInteger;@Configuration
//单台服务
//@LoadBalancerClient(name = "oms-api", configuration = CustomLoadBalancerConfig.class)
//多台服务
@LoadBalancerClients({@LoadBalancerClient(name = "oms-api", configuration = CustomLoadBalancerConfig.class),@LoadBalancerClient(name = "unity-api", configuration = CustomLoadBalancerConfig.class),@LoadBalancerClient(name = "cloud-api", configuration = CustomLoadBalancerConfig.class),@LoadBalancerClient(name = "open-api", configuration = CustomLoadBalancerConfig.class),@LoadBalancerClient(name = "server-api", configuration = CustomLoadBalancerConfig.class),@LoadBalancerClient(name = "center-service", configuration = CustomLoadBalancerConfig.class),
})
@Slf4j
public class CustomLoadBalancerConfig {// 定义一个Bean来提供AtomicInteger的实例@Beanpublic AtomicInteger positionTracker() {// 这将在应用上下文中只初始化一次return new AtomicInteger(0);}//自定义优先级负载均衡器@Beanpublic ReactorServiceInstanceLoadBalancer customPriorityLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,Environment environment,AtomicInteger positionTracker) {String serviceId = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);//目的为解决文件上传切片文件分散上传的问题if ("oms-api".equals(serviceId)||"unity-api".equals(serviceId)||"cloud-api".equals(serviceId)){//log.info("服务名称:serviceId:{},走自定义clientDeviceUniqueCode负载模式", serviceId);return new ClientDeviceUniqueCodeInstanceLoadBalancer(serviceInstanceListSupplierProvider, serviceId);}//log.info("服务名称:serviceId:{},走默认负载模式", serviceId);return new DefaultInstanceLoadBalancer(serviceInstanceListSupplierProvider, serviceId,positionTracker);}
}

【SpringCloud系列】开发环境下重写Loadbalancer实现自定义负载均衡

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/581197.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

获取Android和iOS崩溃日志的方法

文章目录 一、Android崩溃日志1、获取方法1.1 通过adb logcat获取1.2 通过adb shell dumpsys dropbox命令获取 2、导出设备Crash日志3、导出设备ANR日志4、常见日志类别 二、iOS崩溃日志1、获取方法1.1 xcode中打开1.2 手机上直接获取 2、Crash 头部信息 一、Android崩溃日志 …

js中深拷贝与浅拷贝的区别?如何实现一个深拷贝?(收藏好,用时好找)

文章目录 一、数据类型存储二、浅拷贝Object.assignslice()concat()拓展运算符 三、深拷贝_.cloneDeep()jQuery.extend()JSON.stringify()循环递归 四、区别小结 一、数据类型存储 前面文章我们讲到&#xff0c;JavaScript中存在两大数据类型&#xff1a; 基本类型引用类型 …

为什么微信小程序不用div而是用自创的view标签?

在官方的《小程序开发指南》中&#xff0c;明确说明了为什么要自定义标签以及这么设计的原因和带来的好处&#xff0c;说明如下&#xff1a; 6.1.2 管控与安全 基于Web 技术来渲染小程序是存在一些不可控因素和安全风险的。这是因为Web技术是非常开放灵活的&#xff0c;我们可以…

多维时序 | MATLAB实现SSA-GRU麻雀算法优化门控循环单元多变量时间序列预测

多维时序 | MATLAB实现SSA-GRU麻雀算法优化门控循环单元多变量时间序列预测 目录 多维时序 | MATLAB实现SSA-GRU麻雀算法优化门控循环单元多变量时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.MATLAB实现SSA-GRU麻雀算法优化门控循环单元多变量时间序列预…

[C/C++]排序算法 快速排序 (递归与非递归)

目录 &#x1f6a9;概念: &#x1f6a9;实现: ⚡1.hoare ⚡2.挖坑法 ⚡3.双指针法 &#x1f6a9;快速排序递归实现 &#x1f6a9;快速排序非递归实现 &#x1f6a9;概念: 通过一趟排序将要排序的数据分割成独立的两部分&#xff0c;其中一部分的所有数据比另一部分的所有…

C# Lock 解读

C# Lock 解读 一、Lock定义二、简单例子三、简单解释一下执行过程四、Lock的对象选择问题1、为什么不能lock值类型2、Lock字符串3、MSDN推荐的Lock对象4、lock(typeof(Class)) 五、特殊问题&#xff1a;Lock&#xff08;this&#xff09;等的详细解释总结 一、Lock定义 lock 关…

Vue.set 方法原理

function set(target, key, value) {// 判断是否是数组&#xff0c;并且 key 是一个有效的索引值if (Array.isArray(target) && isValidArrayIndex(key)) {target.length Math.max(target.length, key)target.splice(key, 1, value)return value}// 判断 key 是否已经…

JS常见正则表达式写法(附案例)

正则表达式方法示例: 1. test方法解析&#xff0c;test判断正则是否在字符串中出现过&#xff0c;如果出现返回true&#xff0c;如果没出现返回false。 let str hello world; let ret1 /e/.test(str); // true let ret2 /q/.test(str); // false 如&…

Pytorch的讲解及实战·MNIST数据集手写数字识别

目录 一、前言与pytorch的下载 1、前言 2、下载pytorch ①创建虚拟环境 ②下载pytorch&#xff08;cpu版&#xff09; ③测试pytorch是否下载成功 ④使用jupyter notebook 但是使用不了torch的解决方法 二、pytorch的使用 1、Tensor的数据类型 ①torch.FloatTensor …

【nw.js】使用nw.js将html页面打包成exe免安装程序

文章目录 一、批处理zip命令&#xff08;已有可跳过此步骤&#xff09;二、nw.js包三、使用批处理命令打包成exe可执行文件四、使用EnigmaVB打包成免安装可独立运行的exe文件五、结束 一、批处理zip命令&#xff08;已有可跳过此步骤&#xff09; 下载zip&#xff0c;你可以到该…

maven依赖无法传递问题排查

一、背景 在A模块中引入B模块&#xff0c;C服务引入A模块但是B模块没有传递进来。 二、排查 使用mvn clean install -Dmaven.test.skiptrue查看打包日志信息&#xff0c;通过搜索A模块名称&#xff0c;出现如下警告信息&#xff1a; [WARING] The POM for A:jar:0.0.1-SNAP…

HarmonyView: Harmonizing Consistency and Diversity in One-Image-to-3D

Q: 这篇论文试图解决什么问题&#xff1f; A: 这篇论文试图解决单张图像到3D内容生成中的一致性和多样性之间的平衡问题。具体来说&#xff0c;论文提出了一个名为HarmonyView的方法&#xff0c;它通过分解一致性和多样性这两个方面来生成既具有视觉一致性又具有多样性的3D内容…

ROS无人机初始化GPS定位漂移误差,确保无人机稳定飞行

引言&#xff1a; 由于GPS在室外漂移的误差比较大&#xff0c;在长时间静止后启动&#xff0c;程序发布的位置可能已经和预期的位置相差较大&#xff0c;导致无法完成任务&#xff0c;尤其是气压计的数据不准&#xff0c;可能会导致无人机不能起飞或者一飞冲天。本文主要是在进…

53.网游逆向分析与插件开发-游戏反调试功能的实现-通过内核信息检测调试器

码云地址&#xff08;master分支&#xff09;&#xff1a;https://gitee.com/dye_your_fingers/sro_-ex.git 码云版本号&#xff1a;b44fddef016fc1587eda40ca7f112f02a8289504 代码下载地址&#xff0c;在 SRO_EX 目录下&#xff0c;文件名为&#xff1a;SRO_Ex-通过内核信息…

ceph集群搭建到应用从入门到熟练,包含块存储、对象存储、cephfs的应用、cephx认证等

ceph-deploy比较适合生产环境&#xff0c;不是用cephadm搭建。相对麻烦一些&#xff0c;但是并不难&#xff0c;细节把握好就行&#xff0c;只是命令多一些而已。 ceph理论知识 略… ceph集群实验环境 服务器主机public网段IP&#xff08;对外服务&#xff09;cluster网段I…

取出一个时间序列中每一个元素里的日期Series.dt.date()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 取出一个时间序列中 每一个元素里的年月日 Series.dt.date [太阳]选择题 以下代码的输出结果中正确的是? import pandas as pd t pd.Series(pd.date_range("2023-12-28", periods4…

OpenCV中使用Mask R-CNN实现图像分割的原理与技术实现方案

本文详细介绍了在OpenCV中利用Mask R-CNN实现图像分割的原理和技术实现方案。Mask R-CNN是一种先进的深度学习模型&#xff0c;通过结合区域提议网络&#xff08;Region Proposal Network&#xff09;和全卷积网络&#xff08;Fully Convolutional Network&#xff09;&#xf…

Java企业电子招投标系统源代码,支持二次开发,采用Spring cloud框架

在数字化采购领域&#xff0c;企业需要一个高效、透明和规范的管理系统。通过采用Spring Cloud、Spring Boot2、Mybatis等先进技术&#xff0c;我们打造了全过程数字化采购管理平台。该平台具备内外协同的能力&#xff0c;通过待办消息、招标公告、中标公告和信息发布等功能模块…

饥荒Mod 开发(二四):制作一把万能工具

饥荒Mod 开发(二三)&#xff1a;显示物品栏详细信息 饥荒Mod 开发(二五)&#xff1a;常用组件 总结 源码 饥荒中的每种工具都有独特的功能&#xff0c;比如 斧头用来砍树&#xff0c; 铲子用来 挖东西&#xff0c;鹤嘴锄用来挖矿&#xff0c; 锤子可以敲碎东西&#xff0c;所以…

2013年第二届数学建模国际赛小美赛A题数学与经济灾难解题全过程文档及程序

2013年第二届数学建模国际赛小美赛 A题 数学与经济灾难 原题再现&#xff1a; 2008年的市场崩盘使世界陷入经济衰退&#xff0c;目前世界经济仍处于低迷状态&#xff0c;其原因是多方面的。其中之一是数学。   当然&#xff0c;并非只有金融界依赖于并非总是可靠的数学模型…