RocketMQ - Spring Cloud Alibaba RocketMQ

Spring Cloud Stream是Spring Cloud体系内的一个框架,用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务,其目的是简化消息业务在Spring Cloud应用中的开发。

Spring Cloud Stream的架构图如下所示,应用程序通过Spring Cloud Stream注入的输入通道inputs和输出通道outputs与消息中间件Middleware通信,消息通道通过特定的中间件绑定器Binder实现连接到外部代理。
Spring Cloud Stream的架构图
Spring Cloud Stream的实现基于发布/订阅机制,核心由四部分构成:Spring Framework中的Spring Messaging和Spring Integrataion,以及Spring Cloud Stream中的Binders和Bindings。

Spring Messaging:Spring Framework中的统一消息编程模型,其核心对象如下:

  • Message: 消息对象,包含消息头Header和消息体Payload。
  • MessageChannel:消息通道接口,用于接收消息,提供send方法将消息发送致消息通道。
  • MessageHandler:消息处理器接口,用于处理消息逻辑。

Spring Integration:Spring Framework中用于支持企业集成的一种扩展机制,作用是提供一个简单的模型来构建企业集成解决方案,对Spring Messaging进行了扩展。

  • MessageDispatcher: 消息分发接口,用于分发消息和添加删除消息处理器。
  • MessageRouter:消息路由接口,定义默认的输出消息通道。
  • Filter:消息的过滤注解,用于配置消息过滤表达式。
  • Aggregator:消息的聚合注解,用于将多条消息聚合成一条。
  • Splitter:消息的分割,用于将一条消息拆分成多条。

Binders:目标绑定器,负责与外部消息中间件系统集成的组件。

  • doBindProducer:绑定消息中间件客户端发送消息模块。
  • doBindConsumer:绑定消息中间件客户端接收消息模块。

Bindings:外部消息中间件系统与应用程序提供的消息生产者和消费者之间的桥梁。

Spring Cloud Stream官方提供了Kafka Binder和RabbitMQ Binder,用于集成Kafka和RabbitMQ,Spring Cloud Alibaba中加入了RocketMQ Binder,用于将RocketMQ集成到Spring Cloud Stream。

Spring Cloud Alibaba RocketMQ架构图

Spring Cloud Alibaba RocketMQ的架构图如下所示:
在这里插入图片描述

  • MessageChannel(output):消息通道,用于发送消息,Spring Cloud Stream的标准接口。
  • MessageChannel(input):消息通道,用于订阅消息,Spring Cloud Stream的标准接口。
  • Binder bindProducer:目标绑定器,将发送通道发过来的消息发送到RocketMQ消息服务器,由Spring Cloud Alibaba团队按照Spring Cloud Stream的标准协议实现。
  • Binder bindConsumer:目标绑定器,将接收到RocketMQ消息服务器的消息推送给订阅通道,由Spring Cloud Alibaba团队按照Spring Cloud Stream的标准协议实现。

Spring Cloud Stream消息发送流程

Spring Cloud Stream消息发送流程如下图所示,包括发送、订阅、分发、委派、消息处理等,具体实现如下:
在这里插入图片描述
在业务代码中调用MessageChannel接口的Send()方法,例如source.output().send(message)。

public interface Source {String OUTPUT = "output";@Output("output")MessageChannel output();
}@FunctionalInterface
public interface MessageChannel {long INDEFINITE_TIMEOUT = -1L;default boolean send(Message<?> message) {return this.send(message, -1L);}boolean send(Message<?> var1, long var2);
}

AbstractMessageChannel是消息通道的基本实现类,提供发送消息和接收消息的公用方法。

@IntegrationManagedResource
public abstract class AbstractMessageChannel extends IntegrationObjectSupport implements MessageChannel, TrackableComponent, InterceptableChannel, MessageChannelMetrics, ConfigurableMetricsAware<AbstractMessageChannelMetrics>, IntegrationPattern {public boolean send(Message<?> messageArg, long timeout) {// 省略部分代码sent = this.doSend(message, timeout);// 省略部分代码return sent;}protected abstract boolean doSend(Message<?> var1, long var2);
}

消息发送到AbstractSubscribableChannel类实现的doSend()方法。

protected boolean doSend(Message<?> message, long timeout) {try {return this.getRequiredDispatcher().dispatch(message);} catch (MessageDispatchingException var6) {String description = var6.getMessage() + " for channel '" + this.getFullChannelName() + "'.";throw new MessageDeliveryException(message, description, var6);}}

通过消息分发类MessageDispatcher把消息分发给MessageHandler。

private MessageDispatcher getRequiredDispatcher() {MessageDispatcher dispatcher = this.getDispatcher();Assert.state(dispatcher != null, "'dispatcher' must not be null");return dispatcher;
}protected abstract MessageDispatcher getDispatcher();

从AbstractSubscribableChannel的实现类DirectChannel得到MessageDispatcher的实现类UnicastingDispatcher。

public class DirectChannel extends AbstractSubscribableChannel {protected UnicastingDispatcher getDispatcher() {return this.dispatcher;}
}

调用dispatch()方法把消息分发给各个MessageHandler。

public class UnicastingDispatcher extends AbstractDispatcher {public final boolean dispatch(Message<?> message) {if (this.executor != null) {Runnable task = this.createMessageHandlingTask(message);this.executor.execute(task);return true;} else {return this.doDispatch(message);}}private boolean doDispatch(Message<?> message) {if (this.tryOptimizedDispatch(message)) {return true;} else {boolean success = false;Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);if (!handlerIterator.hasNext()) {throw new MessageDispatchingException(message, "Dispatcher has no subscribers");} else {ArrayList exceptions = null;while(!success && handlerIterator.hasNext()) {MessageHandler handler = (MessageHandler)handlerIterator.next();try {handler.handleMessage(message);success = true;} catch (Exception var9) {RuntimeException runtimeException = IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message, () -> {return "Dispatcher failed to deliver Message";}, var9);if (exceptions == null) {exceptions = new ArrayList();}exceptions.add(runtimeException);boolean isLast = !handlerIterator.hasNext();if (!isLast && this.failover) {this.logExceptionBeforeFailOver(var9, handler, message);}this.handleExceptions(exceptions, message, isLast);}}return success;}}}
}

遍历所有MessageHandler,调用handlerMessage()处理消息。

private Iterator<MessageHandler> getHandlerIterator(Message<?> message) {Set<MessageHandler> handlers = this.getHandlers();return this.loadBalancingStrategy != null ? this.loadBalancingStrategy.getHandlerIterator(message, handlers) : handlers.iterator();}

查看MessageHandler是从哪里来的,也就是handlers列表中的MessageHandler是如何添加的。

public abstract class AbstractSubscribableChannel extends AbstractMessageChannel implements SubscribableChannel, SubscribableChannelManagement {public boolean subscribe(MessageHandler handler) {MessageDispatcher dispatcher = this.getRequiredDispatcher();boolean added = dispatcher.addHandler(handler);this.adjustCounterIfNecessary(dispatcher, added ? 1 : 0);return added;}
}

AbstractMessageChannelBinder在初始化Binding时,会创建并初始化SendingHandler,调用subscribe()添加到handlers列表。

public abstract class AbstractMessageChannelBinder<C extends ConsumerProperties, P extends ProducerProperties, PP extends ProvisioningProvider<C, P>> extends AbstractBinder<MessageChannel, C, P> implements PollableConsumerBinder<MessageHandler, C>, ApplicationEventPublisherAware {public final Binding<MessageChannel> doBindProducer(final String destination, MessageChannel outputChannel, final P producerProperties) throws BinderException {// 创建Producer的messageHandlerfinal MessageHandler producerMessageHandler;final ProducerDestination producerDestination;try {// 省略部分代码producerMessageHandler = this.createProducerMessageHandler(producerDestination, producerProperties, outputChannel, errorChannel);// 省略部分代码// 创建SendingHandler并调用subscribe((SubscribableChannel)outputChannel).subscribe(new AbstractMessageChannelBinder.SendingHandler(producerMessageHandler, HeaderMode.embeddedHeaders.equals(producerProperties.getHeaderMode()), this.headersToEmbed, this.useNativeEncoding(producerProperties)));// 省略部分代码}}

Producer的MessageHandler是由消息中间件Binder来完成的,Spring Cloud Stream提供了创建MessageHandler的规范。

AbstractMessageChannelBinder的初始化由AbstractBindingLifecycle在Spring 容器加载所有Bean。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/213751.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

论文阅读《Domain Generalized Stereo Matching via Hierarchical Visual Transformation》

论文地址&#xff1a;https://openaccess.thecvf.com/content/CVPR2023/html/Chang_Domain_Generalized_Stereo_Matching_via_Hierarchical_Visual_Transformation_CVPR_2023_paper.html 概述 立体匹配模型是近年来的研究热点。但是&#xff0c;现有的方法过分依赖特定数据集上…

五年制专转本备考冲刺阶段,老师给你六点建议助你上岸

1、热衷的不是学习&#xff0c;而是思考 人与人之间最大的差别在于思维的差别&#xff0c;也可以说是思考的差别。专转本也是如此&#xff0c;有人思考得简单&#xff0c;有人思考得复杂&#xff1b;有人想得全面&#xff0c;有人想得肤浅。 只有善于思考&#xff0c;才会对问…

100:ReconFusion: 3D Reconstruction with Diffusion Priors

简介 官网 少样本重建必然导致nerf失败&#xff0c;论文提出使用diffusion模型来解决这一问题。从上图不难看出&#xff0c;论文一步步提升视角数量&#xff0c;逐步与Zip-NeRF对比。 实现流程 Diffusion Model for Novel View Synthesis 给定一组输入图像 x o b s { x i…

Jmeter beanshell编程实例

1、引言 BeanShell是一种小型的&#xff0c;免费的&#xff0c;可嵌入的符合Java语法规范的源代码解释器&#xff0c;具有对象脚本语言特性。 在Jmeter实践中&#xff0c;由于BeanShell组件较高的自由度&#xff0c;通常被用来处理较为复杂&#xff0c;其它组件难以处理的问题…

c语言:文件操作(1)

前言&#xff1a;为什么要使用文件 使用文件可以让程序在不同运行之间保存和读取数据。这样可以实现持久化存储&#xff0c;即使程序关闭后数据也不会丢失。文件也可以用于数据交换&#xff0c;允许不同程序之间共享信息。在 C 语言中&#xff0c;文件还可以用于读取配置信息&…

系统架构设计师教程(三)信息系统基础知识

信息系统基础知识 3.1 信息系统概述3.1.1 信息系统的定义3.1.2 信息系统的发展3.1.3 信息系统的分类3.1.4 信息系统的生命周期3.1.5 信息系统建设原则3.1.6 信息系统开发方法 3.2 业务处理系统 (TPS)3.2.1 业务处理系统的概念3.2.2 业务处理系统的功能3.2.3 业务处理系统的特点…

Python:核心知识点整理大全13-笔记

目录 6.4.3 在字典中存储字典 6.5 小结 第7章 用户输入和while循环 7.1 函数 input()的工作原理 7.1.1 编写清晰的程序 7.1.2 使用 int()来获取数值输入 7.1.3 求模运算符 7.1.4 在 Python 2.7 中获取输入 7.2 while 循环简介 7.2.1 使用 while 循环 往期快速传送门…

SPI 通信-stm32入门

本节我们将继续学习下一个通信协议 SPI&#xff0c;SPI 通信和我们刚学完的 I2C 通信差不多。两个协议的设计目的都一样&#xff0c;都是实现主控芯片和各种外挂芯片之间的数据交流&#xff0c;有了数据交流的能力&#xff0c;我们主控芯片就可以挂载并操纵各式各样的外部芯片&…

gpu版本的GNN的demo

1、当涉及到在GPU上运行图神经网络&#xff08;GNN&#xff09;时&#xff0c;通常使用深度学习框架&#xff0c;如PyTorch或TensorFlow。在这里&#xff0c;我将为您提供一个使用PyTorch Geometric库实现GNN的简单示例。 首先&#xff0c;确保您已经安装了PyTorch和PyTorch G…

第 375 场 LeetCode 周赛题解

A 统计已测试设备 模拟&#xff1a;记录当前已测试设备数量 class Solution { public:int countTestedDevices(vector<int> &batteryPercentages) {int res 0;int s 0;for (auto x: batteryPercentages) {if (x - s > 0) {res;s;}}return res;} };B 双模幂运算 …

【无线网络技术】——无线城域网(学习笔记)

&#x1f4d6; 前言&#xff1a;无线城域网&#xff08;WMAN&#xff09;是指在地域上覆盖城市及其郊区范围的分布节点之间传输信息的本地分配无线网络。能实现语音、数据、图像、多媒体、IP等多业务的接入服务。其覆盖范围的典型值为3~5km&#xff0c;点到点链路的覆盖可以高达…

少儿编程考级:激发孩子逻辑思维能力的关键

在当今信息化时代&#xff0c;少儿编程已经成为孩子们不可或缺的一项技能。而少儿编程考级&#xff0c;则是检验孩子们在这一技能上所取得的成就的重要途径。少儿编程考级不仅能够激发孩子们的逻辑思维能力&#xff0c;还能够提高他们的动手能力和创造力。6547网将详细介绍少儿…

电源模块测试系统测试LED电源项目的优势

LED电源测试是电源在设计、生产过程中的关键环节&#xff0c;也是确保LED照明产品可靠性和稳定性的重要步骤。LED电源测试一般包括电压、电流、效率、稳定性等。电源模块测试系统测试LED电源&#xff0c;实现自动化测试&#xff0c;保证测试结果的可靠性。 LED电源测试项目及方…

实现加盐加密方法以及MappedByteBuffer,RandomAccess

目录 自己实现 Spring Security MappedByteBuffer RandomAccess 加盐加密的实现 自己实现 传统MD5可通过彩虹表暴力破解&#xff0c; 加盐加密算法是一种常用的密码保护方法&#xff0c;它将一个随机字符串&#xff08;盐&#xff09;添加到原始密码中&#xff0c;然后再进…

力扣17. 电话号码的字母组合(java 回溯法)

Problem: 17. 电话号码的字母组合 文章目录 题目描述思路解题方法复杂度Code 题目描述 思路 题目给定一串数字&#xff0c;要求我们找出所有可能的字母组合&#xff0c;即我们可以穷举出所有可能的结果&#xff0c;而涉及到穷举我们自然可以想到利用回溯来解决问题&#xff0c…

无线且列窄图片如何转excel?

写此文原因&#xff1a;图片要转excel&#xff0c;这放以前&#xff0c;是不能实现的功能&#xff0c;但随着人工智能的蓬勃发展&#xff0c;人们已克服了这一难题&#xff0c;但是&#xff0c;我们知道&#xff0c;要将图片识别成excel&#xff0c;识别程序首先要先识别图片中…

如何在小米路由器4A千兆版刷入OpenWRT并通过内网穿透工具实现公网远程访问

文章目录 前言1. 安装Python和需要的库2. 使用 OpenWRTInvasion 破解路由器3. 备份当前分区并刷入新的Breed4. 安装cpolar内网穿透4.1 注册账号4.2 下载cpolar客户端4.3 登录cpolar web ui管理界面4.4 创建公网地址 5. 固定公网地址访问 前言 OpenWRT是一个高度模块化、高度自…

交易历史记录20231206 记录

昨日回顾&#xff1a; select top 10000 * from dbo.CODEINFO A left join dbo.全部&#xff21;股20231206010101 B ON A.CODE B.代码 left join dbo.全部&#xff21;股20231206CONF D on A.CODED.代码left join dbo.全部&#xff21;股20231206 G on A.CODEG.代码 left…

Kafka-快速实战

Kafka介绍 ChatGPT对于Apache Kafka的介绍&#xff1a; Apache Kafka是一个分布式流处理平台&#xff0c;最初由LinkedIn开发并于2011年开源。它主要用于解决大规模数据的实时流式处理和数据管道问题。 Kafka是一个分布式的发布-订阅消息系统&#xff0c;可以快速地处理高吞吐…

阿里云国际基于CentOS系统镜像快速部署Apache服务

阿里云轻量应用服务器提供了Windows Server系统镜像和主流的Linux系统镜像&#xff0c;您可以通过该类镜像创建纯净、安全、稳定的运行环境。本文以CentOS 7.6系统镜像为例&#xff0c;介绍如何快速配置Apache服务。 背景信息 注意&#xff0c;阿里云国际通过corebyt注册并充…