Java在SpringCloud中自定义Gateway负载均衡策略

Java在SpringCloud中自定义Gateway负载均衡策略

一、前言

spring-cloud-starter-netflix-ribbon已经不再更新了,最新版本是2.2.10.RELEASE,最后更新时间是2021年11月18日,详细信息可以看maven官方仓库:org.springframework.cloud/spring-cloud-starter-netflix-ribbon,SpringCloud官方推荐使用spring-cloud-starter-loadbalancer进行负载均衡。

背景:大文件上传做切片文件上传;

流程:将切片文件上传到服务器,然后进行合并任务,合并完成之后上传到对象存储;现在服务搞成多节点以后,网关默认走轮循,但是相同的服务在不同的机器上,这样就会导致切片文件散落在不同的服务器上,会导致文件合并失败;所以根据一个标识去自定义gateway对应服务的负载均衡策略,可以解决这个问题;

我的版本如下:

<spring-boot.version>2.7.3</spring-boot.version>
        <spring-cloud.version>2021.0.4</spring-cloud.version>
        <spring-cloud-alibaba.version>2021.0.4.0</spring-cloud-alibaba.version>

二、参考默认实现

springCloud原生默认的负载均衡策略是这个类:

org.springframework.cloud.loadbalancer.core.RoundRobinLoadBalancer

我们参考这个类实现自己的负载均衡策略即可,RoundRobinLoadBalancer实现了ReactorServiceInstanceLoadBalancer这个接口,实现了choose这个方法,如下图:

在choose方法中调用了processInstanceResponse方法,processInstanceResponse方法中调用了getInstanceResponse方法,所以我们我们可以复制RoundRobinLoadBalancer整个类,只修改getInstanceResponse这个方法里的内容就可以实现自定义负载均衡策略。

三、实现代码

原理:根据请求头当中设备的唯一标识传递到下游,唯一标识做哈希取余,可以指定对应的服务器节点,需要的服务设置自定义负载策略,不需要的服务设置默认的轮循机制即可

package com.wondertek.gateway.loadBalancer;import cn.hutool.core.util.ObjectUtil;
import com.wondertek.web.exception.enums.HttpRequestHeaderEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
@Slf4j
@Component
public class RequestFilter implements GlobalFilter, Ordered {@Overridepublic int getOrder() {// 应该小于LoadBalancerClientFilter的顺序值return Ordered.HIGHEST_PRECEDENCE;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {ServerHttpRequest request = exchange.getRequest();String clientDeviceUniqueCode = request.getHeaders().getFirst(HttpRequestHeaderEnum.CLIENT_DEVICE_UNIQUE_CODE.getCode());// 存入Reactor上下文String resultCode = clientDeviceUniqueCode;return chain.filter(exchange).contextWrite(context -> {if (ObjectUtil.isNotEmpty(resultCode)) {log.info("开始将request中的唯一标识封装到上下游中:{}", resultCode);return context.put("identification", resultCode);} else {// 或者根据需求进行其他处理return context;}});}
}
package com.wondertek.gateway.loadBalancer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import reactor.core.publisher.Mono;import java.util.*;
import java.util.concurrent.ThreadLocalRandom;@Slf4j
public class ClientDeviceUniqueCodeInstanceLoadBalancer implements ReactorServiceInstanceLoadBalancer {private final String serviceId;private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;public ClientDeviceUniqueCodeInstanceLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {this.serviceId = serviceId;this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;}@Overridepublic Mono<Response<ServiceInstance>> choose(Request request) {//在 choose 方法中,使用 deferContextual 方法来访问上下文并提取客户端标识。这里的 getOrDefault 方法尝试从上下文中获取一个键为 "identification" 的值,如果不存在则返回 "default-identification"return Mono.deferContextual(contextView -> {String identification = contextView.getOrDefault("identification", "14d58a1ba286f087d9736249ec785314");log.info("上下游获取到的identification的值为:{}", identification);ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);return supplier.get(request).next().map(serviceInstances -> processInstanceResponse(supplier, serviceInstances, identification));});}private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances, String identification) {Response<ServiceInstance> serviceInstanceResponse;if (Objects.isNull(identification)) {serviceInstanceResponse = this.getInstanceResponse(serviceInstances, null);} else {serviceInstanceResponse = this.getIpInstanceResponse(serviceInstances, identification);}if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {((SelectedInstanceCallback) supplier).selectedServiceInstance((ServiceInstance) serviceInstanceResponse.getServer());}return serviceInstanceResponse;}private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, String identification) {if (instances.isEmpty()) {if (log.isWarnEnabled()) {log.warn("No servers available for service: " + this.serviceId);}return new EmptyResponse();} else {int index = ThreadLocalRandom.current().nextInt(instances.size());ServiceInstance instance = (ServiceInstance) instances.get(index);return new DefaultResponse(instance);}}private Response<ServiceInstance> getIpInstanceResponse(List<ServiceInstance> instances, String identification) {if (instances.isEmpty()) {log.warn("No servers available for service: " + this.serviceId);return new EmptyResponse();} else if (instances.size() == 1) {log.info("只有一个服务实例,直接返回这个实例");return new DefaultResponse(instances.get(0));} else {//创建一个新的列表以避免在原始列表上排序,避免了修改共享状态可能带来的线程安全问题List<ServiceInstance> sortedInstances = new ArrayList<>(instances);// 现在对新列表进行排序,保持原始列表的顺序不变Collections.sort(sortedInstances, Comparator.comparing(ServiceInstance::getHost));//log.info("获取到的实例个数的值为:{}", sortedInstances.size());sortedInstances.forEach(instance -> log.info("排序后的实例: {},{}", instance.getHost(), instance.getPort()));//log.info("多个服务实例,使用客户端 identification 地址的哈希值来选择服务实例");// 使用排序后的列表来找到实例int ipHashCode = Math.abs(identification.hashCode());//log.info("identificationHashCode的值为:{}", ipHashCode);int instanceIndex = ipHashCode % sortedInstances.size();//log.info("instanceIndex的值为:{}", instanceIndex);ServiceInstance instanceToReturn = sortedInstances.get(instanceIndex);//log.info("instanceToReturn.getUri()的值为:{}", instanceToReturn.getUri());log.info("自定义identification负载机制,Client identification: {} is routed to instance: {}:{}", identification, instanceToReturn.getHost(), instanceToReturn.getPort());return new DefaultResponse(instanceToReturn);}}}
package com.wondertek.gateway.loadBalancer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import reactor.core.publisher.Mono;import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;@Slf4j
public class DefaultInstanceLoadBalancer implements ReactorServiceInstanceLoadBalancer {private final String serviceId;private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;final AtomicInteger position;public DefaultInstanceLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, AtomicInteger position) {this.serviceId = serviceId;this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;this.position = position;}@Overridepublic Mono<Response<ServiceInstance>> choose(Request request) {ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);return supplier.get(request).next().map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));}private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,List<ServiceInstance> serviceInstances) {Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());}return serviceInstanceResponse;}private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {if (instances.isEmpty()) {if (log.isWarnEnabled()) {log.warn("No servers available for service: " + serviceId);}return new EmptyResponse();}//创建一个新的列表以避免在原始列表上排序,避免了修改共享状态可能带来的线程安全问题List<ServiceInstance> sortedInstances = new ArrayList<>(instances);// 现在对新列表进行排序,保持原始列表的顺序不变Collections.sort(sortedInstances, Comparator.comparing(ServiceInstance::getHost));//log.info("获取到的实例个数的值为:{}", sortedInstances.size());sortedInstances.forEach(instance -> log.info("排序后的实例: {},{}", instance.getHost(), instance.getPort()));int pos = Math.abs(this.position.incrementAndGet());//log.info("默认轮循机制,pos递加后的值为:{}", pos);int positionIndex = pos % instances.size();//log.info("取余后的positionIndex的值为:{}", positionIndex);ServiceInstance instance = instances.get(positionIndex);//log.info("instance.getUri()的值为:{}", instance.getUri());log.info("默认轮循机制,routed to instance: {}:{}",instance.getHost(), instance.getPort());return new DefaultResponse(instance);}}
package com.wondertek.gateway.loadBalancer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClient;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;import java.util.concurrent.atomic.AtomicInteger;@Configuration
//单台服务
//@LoadBalancerClient(name = "oms-api", configuration = CustomLoadBalancerConfig.class)
//多台服务
@LoadBalancerClients({@LoadBalancerClient(name = "oms-api", configuration = CustomLoadBalancerConfig.class),@LoadBalancerClient(name = "unity-api", configuration = CustomLoadBalancerConfig.class),@LoadBalancerClient(name = "cloud-api", configuration = CustomLoadBalancerConfig.class),@LoadBalancerClient(name = "open-api", configuration = CustomLoadBalancerConfig.class),@LoadBalancerClient(name = "server-api", configuration = CustomLoadBalancerConfig.class),@LoadBalancerClient(name = "center-service", configuration = CustomLoadBalancerConfig.class),
})
@Slf4j
public class CustomLoadBalancerConfig {// 定义一个Bean来提供AtomicInteger的实例@Beanpublic AtomicInteger positionTracker() {// 这将在应用上下文中只初始化一次return new AtomicInteger(0);}//自定义优先级负载均衡器@Beanpublic ReactorServiceInstanceLoadBalancer customPriorityLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,Environment environment,AtomicInteger positionTracker) {String serviceId = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);//目的为解决文件上传切片文件分散上传的问题if ("oms-api".equals(serviceId)||"unity-api".equals(serviceId)||"cloud-api".equals(serviceId)){//log.info("服务名称:serviceId:{},走自定义clientDeviceUniqueCode负载模式", serviceId);return new ClientDeviceUniqueCodeInstanceLoadBalancer(serviceInstanceListSupplierProvider, serviceId);}//log.info("服务名称:serviceId:{},走默认负载模式", serviceId);return new DefaultInstanceLoadBalancer(serviceInstanceListSupplierProvider, serviceId,positionTracker);}
}

【SpringCloud系列】开发环境下重写Loadbalancer实现自定义负载均衡

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/581197.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

获取Android和iOS崩溃日志的方法

文章目录 一、Android崩溃日志1、获取方法1.1 通过adb logcat获取1.2 通过adb shell dumpsys dropbox命令获取 2、导出设备Crash日志3、导出设备ANR日志4、常见日志类别 二、iOS崩溃日志1、获取方法1.1 xcode中打开1.2 手机上直接获取 2、Crash 头部信息 一、Android崩溃日志 …

js中深拷贝与浅拷贝的区别?如何实现一个深拷贝?(收藏好,用时好找)

文章目录 一、数据类型存储二、浅拷贝Object.assignslice()concat()拓展运算符 三、深拷贝_.cloneDeep()jQuery.extend()JSON.stringify()循环递归 四、区别小结 一、数据类型存储 前面文章我们讲到&#xff0c;JavaScript中存在两大数据类型&#xff1a; 基本类型引用类型 …

多维时序 | MATLAB实现SSA-GRU麻雀算法优化门控循环单元多变量时间序列预测

多维时序 | MATLAB实现SSA-GRU麻雀算法优化门控循环单元多变量时间序列预测 目录 多维时序 | MATLAB实现SSA-GRU麻雀算法优化门控循环单元多变量时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.MATLAB实现SSA-GRU麻雀算法优化门控循环单元多变量时间序列预…

[C/C++]排序算法 快速排序 (递归与非递归)

目录 &#x1f6a9;概念: &#x1f6a9;实现: ⚡1.hoare ⚡2.挖坑法 ⚡3.双指针法 &#x1f6a9;快速排序递归实现 &#x1f6a9;快速排序非递归实现 &#x1f6a9;概念: 通过一趟排序将要排序的数据分割成独立的两部分&#xff0c;其中一部分的所有数据比另一部分的所有…

Pytorch的讲解及实战·MNIST数据集手写数字识别

目录 一、前言与pytorch的下载 1、前言 2、下载pytorch ①创建虚拟环境 ②下载pytorch&#xff08;cpu版&#xff09; ③测试pytorch是否下载成功 ④使用jupyter notebook 但是使用不了torch的解决方法 二、pytorch的使用 1、Tensor的数据类型 ①torch.FloatTensor …

【nw.js】使用nw.js将html页面打包成exe免安装程序

文章目录 一、批处理zip命令&#xff08;已有可跳过此步骤&#xff09;二、nw.js包三、使用批处理命令打包成exe可执行文件四、使用EnigmaVB打包成免安装可独立运行的exe文件五、结束 一、批处理zip命令&#xff08;已有可跳过此步骤&#xff09; 下载zip&#xff0c;你可以到该…

53.网游逆向分析与插件开发-游戏反调试功能的实现-通过内核信息检测调试器

码云地址&#xff08;master分支&#xff09;&#xff1a;https://gitee.com/dye_your_fingers/sro_-ex.git 码云版本号&#xff1a;b44fddef016fc1587eda40ca7f112f02a8289504 代码下载地址&#xff0c;在 SRO_EX 目录下&#xff0c;文件名为&#xff1a;SRO_Ex-通过内核信息…

取出一个时间序列中每一个元素里的日期Series.dt.date()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 取出一个时间序列中 每一个元素里的年月日 Series.dt.date [太阳]选择题 以下代码的输出结果中正确的是? import pandas as pd t pd.Series(pd.date_range("2023-12-28", periods4…

Java企业电子招投标系统源代码,支持二次开发,采用Spring cloud框架

在数字化采购领域&#xff0c;企业需要一个高效、透明和规范的管理系统。通过采用Spring Cloud、Spring Boot2、Mybatis等先进技术&#xff0c;我们打造了全过程数字化采购管理平台。该平台具备内外协同的能力&#xff0c;通过待办消息、招标公告、中标公告和信息发布等功能模块…

饥荒Mod 开发(二四):制作一把万能工具

饥荒Mod 开发(二三)&#xff1a;显示物品栏详细信息 饥荒Mod 开发(二五)&#xff1a;常用组件 总结 源码 饥荒中的每种工具都有独特的功能&#xff0c;比如 斧头用来砍树&#xff0c; 铲子用来 挖东西&#xff0c;鹤嘴锄用来挖矿&#xff0c; 锤子可以敲碎东西&#xff0c;所以…

2013年第二届数学建模国际赛小美赛A题数学与经济灾难解题全过程文档及程序

2013年第二届数学建模国际赛小美赛 A题 数学与经济灾难 原题再现&#xff1a; 2008年的市场崩盘使世界陷入经济衰退&#xff0c;目前世界经济仍处于低迷状态&#xff0c;其原因是多方面的。其中之一是数学。   当然&#xff0c;并非只有金融界依赖于并非总是可靠的数学模型…

postman使用-03发送请求

文章目录 请求1.新建请求2.选择请求方式3.填写请求URL4.填写请求参数get请求参数在params中填写&#xff08;填完后在url中会自动显示&#xff09;post请求参数在body中填写&#xff0c;根据接口文档请求头里面的content-type选择body中的数据类型post请求参数为json-选择raw-选…

Flask 与微信小程序对接

Flask 与微信小程序的对接 在 web/controllers/api中增建py文件&#xff0c;主要是给微信小程序使用的。 web/controllers/init.py # -*- coding: utf-8 -*- from flask import Blueprint route_api Blueprint( api_page,__name__ )route_api.route("/") def ind…

软件测试/测试开发丨Pytest测试用例生命周期管理-Fixture

1、Fixture 用法 Fixture 特点及优势 1&#xff64;命令灵活&#xff1a;对于 setup,teardown,可以不起这两个名字2&#xff64;数据共享&#xff1a;在 conftest.py 配置⾥写⽅法可以实现数据共享&#xff0c;不需要 import 导⼊。可以跨⽂件共享3&#xff64;scope 的层次及…

Linux内核中断

Linux内核中断 ARM里当按下按键的时候&#xff0c;他首先会执行汇编文件start.s里面的异常向量表里面的irq,在irq里面进行一些操作。 再跳转到C的do_irq(); 进行操作&#xff1a;1&#xff09;判断中断的序号&#xff1b;2&#xff09;处理中断&#xff1b;3&#xff09;清除中…

2024美赛数学建模思路A题B题C题D题E题F题思路汇总 选题分析

文章目录 1 赛题思路2 美赛比赛日期和时间3 赛题类型4 美赛常见数模问题5 建模资料 1 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 2 美赛比赛日期和时间 比赛开始时间&#xff1a;北京时间2024年2月2日&#xff08;周五&#xff…

【JavaScript】new原理解析

✨ 专栏介绍 在现代Web开发中&#xff0c;JavaScript已经成为了不可或缺的一部分。它不仅可以为网页增加交互性和动态性&#xff0c;还可以在后端开发中使用Node.js构建高效的服务器端应用程序。作为一种灵活且易学的脚本语言&#xff0c;JavaScript具有广泛的应用场景&#x…

netty源码:(40)ReplayingDecoder

ReplayingDecoder是ByteToMessageDecoder的子类&#xff0c;我们继承这个类时&#xff0c;也要实现decode方法&#xff0c;示例如下&#xff1a; package cn.edu.tju;import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handle…

Ubuntu20.04服务器使用教程(安装教程、常用命令、故障排查)持续更新中.....

安装教程&#xff08;系统、驱动、CUDA、CUDNN、Pytorch、Timeshift、ToDesk&#xff09; 制作U盘启动盘&#xff0c;并安装系统 在MSDN i tell you下载Ubuntu20.04 Desktop 版本&#xff0c;并使用Rufus制作UEFI启动盘&#xff0c;参考UEFI安装Ubuntu使用GPTUEFI模式安装&am…

【IoT网络层】STM32 + ESP8266 +MQTT + 阿里云物联网平台 |开源,附资料|

目标&#xff1a;实现STM32连接阿里云物联网平台发送数据同时接收数据&#xff0c;IOT studio界面显示数据。具体来说&#xff1a;使用ESP8266 ESP-01来连接网络&#xff0c;获取设备数据发送到阿里云物联网平台并显示且oled显示屏当前的设备数据&#xff0c;通过IOT studio界面…