更多大厂面试内容可见 -> http://11come.cn
大厂生产解决方案:泳道隔离机制
背景
在公司中,由于项目多、开发人员多,一般会有多套测试环境(可以理解为多个服务器),同一套服务会在多套测试环境中都部署方便不同开发人员对项目同时进行改动,服务中会涉及到很多消息发送、接收的地方,就拿 Kafka 消息来说,多套测试环境共用一套 kafka 集群,那么就 需要一套机制来隔离不同测试环境中的消息 ,避免不同测试环境之间的消息错乱消费
如下图,对于不同测试环境,生产服务 A 去发送同一个 topic1
下的消息,所有消费服务 B 都去订阅 topic1
的消息,到底哪个消费服务 B 来进行消费呢?
我们希望的 目标 是希望 test1
环境下的消费服务 B 去消费当前环境 test1
中的消息,而不会消费到其他环境中的消息
最终希望的效果如下图,每个测试环境就是一个泳道,各个测试环境之间的消息可以被隔离开来,并且有一套基线环境作为兜底消费,在测试环境中不需要去部署一套完整的上下游服务
初步方案
既然是因为多个测试环境中的消费服务 B 都去订阅了同一个 topic
下的消息,那么让不同环境下的消费服务 B 去订阅不同的 topic 消息不就可以了?
如下图:
- 在测试环境 test1 下的服务 A 和 B,自己实现一个组件,判断当前处于什么样的测试环境,让他去订阅对应环境
topic:test1
这个消息 - 在测试环境 test2 下的服务 A 和 B,让他去订阅
topic:test2
这个消息
这样不就实现了不同测试环境之间的消息隔离了吗?
确实实现了,但是还存在一些问题
如果在测试时,只有生产服务 A 的代码发生了变动,但是想要测试的话,需要在测试环境 test1 中同时部署 A 和 B,如果不部署 B 服务的话,会导致没有对应的消费者,生产服务 A 的消息就没有人来消费了
这样会同时带来两个缺点:
- 部署大量冗余服务,导致大量占用服务器内存
- 给测试带来极大麻烦,有时候服务上下游依赖很多,我们也不知道具体有哪些,如何去一个一个进行部署?
优化方案
那么显然初版方案虽然实现了消息隔离,但还存在一些问题,如何去解决上边的问题呢?
即我们希望部分没有代码变动的服务不需要在各个测试环境中都去部署一套,那么可以准备一套 基线环境 ,基线环境包含了一套完整的服务,而其他测试环境可以只部署自己发生变动的服务即可,没有代码变动的服务我们希望可以使用基线环境上存在的服务
如下图,有 3 套环境,分别为 test1
、test2
和 基线环境 default
:
- test1、test2 是不同的测试环境
- 基线环境上部署完整的上下游服务
优化后的方案,需要遵循两个规则:
- 同一个消息的所有测试环境使用相同的 topic
- 不同测试环境的消费服务使用不同的消费者组
简单解释一下,第一个说的是同一种消息,比如“发起退款”的消息,所有测试环境都使用相同的 topic,这一点和初步方案不同
第二个是不同环境的消费服务的消费者组标识 groupId 不同,命名方式 {服务名:环境标识}
先说一下,优化版本中如何实现消息隔离
当测试环境 test1
中的生产服务 A 去发送消息,消息 topic 为【发起退款】,在消息中我们会带上泳道标识,消息会给每个消费者组都进行发送,消息隔离通过消费者来实现:
- 当
test1
环境的消费服务 B 收到消息之后,判断消息中的泳道标识和当前环境是否相同,显然相同,进行消费 - 当
test2
环境的消费服务 B 收到消息之后,发现消息的泳道标识为test1
,但是当前环境的泳道标识为test2
,不同,则不消费 - 当
基线环境
的消费服务 B 收到消息之后,由于基线环境是作为兜底消费,所以会先判断当前消息对应环境是否存在消费者组,如果不存在的话,就作为兜底进行消费;如果当前消息存在对应的消费者组,则不消费,避免重复消费
这样既实现了消息隔离,又可以避免服务的冗余部署
当测试环境 test1 中的消费服务 B 没有部署的话,那么他的生产服务 A 去发送消息,这个消息就会被 基线环境
的消费服务 B 进行兜底消费
这里基线环境的消费服务 B 去判断发现 test1
环境中对应的消费者组并没有上线,那么我基线环境就对这个消息进行消费,避免这个消息没有对应的服务进行消费
通过增加基线环境的概念,就可以避免初步方案中的问题,避免大量冗余服务的部署,对于那些没有代码变动的服务,我们就不部署,通过基线环境中的服务进行消费
收益提升
经生产验证,机器内存为 300G,使用率由 78% 降低至 54%,收益明显
最终方案
在优化方案中,实现了消息隔离,但是当消息流转到基线环境的服务之后,如何再转回原来的泳道环境?
如下图,test1
环境的服务 A 发送消息,经过基线环境的服 B、服务 C 消费之后,也去发送 kafka 消息,此时 test1
环境部署了服务 D,需要让基线环境的消息可以再回到 test1
环境
实现思路:
在每个服务去处理消息时,拿到消息中的泳道标识,存储到线程上下文中,这样在当前线程的处理流程中都可以取到这个标识,之后在每个服务去发送消息的时候,从这个上下文中拿到泳道标识放入消息中即可
实现细节
通过接入组件的方式来实现泳道隔离,组件需要包含功能:
- 拦截 kafka 消息发送:在消息中添加泳道标识
- 拦截 kafka 消息接收:判断消息泳道标识与当前环境标识是否相同
基于 spring-kafka 实现泳道隔离组件
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
泳道标识获取
当前测试环境的标识可以放在 SpringBoot 应用的 System.property 中,这样可以通过 System.getProperty("ROLE")
即可获取当前环境标识
消息接收拦截
kafka 提供了有发送和接收消息的拦截器
在接收消息时,取出消息中的泳道标识,和当前环境的泳道标识做对比,如果对比通过,则放行;如果不通过,则拦截不进行消费
基线环境的判断会更多,因为基线环境作为兜底消费,需要了解到收到的消息是否存在对应的消费者组进行消费,这点可以通过 AdminClient 来和 kafka 交互进行查询
消息发送拦截
在发送消息时,拦截到消息,将泳道标识放入消息中
kafka 提供了消息头(kafka header)来传递元数据,泳道的标识可以放入 kafka 消息头的位置
消费者组的设置
对于泳道隔离,我们要让上层应用无感知,也就是应用接入了泳道隔离组件,总不能让对应的开发人员自己去设置消费者组的 groupId 为 {服务名:泳道标识}
应用只需要设置 groupId 为服务名,在组件中我们对 groupId 进行处理,在后边拼接上泳道标识即可,这部分涉及到 ConcurrentKafkaListenerContainerFactory 的 Bean 构建以及拦截器的设置,需了解 kafka 源码
涉及 spring-kafka 相关类:
DefaultKafkaConsumerFactory #
ConcurrentKafkaListenerContainerFactory # 并发消费容器创建工厂类
KafkaProperties # Kafka 属性