发布-订阅消息系统在任何企业架构中都发挥着重要作用,因为它可以实现可靠的集成,而无需紧密耦合应用程序。在解耦的系统之间共享数据的能力并不是一个容易解决的问题。
考虑一家拥有多个使用不同语言和平台独立构建的应用程序的企业。它需要响应地共享数据和流程。我们可以使用消息传递来实现这一点,以使用可定制的格式频繁、立即、可靠和异步地传输数据包。异步消息传递从根本上来说是对分布式系统问题的务实反应。发送消息不需要两个系统同时启动并准备就绪。
发布订阅通道
从简单的角度来看,对该模式的理解依赖于它对观察者模式的扩展,添加了用于通信事件通知的事件通道的概念。观察者模式描述了将观察者与其主题解耦的需求,以便主题可以轻松地向所有感兴趣的观察者提供事件通知,无论有多少观察者。
每个订阅者需要被通知一次特定事件,但不应该被重复通知同一事件。在通知所有订阅者之前,不能将事件视为已消耗。一旦所有订阅者都收到通知,该事件就可以被视为已消耗,并且应该从通道中消失 [2]。
代理、队列、主题和订阅
代理消息传递支持真正时间解耦系统的场景,其中消息生产者或消费者的可用性无法得到保证。对于代理消息传递,队列是保留由生产者创建的消息的代理,并且消费者可以在准备好时检索消息。
队列提供了最简单的消息传递选项。Queue 中的消息按先进先出(FIFO)组织,每条消息预计由单个消费者处理;然而,主题和订阅构成了发布/订阅模式,允许 N 个消费者处理同一消息。
发布订阅消息系统
可以将单个消息添加到主题,并且对于满足的每个订阅规则,消息的副本将添加到该订阅。在这种情况下,每个订阅都成为队列,消费者可以在队列中单独处理订阅上的消息。
Apache Kafka 是行业领导者正在使用的可靠且成熟的项目之一,它为我们提供了每秒处理大量消息的能力,而不是传统的消息系统,后者在传统场景中非常有用,但效率不高且价值不高在处理大数据场景时。
除了消息传递之外,Apache Kafka 还可以应用于流处理、网站活动跟踪、日志聚合、指标、基于时间的消息存储、提交日志和事件源。
卡夫卡和动物园管理员
Kafka 是一个分布式发布-订阅消息系统,其设计、分区和复制提交日志服务本质上是快速、可扩展和分布式的。它与传统消息系统的不同之处在于非常容易横向扩展,提供高吞吐量,支持多订阅者,在故障期间自动平衡消费者,并且能够允许实时应用程序或ETL将其用作批量消费磁盘上持久化消息的数量 [1]。
ZooKeeper用于管理和协调Kafka代理。每个 Kafka 代理都使用 ZooKeeper 与其他 Kafka 代理进行协调。ZooKeeper 服务会向生产者和消费者通知 Kafka 系统中新代理的存在或代理的故障。根据 Zookeeper 收到的有关代理存在或失败的通知,生产者和消费者做出决定并开始与其他代理协调他们的工作。此外,它还负责为分区选择新的领导者。
案例分析
在掌握了一些技术之后,让我们专注于练习。因此,我们的案例研究模拟了在发布-订阅上下文中使用 Spring Boot 微框架 v2.1.8.RELEASE 构建的两个微服务之间的通信,使用 Apache Kafka 2.3.1 作为消息系统。为了验证我们的研究,我们将设置和执行集成测试,重点是使用 JUnit 4/5 测试框架在端到端场景中集成应用程序的不同层。
发布订阅消息系统
Producer API 是一个实现业务实体服务操作的模块,以协调和统一与企业、机构和实体组相关的经济信息。Consumer API 是同一解决方案中的另一个模块,旨在集中所有业务实体统计数据,接收来自不同来源的数据输入。
为了简单起见,API 使用 H2 内存数据库。项目结构由三个模块组成。生产者和消费者这两个主要模块都依赖于 Common 模块,该模块与系统的其余部分共享错误处理和辅助类等内容。
让我们开始吧。
Spring Kafka 与 Apache Kafka 消息系统集成
Spring for Apache Kafka 项目将 Spring 的核心概念应用于基于 Kafka 的消息传递解决方案的开发。它提供了一个“模板”作为发送消息的高级抽象。它还通过 @KafkaListener注释 和“侦听器容器”为消息驱动的 POJO 提供支持。这些库提倡使用依赖注入和声明式 [3]。
生产者API
我们需要两个步骤来配置生产者。第一个是配置类,我们在其中定义生产者Map对象、生产者工厂和Kafka模板。当我们将消息构建器设置为在 Kafka 代理中发布时,第二个是尊重服务类。