Rpc导读

手写Rpc框架 - 导读

git仓库-all-rpc

GTIEE:https://gitee.com/quercus-sp204/all-rpc 【参考源码 yrpc】

1. Rpc概念

RPC 即远程过程调用(Remote Procedure Call) ,就是通过网络从远程计算机程序上请求服务。

  • 本地调用抽象:允许程序像调用本地函数一样调用远程计算机上的函数。开发者无需编写复杂的网络通信代码来处理诸如建立连接、发送请求、等待响应等细节,只需关注业务逻辑的实现。例如,在一个分布式系统中,A 服务器上的程序需要调用 B 服务器上的某个函数来获取数据,使用 RPC 就可以像调用本地函数一样简单。
  • 通信协议与序列化:为了实现这种跨网络的函数调用,RPC 框架通常会使用特定的通信协议(如 TCP/IP)来传输数据,并通过序列化和反序列化技术,将调用的参数和返回值转换为适合在网络上传输的格式。比如,将参数对象转换为字节流进行传输,在接收端再将字节流还原为对象。

那么,它的应用场景有哪些呢?我们平时用到了吗?

  • 微服务架构:在微服务架构中,各个微服务之间通常需要相互通信来完成复杂的业务流程。例如,一个电商系统中,订单服务可能需要调用库存服务来检查商品库存,调用用户服务来验证用户信息等,RPC 可以高效地实现这些微服务间的通信。
  • 分布式系统:在大型分布式系统中,不同节点可能负责不同的功能模块。例如,在搜索引擎系统中,索引构建节点和查询服务节点可能分布在不同的服务器上,通过 RPC 可以实现节点之间的协同工作。

说白了,就是不同服务之间的网络通信嘛。那你可能会问了,假如我的系统有User、Order、Shipment三个服务【爪哇SpringBoot编写的嚯】,如果User想要访问Order上面的函数,我只需要将此函数以Http接口的形式暴露出去,然后User那边使用RestTemplate来访问不就好了吗?何必这么费劲还要用框架呢?

仔细想一下,确实有那么点儿卵道理,但是又仔细一想,

  • 你会发现,你使用RestTemplate的时候,需要手动处理请求的 URL 拼接、请求头设置、参数序列化和响应反序列化等操作,对了,还有异常也需要自己处理,需要自己处理调用失败的情况,如重试几次啊等等
  • HTTP 协议是一种文本协议,存在较多的头部信息,在数据传输时会带来额外的开销。而rpc框架的协议通常相比http协议,是很轻量的。
  • 如果Order部署在了多台机器上面,代码里面肯定是存了这些机器的地址,如果扩容或者缩容,都要修改代码,同时还需要我们手动选择调用哪一台机器上面的Order【需要手动实现负载均衡】

那么,rpc都可以完成这些,并且服务的地址信息那些啊,可以在rpc的注册中心拉取到,动态感知。

综上所述,一个基本的Rpc框架主要应该具有的能力是:

基础通信能力

  • 高效序列化、优化网络传输,采用高效的网络协议和传输方式

  • 稳定性与可靠性

    • 连接管理:具备完善的连接池管理机制,对连接进行复用,减少频繁建立和销毁连接的开销,同时保证连接的稳定性。例如,在高并发场景下,能自动处理连接的超时、重连等问题

服务发现与治理能力

  • 服务注册与发现: 1. 服务提供者能够在启动时自动将自己的服务信息(如服务名称、地址、端口等)注册到服务注册中心,方便服务消费者发现和调用。2.实时感知:调用方能够及时感知调用的服务信息并更新。
  • 负载均衡: 如随机、轮询、最少活跃调用数、一致性哈希
  • 服务容错
    • 熔断机制:当服务提供者出现故障或响应时间过长时,能够自动熔断对该服务的调用,避免大量请求积压,影响整个系统的稳定性。
    • 降级策略:在系统资源紧张或服务出现故障时,能够自动降级服务,提供默认的响应结果或采取其他临时措施,保证系统的基本可用性。

易用性与可扩展性

  • 简单的编程模型:让开发者能够像调用本地函数一样调用远程服务,无需关注底层的网络通信细节,降低开发难度,提高开发效率
  • 插件化与扩展性
    • SPI 机制:具备良好的插件化架构,通过服务提供者接口(SPI)机制,允许开发者根据实际需求扩展框架的功能,如自定义序列化方式、负载均衡策略、过滤器等。

在这里插入图片描述

现在就按照上面的能力,来一个一个实现,最终将其组合成一个框架。

2. 角色

一个Rpc框架的大致角色分布:

在这里插入图片描述

服务提供者将自己的数据信息【例如,端口、ip、接口等信息提供给注册中心】(服务注册)

消费者从注册中心拉取到可用的服务信息(服务发现),然后选择一个合适的服务(负载均衡),发送网络请求【请求里面封装了需要调用的接口,参数等等】,

服务提供者接收到请求之后,本地调用方法,然后通过网络把响应结果过传输给消费者

最后消费者解析响应结果。

注册中心: (本文就选择zookeeper为注册中心)

3. 注册中心的接入

zookeeper的安装与启动,就不在这里赘述了。

思考注册中心的主要能力:【服务注册,服务发现】

定义接口

Registry接口

/** 注册中心的能力: 注册服务, 拉取服务列表*/
public interface Registry {/*** 注册服务* @param serviceConfig 服务的配置内容*/void register(ServiceConfig<?> serviceConfig);/*** 从注册中心拉取服务列表* @param serviceName 服务的名称* @return 服务的地址*/List<InetSocketAddress> lookup(String serviceName, String group);
}

ServiceConfig 封装服务信息的class

/*服务信息*/
public class ServiceConfig<T> {// 接口的类型/*比如UserServiceImpl实现了UserService, 真实对象就是实现类,interfaceProvider就是UserService.class*/private Class<?> interfaceProvider;private Object ref; // 真实对象private String group = "default"; // 分组// get set.....
}

Zookeeper注册中心的实现类 ZookeeperRegistry

服务注册在zookeeper上面的节点如图所示

trpc根节点
==▶消费者节点
==▼生产者节点
====▼接口的全限定名 
======▼分组
========▼地址信息1...

在这里插入图片描述

@Slf4j
public class ZookeeperRegistry extends AbstractRegistry {// 维护一个zk实例private ZooKeeper zooKeeper;public ZookeeperRegistry() {this.zooKeeper = ZookeeperUtil.createZookeeper();}public ZookeeperRegistry(String connectString,int timeout) {this.zooKeeper = ZookeeperUtil.createZookeeper(connectString,timeout);}@Overridepublic void register(ServiceConfig<?> service) {// 服务名称的节点 ----  "/tprc-metadata/providers/接口全限定名"String parentNode = Constant.BASE_PROVIDERS_PATH +"/"+service.getInterface().getName();// 建立服务节点这个节点应该是一个持久节点if(!ZookeeperUtil.exists(zooKeeper,parentNode,null)){ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode,null);ZookeeperUtil.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT);}// 建立分组节点parentNode = parentNode + "/" + service.getGroup();if(!ZookeeperUtil.exists(zooKeeper,parentNode,null)){ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode,null);ZookeeperUtil.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT);}// 创建本机的临时节点, ip:port ,// 服务提供方的端口一般自己设定,我们还需要一个获取ip的方法// ip我们通常是需要一个局域网ip,不是127.0.0.1,也不是ipv6// 192.168.12.121String node = parentNode + "/" + NetUtils.getIp() + ":" + TrpcBootstrap.getInstance().getConfiguration().getPort();if(!ZookeeperUtil.exists(zooKeeper,node,null)){ZookeeperNode zookeeperNode = new ZookeeperNode(node,null);ZookeeperUtil.createNode(zooKeeper, zookeeperNode, null, CreateMode.EPHEMERAL);}log.info("服务{},注册ok",service.getInterface().getName());}/*** 拉取合适的服务列表* @param serviceName 服务名称* @return 服务列表*/@Overridepublic List<InetSocketAddress> lookup(String serviceName,String group) {// 1、找到服务对应的节点String serviceNode = Constant.BASE_PROVIDERS_PATH + "/" + serviceName + "/" +group;List<String> children = ZookeeperUtil.getChildren(zooKeeper, serviceNode, null);// 获取了所有的可用的服务列表List<InetSocketAddress> inetSocketAddresses = children.stream().map(ipString -> {String[] ipAndPort = ipString.split(":");String ip = ipAndPort[0];int port = Integer.parseInt(ipAndPort[1]);return new InetSocketAddress(ip, port);}).toList();if(inetSocketAddresses.isEmpty()){throw new DiscoveryException("未发现任何可用的服务主机.");}return inetSocketAddresses;}
}

上面的ZookeeperUtil是自定义的操作Zookeeper的工具类。-- 详情见源码里面的注释,值得说明一下,Zookeeper要先有父结点才能创建子节点,不能把路径直接写全了直接创建,故在源码里面会用createRoot方法初始化所有的父结点。

public static ZooKeeper createZookeeper(String connectPath, int timeout) {CountDownLatch countDownLatch = new CountDownLatch(1);try {.................// 连接成功就创建根节点,检查是否存在rpc根节点  /trpc-metadata/providers  &&  /trpc-metadata/consumerscreateRoot(zooKeeper);...............} catch (IOException | InterruptedException e) {log.info("创建zookeeper实例时发生异常:",e);throw new ZookeeperException("创建zookeeper实例时发生异常");}
}

至此,注册中心的两个基本方法就可以告一段落了。

4.Trpc框架启动器

既然是一个框架,那么,我们必然有一个入口来启动这一套流程。

①服务提供方

- 基本功能信息appName、registry

形如: all-tRpc-demo / demo-simple-provider / …/ProviderApplication.java 这样来启动我们的提供方。

public class ProviderApplication {public static void main(String[] args) {TrpcBootstrap.getInstance() .appName("user-provider")// 配置注册中心.registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))// 扫描包下的类,然后批量发布接口.scan("com.feng.demo")// 启动服务.start();}
}

在TrpcBootstrap.java里面

@Slf4j
public class TrpcBootstrap {// 单例,每个应用程序只有一个private static final TrpcBootstrap trpcBootstrap = new TrpcBootstrap();// 配置private final RpcConfiguration configuration;// 获取实例public static TrpcBootstrap getInstance() {return trpcBootstrap;}// 设置应用名称 *****public TrpcBootstrap appName( String appName ) {configuration.setAppName(appName);return this;}// 配置注册中心 *****public TrpcBootstrap registry( RegistryConfig registryConfig ) {// 传递过来的参数registryConfig,还没有创建连接,在这里创建与注册中心的连接registryConfig.createRegistryConnection(); // 创建注册中心的连接configuration.setRegistryConfig(registryConfig); // 设置服务注册中心return this;}
}

RpcConfiguration是封装的配置信息

// 全局的配置类,代码配置 --> xml配置 --> 默认项
@Data
public class RpcConfiguration {// 配置信息-->端口号private int port = 8094;// 配置信息-->应用程序的名字private String appName = "default";// 分组信息private String group = "default";// 配置信息-->注册中心private RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:2181"); // 默认的// 配置信息-->序列化协议private String serializeType = "jdk";// 配置信息-->压缩使用的协议private String compressType = "gzip";// 配置信息@Getterpublic IdGenerator idGenerator = new IdGenerator(1, 2);// 配置信息-->负载均衡策略private LoadBalancer loadBalancer = new RoundRobinLoadBalancer();// 为每一个ip配置一个限流器private final Map<SocketAddress, RateLimiter> everyIpRateLimiter = new ConcurrentHashMap<>(16);// 为每一个ip配置一个断路器,熔断private final Map<SocketAddress, CircuitBreaker> everyIpCircuitBreaker = new ConcurrentHashMap<>(16);// 读xml,dom4jpublic RpcConfiguration() {............}
}// 里面又持有注册中心的类
@Slf4j
public class RegistryConfig {// 定义连接的 url zookeeper://127.0.0.1:2181  redis://192.168.12.125:3306private final String connectString;// 持有一个 Registryprivate Registry registry;public RegistryConfig(String connectString) {this.connectString = connectString;}public Registry getRegistry() {if ( registry == null ) createRegistryConnection();return registry;}/*** 可以使用简单工厂来完成* @return 具体的注册中心实例*/public void createRegistryConnection() {if ( connectString == null ) throw new DiscoveryException("未配置注册中心!");// 1、获取注册中心的类型String registryType = getRegistryType(connectString,true).toLowerCase().trim();log.info("【创建与注册中心的连接~~~】 注册中心的类型: {}", registryType);// 2、通过类型获取具体注册中心if( registryType.equals("zookeeper") ){String host = getRegistryType(connectString, false);this.registry = new ZookeeperRegistry(host, Constant.ZK_TIME_OUT);} else if (registryType.equals("nacos")){String host = getRegistryType(connectString, false);this.registry = new NacosRegistry(host, Constant.ZK_TIME_OUT);} else {throw new DiscoveryException("未发现合适的注册中心。");}}private String getRegistryType(String connectString,boolean ifType){String[] typeAndHost = connectString.split("://");if(typeAndHost.length != 2){throw new RuntimeException("给定的注册中心连接url不合法");}if(ifType){return typeAndHost[0];} else {return typeAndHost[1];}}
}
- 扫描接口并发布scan
// 扫描项目指定包下面的接口,并且将他们发布到注册中心
public TrpcBootstrap scan(String packageName) {// 1. 获取指定包 path 下的所有类的全限定名List<String> classNames = getAllClassName(packageName);// 2.1 拿到所有标注了TrpcApi注解的类List<? extends Class<?>> classes = getTrpcClassesByList(classNames);//  2.2遍历这些构建实例for (Class<?> clazz : classes) {Class<?>[] interfaces = clazz.getInterfaces(); // 获取到他的接口Object instance;try {instance = clazz.getConstructor().newInstance(); // 通过无参构造器创建一个实例} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {throw new RuntimeException(e);}// 获取注解的groupTrpcApi annotation = clazz.getAnnotation(TrpcApi.class);String group = annotation.group();// 3.将这些接口发布for (Class<?> anInterface : interfaces) {ServiceConfig<?> serviceConfig = new ServiceConfig<>();serviceConfig.setInterface(anInterface);serviceConfig.setRef(instance);serviceConfig.setGroup(group);if (log.isDebugEnabled()){log.debug("---->已经通过包扫描,将服务【{}】发布.",anInterface);}// 3、发布publish(serviceConfig);}}return this;
}//
private TrpcBootstrap publish( ServiceConfig<?> service ) {configuration.getRegistryConfig().getRegistry().register(service);// 维护一个映射关系SERVERS_LIST.put(service.getInterface().getName(), service);return this;
}

具体可以看源码里面的实现

- 启动netty
public void start() {// 1、创建eventLoop,老板只负责处理请求,之后会将请求分发至workerEventLoopGroup boss = new NioEventLoopGroup(2);EventLoopGroup worker = new NioEventLoopGroup(10);try {ServerBootstrap bootstrap = new ServerBootstrap();// 3.配置服务bootstrap = bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 是核心,我们需要添加很多入站和出站的handlersocketChannel.pipeline().addLast(new LoggingHandler()) // 打印日志.addLast(new TrpcRequestDecoder())  // 请求过来,需要解码// 根据请求进行方法调用.addLast(new MethodCallHandler()).addLast(new TrpcResponseEncoder()) // 响应回去,需要编码;}});// 4.绑定端口ChannelFuture channelFuture = bootstrap.bind(configuration.getPort()).sync();// 5.关闭channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}
}

② 服务消费方

形如 all-tRpc-demo / demo-simple-consumer /…/ConsumerApplication.java

public class ConsumerApplication {public static void main(String[] args) {ReferenceConfig<UserService> reference = new ReferenceConfig<>();reference.setReference(UserService.class);// 1、连接注册中心// 2、拉取服务列表// 3、选择一个服务并建立连接// 4、发送请求,携带一些信息(接口名,参数列表,方法的名字),获得结果TrpcBootstrap.getInstance().appName("user-consumer").registry(new RegistryConfig("zookeeper://127.0.0.1:2181")).reference(reference);UserService userService = reference.get();System.out.println("=======================================");for (int i = 0; i < 10; i++) {System.out.println("【rpc调用开始=============】");// 开始时间long start = System.currentTimeMillis();List<User> users = userService.getUserByName("田小锋q" + i);for (User user : users) {System.out.println(user);}// 结束时间long end = System.currentTimeMillis();System.out.println("rpc执行耗时:" + (end - start));System.out.println("【rpc调用=============结束-----】");}}
}@Slf4j
public class ReferenceConfig<T> {// 接口类型private Class<T> interfaceRef;// 注册中心private Registry registry;// 分组信息private String group;/*** 代理设计模式* @return 代理对象*/public T get() {// 此处一定是使用动态代理完成了一些工作ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); // 类加载器Class<T>[] classes = new Class[]{interfaceRef}; // 接口类型InvocationHandler handler = new ProxyConsumerInvocationHandler(registry,interfaceRef,group);// 使用动态代理生成代理对象Object helloProxy = Proxy.newProxyInstance(classLoader, classes, handler);return (T)helloProxy;}
}

主要是jdk动态代理 在invoke里面实现我们的远程调用

5. 序列化&&压缩

在 RPC(远程过程调用)框架中,序列化和压缩是两个重要的概念,它们在数据传输过程中起着关键作用

序列化

上图里面可以看出来,序列化是将对象转换为字节流的过程,以便在网络上传输或存储到文件中。在 RPC 中,客户端调用远程服务时,需要将调用的参数对象序列化为字节流,通过网络发送到服务端;服务端接收到字节流后,再将其反序列化为对象进行处理。处理完后,又将结果对象序列化为字节流返回给客户端,客户端再反序列化得到结果。

常见的序列化方式

那么我们就定义一下序列化的接口

public interface Serializer {/*** 抽象的用来做序列化的方法*/byte[] serialize(Object object);/*** 反序列化的方法*/<T> T deserialize(byte[] bytes, Class<T> clazz);
}
1. JDK序列化
@Slf4j // lombok里面的日志注解
public class JdkSerializer implements Serializer {@Overridepublic byte[] serialize(Object object) {if (object == null) return null;try (ByteArrayOutputStream baos = new ByteArrayOutputStream();ObjectOutputStream outputStream = new ObjectOutputStream(baos);) { // try - with写法outputStream.writeObject(object);byte[] result = baos.toByteArray();if(log.isInfoEnabled()){log.info("对象【{}】已经完成了序列化操作,序列化后的字节数为【{}】",object,result.length);}return result;} catch (IOException e) {log.error("序列化对象【{}】时放生异常.",object);throw new SerializeException(e);}}@Overridepublic <T> T deserialize(byte[] bytes, Class<T> clazz) {if(bytes == null || clazz == null) return null;try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);ObjectInputStream objectInputStream = new ObjectInputStream(bais);) {Object object = objectInputStream.readObject();if(log.isInfoEnabled()){log.info("类【{}】已经完成了反序列化操作.",clazz);}return (T)object;} catch (IOException | ClassNotFoundException e) {log.error("反序列化对象【{}】时放生异常.",clazz);throw new SerializeException(e);}}
}
2.JSON序列化

这里就用fastjson了

@Slf4j
public class JsonSerializer implements Serializer {@Overridepublic byte[] serialize(Object object) {if (object == null) return null;byte[] result = JSON.toJSONBytes(object);if (log.isInfoEnabled()) {log.info("对象【{}】已经完成了序列化操作,序列化后的字节数为【{}】", object, result.length);}return result;}@Overridepublic <T> T deserialize(byte[] bytes, Class<T> clazz) {if (bytes == null || clazz == null) return null;T t = JSON.parseObject(bytes, clazz);if (log.isInfoEnabled()) {log.info("类【{}】已经完成了反序列化操作.", clazz);}return t;}
}
3.Hessian序列化

Hessian是一个轻量级的、基于HTTP的RPC(远程过程调用)框架,由Resin开源提供。它使用一个简单的、基于二进制的协议来序列化对象,并通过HTTP进行传输。Hessian的设计目标是提供一种高效、可靠且易于使用的远程服务调用机制。

maven依赖

<dependency><groupId>com.caucho</groupId><artifactId>hessian</artifactId><version>版本号</version>  <!-- 4.0.66 -->
</dependency>
@Slf4j
public class HessianSerializer implements Serializer {@Overridepublic byte[] serialize(Object object) {if (object == null) return null;try (ByteArrayOutputStream baos = new ByteArrayOutputStream();) {Hessian2Output hessian2Output = new Hessian2Output(baos);hessian2Output.writeObject(object);hessian2Output.flush();byte[] result = baos.toByteArray();if(log.isInfoEnabled())log.info("对象【{}】已经完成了序列化操作,序列化后的字节数为【{}】",object,result.length);return result;} catch (IOException e) {log.error("使用hessian进行序列化对象【{}】时放生异常.",object);throw new SerializeException(e);}}@Overridepublic <T> T deserialize(byte[] bytes, Class<T> clazz) {if(bytes == null || clazz == null) return null;try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);) {Hessian2Input hessian2Input = new Hessian2Input(bais);T t = (T) hessian2Input.readObject();if(log.isInfoEnabled()log.info("类【{}】已经使用hessian完成了反序列化操作.",clazz);return t;} catch (IOException  e) {log.error("使用hessian进行反序列化对象【{}】时发生异常.",clazz);throw new SerializeException(e);}}
}
4.序列化工厂

转念一想嚯,我们这是实现一个框架额,肯定是要有对外扩展的能力的,那么,我们就将序列化的所有方式默认加载到内存中去,通过一个工厂类来获取指定的序列化方法就可以了,然后暴露添加其他序列化方法的接口。

/*** @version 1.0* @Author txf* @Date 2025/2/10 15:17* @注释 序列化工厂*/
@Slf4j
public class SerializerFactory {private final static ConcurrentHashMap<String, ObjectWrapper<Serializer>> SERIALIZER_CACHE = new ConcurrentHashMap<>(8);private final static ConcurrentHashMap<Byte, ObjectWrapper<Serializer>> SERIALIZER_CACHE_CODE = new ConcurrentHashMap<>(8);static {ObjectWrapper<Serializer> jdk = new  ObjectWrapper<>((byte) 1, "jdk", new JdkSerializer());ObjectWrapper<Serializer> json = new  ObjectWrapper<>((byte) 2, "json", new JsonSerializer());ObjectWrapper<Serializer> hessian = new  ObjectWrapper<>((byte) 3, "hessian", new HessianSerializer());SERIALIZER_CACHE.put("jdk",jdk);SERIALIZER_CACHE.put("json",json);SERIALIZER_CACHE.put("hessian",hessian);SERIALIZER_CACHE_CODE.put((byte) 1, jdk);SERIALIZER_CACHE_CODE.put((byte) 2, json);SERIALIZER_CACHE_CODE.put((byte) 3, hessian);}/*** 使用工厂方法获取一个SerializerWrapper* @param serializeType 序列化的类型* @return SerializerWrapper*/public static  ObjectWrapper<Serializer> getSerializer(String serializeType) {ObjectWrapper<Serializer> serializerWrapper = SERIALIZER_CACHE.get(serializeType);if(serializerWrapper == null){log.error("未找到您配置的【{}】序列化工具,默认选用jdk的序列化方式。",serializeType);return SERIALIZER_CACHE.get("jdk");}return SERIALIZER_CACHE.get(serializeType);}public static  ObjectWrapper<Serializer> getSerializer(Byte serializeCode) {ObjectWrapper<Serializer> serializerWrapper = SERIALIZER_CACHE_CODE.get(serializeCode);if(serializerWrapper == null){log.error("未找到您配置的【{}】序列化工具,默认选用jdk的序列化方式。",serializeCode);return SERIALIZER_CACHE.get("jdk");}return SERIALIZER_CACHE_CODE.get(serializeCode);}/*** 新增一个新的序列化器* @param serializerObjectWrapper 序列化器的包装*/public static void addSerializer(ObjectWrapper<Serializer> serializerObjectWrapper){SERIALIZER_CACHE.put(serializerObjectWrapper.getName(),serializerObjectWrapper);SERIALIZER_CACHE_CODE.put(serializerObjectWrapper.getCode(),serializerObjectWrapper);}
}

压缩

压缩是指通过特定的算法对数据进行处理,减少数据的存储空间或传输带宽。在 RPC 中,对序列化后的字节流进行压缩可以进一步减少数据传输量,提高传输效率。说白了,就是传的少了。RPC嘛,将性能追求到极致!!!!

设计方式同序列化一样的。

public interface Compressor {// 序列化后的字节数组压缩byte[] compress(byte[] bytes);// 解压缩byte[] decompress(byte[] bytes);
}

Gzip压缩

@Slf4j
public class GzipCompressor implements Compressor {@Overridepublic byte[] compress(byte[] bytes) {try (ByteArrayOutputStream baos = new ByteArrayOutputStream();GZIPOutputStream gzipOutputStream = new GZIPOutputStream(baos);) {gzipOutputStream.write(bytes);gzipOutputStream.finish();byte[] result = baos.toByteArray();if(log.isInfoEnabled())log.info("对字节数组进行了压缩长度由【{}】压缩至【{}】.",bytes.length,result.length);return result;} catch (IOException e){log.error("对字节数组进行压缩时发生异常",e);throw new CompressException(e);}}@Overridepublic byte[] decompress(byte[] bytes) {try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);GZIPInputStream gzipInputStream = new GZIPInputStream(bais);) {byte[] result = gzipInputStream.readAllBytes();if(log.isInfoEnabled())log.info("对字节数组进行了解压缩长度由【{}】变为【{}】.",bytes.length,result.length);return result;} catch (IOException e){log.error("对字节数组进行压缩时发生异常",e);throw new CompressException(e);}}
}

压缩工厂就不在这里写了。

导读部分结束了。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/71565.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网络安全:防范NetBIOS漏洞的攻击

稍微懂点电脑知识的朋友都知道&#xff0c;NetBIOS 是计算机局域网领域流行的一种传输方式&#xff0c;但你是否还知道&#xff0c;对于连接互联网的机器来讲&#xff0c;NetBIOS是一大隐患。 漏洞描述 NetBIOS(Network Basic Input Output System&#xff0c;网络基本输入输…

VIE(可变利益实体)架构通俗解析 —— 以阿里巴巴为例(中英双语)

VIE&#xff08;可变利益实体&#xff09;架构通俗解析 —— 以阿里巴巴为例 什么是 VIE 架构&#xff1f; VIE&#xff08;Variable Interest Entity&#xff0c;可变利益实体&#xff09;是一种特殊的法律结构&#xff0c;主要用于中国企业在海外上市&#xff0c;特别是受中…

使用代码与 AnythingLLM 交互的基本方法和示例

AnythingLLM 是一个基于大语言模型&#xff08;LLM&#xff09;的工具&#xff0c;主要用于构建和管理个人或企业知识库。虽然它主要提供图形化界面&#xff08;GUI&#xff09;进行操作&#xff0c;但也可以通过代码进行一些高级配置和集成。以下是使用代码与 AnythingLLM 交互…

用DeepSeek零基础预测《哪吒之魔童闹海》票房——从数据爬取到模型实战

系列文章目录 1.元件基础 2.电路设计 3.PCB设计 4.元件焊接 5.板子调试 6.程序设计 7.算法学习 8.编写exe 9.检测标准 10.项目举例 11.职业规划 文章目录 **一、为什么要预测票房&#xff1f;****二、准备工作****三、实战步骤详解****Step 1&#xff1a;数据爬取与清洗&am…

如何将MySQL数据库迁移至阿里云

将 MySQL 数据库迁移至阿里云可以通过几种不同的方法&#xff0c;具体选择哪种方式取决于你的数据库大小、数据复杂性以及对迁移速度的需求。阿里云提供了多种迁移工具和服务&#xff0c;本文将为你介绍几种常见的方法。 方法一&#xff1a;使用 阿里云数据库迁移服务 (DTS) 阿…

Ubuntu22.04 - gflags的安装和使用

目录 gflags 介绍gflags 安装gflags 使用 gflags 介绍 gflags 是Google 开发的一个开源库&#xff0c;用于 C应用程序中命令行参数的声明、定义和解析。gflags 库提供了一种简单的方式来添加、解析和文档化命令行标志(flags),使得程序可以根据不同的运行时配置进行调整。 它具…

Git LFS介绍(Large File Storage)大文件扩展,将大文件存储在外部存储,仓库中只记录文件的元数据(大文件的指针,类似一个小的占位符文件)

文章目录 LFS的功能&#xff1f;如何使用LFS&#xff1f;将大文件存储在外部系统是什么意思&#xff1f;具体是如何运作的&#xff1f;为什么要这样做&#xff1f; 对开发者的影响&#xff1f;1. **性能和效率**2. **协作体验**3. **版本管理差异**4. **额外的工具和配置** LFS…

Fastgpt学习(5)- FastGPT 私有化部署问题解决

1.☺ 问题描述&#xff1a; Windows系统&#xff0c;本地私有化部署&#xff0c;postgresql数据库镜像日志持续报错" data directory “/var/lib/postgresql/data” has invalid permissions "&#xff0c;“ DETAIL: Permissions should be urwx (0700) or urwx,gr…

2026考研趋势深度解析:政策变化+高效工具指南

2026考研深度解析&#xff1a;趋势洞察高效工具指南&#xff0c;助你科学备战上岸 从政策变化到工具实战&#xff0c;这份千字攻略解决99%考生的核心焦虑 【热点引入&#xff1a;考研赛道进入“高难度模式”】 2025年全国硕士研究生报名人数突破520万&#xff0c;报录比预计扩…

娱乐使用,可以生成转账、图片、聊天等对话内容

软件介绍 今天要给大家介绍一款由吾爱大佬 lifeixue 开发的趣味软件。它的玩法超丰富&#xff0c;能够生成各式各样的角色&#xff0c;支持文字聊天、发红包、转账、发语音以及分享图片等多种互动形式&#xff0c;不过在分享前得着重提醒&#xff0c;此软件仅供娱乐&#xff0…

DeepSeek动画视频全攻略:从架构到本地部署

DeepSeek 本身并不直接生成动画视频,而是通过与一系列先进的 AI 工具和传统软件协作,完成动画视频的制作任务。这一独特的架构模式,使得 DeepSeek 在动画视频创作领域发挥着不可或缺的辅助作用。其核心流程主要包括脚本生成、画面设计、视频合成与后期处理这几个关键环节。 …

C++类与对象深度解析(一):从引用、内联函数到构造析构的编程实践

目录 一.引用 引用的特征&#xff1a;1.引用必须初始化 2.本质是别名 3.函数参数传递 4.常引用 5.函数返回值 6.权限 放大 缩小 平移 引用 vs 指针 二.内联函数 关键点说明 三.宏函数 四.类 什么是类&#xff1f; 简单的类 五.构造函数与析构函数 1. 构造函数&…

vsan数据恢复—vsan缓存盘故障导致虚拟磁盘文件丢失的数据恢复案例

vsan数据恢复环境&故障&#xff1a; VMware vsan架构采用21模式。每台设备只有一个磁盘组&#xff08;71&#xff09;&#xff0c;缓存盘的大小为240GB&#xff0c;容量盘的大小为1.2TB。 由于其中一台主机&#xff08;0号组设备&#xff09;的缓存盘出现故障&#xff0c;导…

开源在线考试系统开源在线考试系统:支持数学公式的前后端分离解决方案

开源在线考试系统&#xff1a;支持数学公式的前后端分离解决方案 项目介绍项目概述&#xff1a;技术栈&#xff1a;版本要求主要功能&#xff1a;特色亮点 项目仓库地址演示地址GiteeGitHub 系统效果展示教师端系统部分功能截图学生端系统部分功能截图 结语 项目介绍 项目概述…

redis解决高并发看门狗策略

当一个业务执行时间超过自己设定的锁释放时间&#xff0c;那么会导致有其他线程进入&#xff0c;从而抢到同一个票,所有需要使用看门狗策略&#xff0c;其实就是开一个守护线程&#xff0c;让守护线程去监控key&#xff0c;如果到时间了还未结束&#xff0c;就会将这个key重新s…

新数据结构(12)——代理

什么是代理 在进行操作时有时不希望用户直接接触到目标&#xff0c;这时需要使用代理让用户间接接触到目标 给目标对象提供一个代理对象&#xff0c;并且由代理对象控制着对目标对象的引用 图解&#xff1a; 代理的目的 控制访问&#xff1a;通过代理对象的方式间接的访问目…

Unity Shader Graph 2D - Procedural程序化图形之夹心圆环

前言 本文将使用Unity Shader Graph的节点来绘制一个夹心圆环,分成三部分外环、内环和中心环。通过制作一个夹心圆环能够更好地理解和实践Shader Graph中的基础节点以及思维。 创建一个Ring的Shader Graph文件,再创建一个对应的材质球M_Ring以及一个Texture2D的MainT…

缓存三大问题及其解决方案

缓存三大问题及其解决方案 1. 前言 ​ 在现代系统架构中&#xff0c;缓存与数据库的结合使用是一种经典的设计模式。为了确保缓存中的数据与数据库中的数据保持一致&#xff0c;通常会给缓存数据设置一个过期时间。当系统接收到用户请求时&#xff0c;首先会访问缓存。如果缓…

【算法】----多重背包问题I,II(动态规划)

&#x1f339;作者:云小逸 &#x1f4dd;个人主页:云小逸的主页 &#x1f4dd;Github:云小逸的Github &#x1f91f;motto:要敢于一个人默默的面对自己&#xff0c;强大自己才是核心。不要等到什么都没有了&#xff0c;才下定决心去做。种一颗树&#xff0c;最好的时间是十年前…

LeetCode-524. 通过删除字母匹配到字典里最长单词

1、题目描述&#xff1a; 给你一个字符串 s 和一个字符串数组 dictionary &#xff0c;找出并返回 dictionary 中最长的字符串&#xff0c;该字符串可以通过删除 s 中的某些字符得到。 如果答案不止一个&#xff0c;返回长度最长且字母序最小的字符串。如果答案不存在&#x…