微服务设计模式 - 发布订阅模式(Publisher Subscriber Pattern)

微服务设计模式 - 发布订阅模式(Publisher Subscriber Pattern)

Publisher-Subscriber-Pattern3.

定义

发布-订阅模式(Publisher-Subscriber Pattern)是一种常见的设计模式,被广泛用于云计算和分布式系统中,以实现松散耦合的组件间通信。发布-订阅模式本质上是一种消息传递模式,其中消息发布者(Publisher)不会将消息直接发送给特定的接收者(Subscriber),而是将消息发布到一个中介(如消息通道或事件总线),订阅者通过订阅中介来接收感兴趣的消息。通过这种方式,发布者和订阅者之间实现了松耦合。

结构

发布-订阅模式主要包括以下几个组件:

  1. 发布者:生产并发布消息的源头。
  2. 中介:用于发布和传递消息的消息通道或事件总线。
  3. 订阅者:接收并处理消息的终点。
+-----------+             +---------+             +---------+
|  发布者   |  ---发布--> |  中介   | ---广播-->  |  订阅者  |
+-----------+             +---------+             +---------+^                     ||                     |
+-----------+                   |        订阅         |
|  订阅者   | ------------------+
+-----------+

工作原理

Publisher Subscriber Pattern Sequence

发布-订阅模式的工作流程如下:

  1. 消息发布:发布者生成消息并将其发布到中介(如消息队列或事件总线)。
  2. 消息存储与管理:中介接收并存储消息,同时管理订阅者信息。
  3. 消息广播:中介将消息广播给所有订阅了该消息的订阅者。
  4. 消息接收与处理:订阅者接收消息并进行处理。

优势与应用场景

发布-订阅模式(Publisher-Subscriber Pattern)在分布式系统和微服务架构中被广泛应用,主要优势如下:

1. 松耦合

定义和实现独立:发布者和订阅者相互独立,发布者不需要知道订阅者的具体信息,反之亦然。各个服务之间互不依赖,使得系统模块可以独立开发、测试和部署。

  • 应用情景:一个电商平台上,订单服务和库存服务可以独立存在。订单创建后,库存减少操作不需要紧耦合在一起,可以通过消息队列来解耦。

2. 扩展性

服务扩展变得简单:可以方便地添加新的订阅者,而无需修改已有的发布者或者其他订阅者。新订阅者只需要订阅相应的消息即可。

  • 应用情景:在已有用户服务和通知服务的基础上,可以快速添加一个新的分析服务来订阅用户注册消息以进行数据分析。

3. 灵活性和适应性

动态消息路由:通过路由键和主题交换机,可以实现复杂的消息路由和选择,使得系统更具灵活性和适应性。

  • 应用情景:广告系统通过特定兴趣标签路由消息到不同的消费者,实现个性化推荐。

4. 提高性能和可靠性

异步处理提升性能:发布者可以立即发布消息而不必等待消息被处理,从而提高系统的响应速度。订阅者可以按自己的节奏处理消息,减少系统的耦合和脆弱性。

  • 应用情景:支付系统可以在用户付款后立即返回结果,后续的账单生成和通知可以异步处理。

RabbitMQ

RabbitMQ 作为一个高性能、高可靠性的消息队列系统,与发布-订阅模式非常契合。它提供了丰富的功能,使得在实际开发中实现发布-订阅模式变得相对简单和高效。

RabbitMQ 如何支持发布-订阅模式

  1. 交换机(Exchange):RabbitMQ 使用交换机来接收并路由消息。发布者将消息发送到交换机,交换机再根据路由键将消息转发到绑定的队列。不同类型的交换机(如 fanoutdirecttopicheaders)可以实现各种不同的路由策略。
  2. 队列(Queue):队列是存储消息的地方。消费者从队列中提取消息进行处理。通过将队列绑定到交换机,可以实现消息的广播和分发。
  3. 绑定(Binding):绑定定义了交换机如何将消息路由到队列。绑定一般会指定一个路由键或者模式,从而实现消息过滤和选择。

交换机类型

  • Direct Exchange:直接交换机将消息路由到绑定键匹配的队列上。

  • Fanout Exchange:扇出交换机将消息广播到所有绑定的队列上,不考虑路由键。

  • Topic Exchange:主题交换机通过路由键模式(如 user.*order.#)路由消息,允许更灵活的路由策略。

  • Headers Exchange:头部交换机通过消息的头部属性来路由消息。

示例代码

在微服务架构中,发布-订阅模式常用于事件驱动的通信。以下是一个利用RabbitMQ实现发布-订阅模式的示例。

+-----------+                  +---------------+                  +--------------+
|            |                 |               |                 |              |
|  发布者    | ---发布消息---> |     主题交换机 | --路由键--->  |     用户队列  | 
|            |                 |               | (user.*)        |              |
+-----------+                  +---------------+                  +--------------+|              ||  订阅者一    | <---处理消息--+--------------++-----------+                  +---------------+                  +--------------+
|            |                 |               |                 |              |
|  发布者    | ---发布消息---> |     主题交换机 | --路由键--->  |     订单队列  | 
|            |                 |               | (order.*)       |              |
+-----------+                  +---------------+                  +--------------+|              ||  订阅者二    | <---处理消息--+--------------+

项目说明

为了让消费者仅消费特定的事件,我们可以通过多种方式实现。常见的方法包括:

  1. 消息过滤:在发送消息时添加特定的标签或属性,消费者根据这些标签或属性进行过滤。
  2. 不同的队列或主题:将不同类型的事件发布到不同的消息队列或主题上,消费者订阅他们感兴趣的队列或主题。

在Spring Boot中,使用RabbitMQ时,我们可以利用路由键(Routing Key)和基于主题的交换机(Topic Exchange)实现这种功能,具体实现步骤:

  1. 配置RabbitMQ的交换机、队列和绑定关系。

  2. 发布者在发布消息时指定路由键。

  3. 消费者通过 @RabbitListener 注解订阅特定的路由键。

项目结构

event-driven-system/
│
├── src/main/java/com/example/event/
│   ├── EventApplication.java
│   ├── config/
│   │   └── RabbitConfig.java
│   ├── publisher/
│   │   └── MessagePublisher.java
│   ├── subscriber/
│   │   ├── UserSubscriber.java
│   │   └── OrderSubscriber.java
└── src/main/resources/├── application.yml

代码流程

  • 发布者调用 publishUserEvent 方法,向 eventExchange 发布用户创建消息,消息被根据路由键 user.create 发送到 userQueue 中,然后 User Subscriber 消费者从 userQueue 队列中接收消息并处理。
  • 发布者调用 publishOrderEvent 方法,向 eventExchange 发布订单创建消息,消息被根据路由键 order.create 发送到 orderQueue 中,然后 Order Subscriber 消费者从 orderQueue 队列中接收消息并处理。

Publisher Subscriber Pattern Flow

这个设计确保了不同类型的消费者(用户订阅者和订单订阅者)能够接收和处理他们各自关注的消息,从而实现了消费者只消费特定事件的需求。

相关源代码

EventApplication.java

主程序启动类。

package com.example.event;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class EventApplication {public static void main(String[] args) {SpringApplication.run(EventApplication.class, args);}
}
RabbitConfig.java

在配置类中配置基于主题的交换机、队列及其绑定关系。我们可以在配置类中配置 RabbitListenerContainerFactory,并在使用 @RabbitListener 注解时自动使用这个工厂。这能让我们自定义一些与监听器相关的配置,比如并发消费者的数量、消息的确认模式等。

package com.example.event.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Beanpublic Queue userQueue() {return new Queue("userQueue");}@Beanpublic Queue orderQueue() {return new Queue("orderQueue");}@Beanpublic TopicExchange eventExchange() {return new TopicExchange("eventExchange");}@Beanpublic Binding userBinding(Queue userQueue, TopicExchange eventExchange) {return BindingBuilder.bind(userQueue).to(eventExchange).with("user.*");}@Beanpublic Binding orderBinding(Queue orderQueue, TopicExchange eventExchange) {return BindingBuilder.bind(orderQueue).to(eventExchange).with("order.*");}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 设置消费者的并发数量,等配置factory.setConcurrentConsumers(3);factory.setMaxConcurrentConsumers(10);return factory;}
}
MessagePublisher.java

发布者在发布消息时指定路由键,以便消费者能按需求过滤消息。

package com.example.event.publisher;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessagePublisher {@Autowiredprivate RabbitTemplate rabbitTemplate;public void publishUserEvent(String message) {rabbitTemplate.convertAndSend("eventExchange", "user.create", message);System.out.printf("Published user event: %s%n", message);}public void publishOrderEvent(String message) {rabbitTemplate.convertAndSend("eventExchange", "order.create", message);System.out.printf("Published order event: %s%n", message);}
}
UserSubscriber.java

用户订阅者只会接收指定的“user”事件,通过 @RabbitListener 注解明确指定使用 rabbitListenerContainerFactory,从而确保我们自定义的容器配置能够被应用。

package com.example.event.subscriber;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class UserSubscriber {@RabbitListener(queues = "userQueue", containerFactory = "rabbitListenerContainerFactory")public void handleUserMessage(String message) {System.out.printf("Received user event message: %s%n", message);}
}
OrderSubscriber.java

订单订阅者只会接收指定的“order”事件。

package com.example.event.subscriber;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class OrderSubscriber {@RabbitListener(queues = "orderQueue", containerFactory = "rabbitListenerContainerFactory")public void handleOrderMessage(String message) {System.out.printf("Received order event message: %s%n", message);}
}
application.yml

rabbitmq配置。

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest

总结

Robert-C-Martin-Quote-All-race-conditions-deadlock-conditions-and

发布-订阅模式是实现松耦合系统的强大工具,在云计算和分布式系统中应用广泛。通过使用如RabbitMQ的消息中间件,我们可以很容易地在Spring Boot项目中实现这一模式。通过本文的示例,我们展示了如何利用 RabbitListenerContainerFactory 配置消费者行为,并通过Spring Boot注解 @RabbitListener 进行订阅,从而实现发布-订阅模式的精细化控制和一个完整的发布-订阅模式。这种模式不仅提高了系统的扩展性和灵活性,还大大简化了开发过程中的依赖管理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/58942.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

0xGame 2024 [Week 4] Jenkins

1.前言 由于好久没做web题了&#xff0c;所以今天来尝试来做一波web题&#xff0c;仅供刷题记录。 2.题目 这个给的提示对于小白来说实在是友好的过劲。 3.分析 上网搜到一个关于Jenkins的历史漏洞&#xff0c;下面链接可供参考 https://blog.csdn.net/2301_80127209/arti…

yolov8训练及测试(ubuntu18.04、tensorrt、ros)

1 数据集制作 1.1标注数据 Linux/Ubuntu/Mac 至少需要 Python 2.6 &#xff08;推荐使用 Python 3 或更高版本 及 PyQt5&#xff09; Ubuntu Linux (Python 3 Qt5) git clone https://gitcode.com/gh_mirrors/la/labelImg.git sudo apt-get install pyqt5-dev-tools cd lab…

Nginx 反向代理(解决跨域)

文章目录 前言一、同源策略二、跨域是什么&#xff1f;三、Nginx解决跨域1.前端示例代码2.说明 四、nginx反向代理配置五、启动nginx六、最终效果总结 前言 Nginx反向代理解决跨域 一、同源策略 定义&#xff1a;同源策略&#xff08;Same-Origin Policy&#xff09;是指浏览…

ssm《数据库系统原理》课程平台的设计与实现+vue

系统包含&#xff1a;源码论文 所用技术&#xff1a;SpringBootVueSSMMybatisMysql 免费提供给大家参考或者学习&#xff0c;获取源码看文章最下面 需要定制看文章最下面 目 录 目 录 I 摘 要 III ABSTRACT IV 1 绪论 1 1.1 课题背景 1 1.2 研究现状 1 1.3 研究内容…

多渠道流量获取策略提升网站访问量的有效方法

内容概要 在当今数字时代&#xff0c;企业面临着越来越激烈的竞争&#xff0c;流量获取变得极为重要。多渠道流量获取不仅可以增加网站的访问量&#xff0c;还能够有效提升品牌的知名度和影响力。通过整合多种渠道&#xff0c;企业能够更好地触达目标受众&#xff0c;实现精准…

kafka实时返回浏览数据

在安装完kafka(Docker安装kafka_docker 部署kafka-CSDN博客)&#xff0c;查看容器是否启动&#xff1a; docker ps | grep -E kafka|zookeeper 再用python开启服务 from fastapi import FastAPI, Request from kafka import KafkaProducer import kafka import json import …

【MyBatis源码】BoundSql分析

基础 BoundSql是对SQL语句及参数信息的封装&#xff0c;它是SqlSource解析后的结果。Executor组件并不是直接通过StaticSqlSource对象完成数据库操作的&#xff0c;而是与BoundSql交互。BoundSql是对Executor组件执行SQL信息的封装&#xff0c;具体实现代码如下&#xff1a; …

A Consistent Dual-MRC Framework for Emotion-cause Pair Extraction——论文阅读笔记

前言 这是我第一次向同学院同年级的学生和老师们汇报的第一篇论文,于2022年发表在TOIS上,属于CCF A类,主要内容是将MRC应用到情感原因对抽取中。 论文链接:用于情绪-原因对提取的一致双 MRC 框架 |信息系统上的 ACM Transactions 这里我就不放上我自己翻译的中文版还有我…

【Linux系统】—— 基本指令(一)

【Linux系统】—— 基本指令&#xff08;一&#xff09; 1 「ls」指令1.1 初识 「ls」 指令1.2 「ls -l」1.3 认识文件1.4 「ls -l」显示的内容1.5 如何区分文件类型1.6 「ls -a」1.7 混合使用命令行选项1.8 「ls」查看指定目录下文件1.9 「ls」 指令常用命令行选项 2 「pwd」 …

js,ts控制流程

摘要&#xff1a; 在 JavaScript 和 TypeScript 中&#xff0c;控制流程是指程序执行的顺序和条件判断。以下是一些常见的控制流程结构&#xff0c;包括条件语句、循环语句和函数调用等。 1. 条件语句&#xff1a; if 语句 let condition true;if (condition) {console.log(C…

计组-Cache的基本概念,计算Cache+主存的平均周期

由于寄存器是集成在CPU中且容量极小&#xff0c;所以我们用Cache来提高速度&#xff0c;在无寄存器时其当做访问速度最快的 Cache的命中率: 是指当CPU要处理某个数据时&#xff0c;首先会考虑在Cache里面去读取&#xff0c;当需要读取的数据在Cache里面时&#xff0c;此时这个…

《大数据与人工智能:提升数据质量与数量的利器》

《大数据与人工智能&#xff1a;提升数据质量与数量的利器》 一、大数据与人工智能的融合趋势二、大数据增加数据数量的方法&#xff08;一&#xff09;不同途径的数据增量&#xff08;二&#xff09;数据增强的多样方法 三、人工智能提升数据数量的手段&#xff08;一&#xf…

算法中使用的数据结构解释*

算法中使用的数据结构解释 在算法的执行过程中&#xff0c;需要有能够容纳临时数据的内存数据结构。数据结构的有效实施需要选择适当的数据结构。迭代或递归算法需要专门为其逻辑设计的数据结构。 也有人表述为容器&#xff0c;存放数据的容器。 在递归算法的情况下&#xff0c…

UE4安卓Gradle工程中的libUE4.so的生成原理

流程图 流程图放在最前面&#xff0c;下面是讲解。 libUE4.so 问&#xff1a;在UE4安卓开发中&#xff0c;libUE4.so即是符号表&#xff0c;又是引擎代码native&#xff0c;是吗&#xff1f; 答&#xff1a;是的&#xff0c;libUE4.so在UE4安卓开发中既包含符号表&#xff0c;…

C4.【C++ Cont】C++数据类型和typedef的补充说明

1.数据类型 C同C语言的一样的数据类型不在赘述,参见3.【C语言】内置数据类型,这里只讲不同点 1.在C中,布尔类型包含在头文件iostream中,不用像C语言一样包含stdbool.h 布尔类型变量的定义写法和C语言不同,只能写成 bool a true; bool b false; bool不可写成_Bool或Bool …

Windows部署rabbitmq

本次安装环境&#xff1a; 系统&#xff1a;Windows 11 软件建议版本&#xff1a; erlang OPT 26.0.2rabbitmq 3.12.4 一、下载 1.1 下载erlang 官网下载地址&#xff1a; 1.2 下载rabbitmq 官网下载地址&#xff1a; 建议使用解压版&#xff0c;安装版可能会在安装软件…

前端学习-盒子模型(十八)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 盒子模型组成 边框 语法 边框简写 代码示例 表格的细线边框 语法 内边距 内边距复合写法 外边距 外边距典型应用 外边距合并 清除内外边距 总结 前…

GHuNeRF: Generalizable Human NeRF from a Monocular Video

研究背景 研究问题&#xff1a;这篇文章要解决的问题是学习一个从单目视频中泛化的人类NeRF模型。尽管现有的泛化人类NeRF已经取得了令人印象深刻的成果&#xff0c;但它们需要多视图图像或视频&#xff0c;这在某些情况下可能不可用。此外&#xff0c;一些基于单目视频的人类…

为啥学习数据结构和算法

基础知识就像是一座大楼的地基&#xff0c;它决定了我们的技术高度。而要想快速做出点事情&#xff0c;前提条件一定是基础能力过硬&#xff0c;“内功”要到位。 想要通关大厂面试&#xff0c;千万别让数据结构和算法拖了后腿 我们学任何知识都是为了“用”的&#xff0c;是为…

离线安装Vue2开发环境

在外网进行Vue2开发后&#xff0c;需要转到内网开发&#xff0c;无法在线依赖库安装&#xff0c;需要迁移node_modules。 1.内外网开发电脑安装同样版本的nodejs 我本地安装的node-v16.17.1-x64.msi&#xff0c;所以在内网环境也要按照node-v16.17.1-x64.msi。 在外网环境使用…