SpringCloud 集成 RocketMQ 及配置解析

文章目录

  • 前言
  • 一、SpringCloud 集成 RocketMQ
    • 1. pom 依赖
    • 2. yml 配置
    • 3. 操作实体
    • 4. 生产消息
      • 4.1. 自动发送消息
      • 4.2. 手动发送消息
    • 5. 消费消息
  • 二、配置解析
    • 1. spring.cloud.stream.function.definition


前言

  1. 定义
    Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。简单的说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。

  2. 抽象模型
    我们都知道市面上有很多消息中间件,Sping Cloud Stream 为了可以集成各种各样的中间件,它抽象出了 Binder 的概念,每个消息中间件都需要有对应自己的 Binder。这样它就可以根据不同的 Binder 集成不同的中间件。下图的input和output是channel,Binder则是消息中间件和通道之间的桥梁。
    在这里插入图片描述

  3. 绑定器
    通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。
    Spring Cloud Stream 提供了 Binder (负责与消息中间件进行交互),我们则通过 inputs 或者 outputs 这样的消息通道与 Binder 进行交互。

Binder 绑定器是 Spring cloud Stream 中一个非常重要的概念,实现了应用程序和消息中间件之间的隔离,同时我们也可以通过应用程序实现,消息中间件之间的通信。在我们的项目的可以继承多种绑定器,我们可以根据不同特性的消息使用不同的消息中间件。Spring Cloud Stream 为我们实现了 RabbitMQ 和Kafka 的绑定器。如果你想使用其他的消息中间件需要自己去实现绑定器接口。

一、SpringCloud 集成 RocketMQ

1. pom 依赖

<!-- rocketmq -->
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

2. yml 配置

spring:cloud:stream:function:definition: producer1;consumer1 # 方法定义(用于定义发送者或消费者方法)# 配置消息通道通用属性(适用于所有消息中间件)bindings:# 配置channel消息通道consumer1-in-0:destination: consumer_topic # topic消息主题content-type: application/json # 内容格式group: consumer-group # 消费者组producer1-out-0:destination: producer_topic # topic消息主题content-type: application/json # 内容格式rocketmq:binder:name-server: 127.0.0.1:9876 # rocketmq服务地址vipChannelEnabled: true # 是否开启vip通道(兼容老版本使用。多监听一个端口用于接受处理消息,防止端口占用。)# 配置消息通道独特属性(仅适用于rocketmq)bindings:# 配置channel消息通道(生产者:[functionName]-out-[index],消费者:[functionName]-in-[index])producer1-out-0:producer:group: consumer-groupsync: true # 是否开启同步发送consumer1-in-0: consumer:subscription: myTag  # 消费tagdelayLevelWhenNextConsume: -1suspendCurrentQueueTimeMillis: 99999999broadcasting: false # 是否使用广播消费,默认为false使用集群消费

3. 操作实体

package com.demo.model;import lombok.AllArgsConstructor;
import lombok.Data;/*** 消息model*/
@Data
@AllArgsConstructor
public class MsgModel {/*** 消息id*/private String msgId;/*** 消息内容*/private String message;
}

4. 生产消息

4.1. 自动发送消息

通过 MessageBuilder 自动发送消息。

package com.demo;import com.demo.model.MsgModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.util.function.Supplier;/*** 消息生产者类*/
@Configuration
@Slf4j
public class MyProducer {/*** 消息生产者1*/@Beanpublic Supplier<Message<MsgModel>> producer1() {return () -> {MsgModel msgModel = new MsgModel(System.currentTimeMillis(), "测试消息");log.info("producer1发送消息:" + msgModel);return MessageBuilder.withPayload(entity).build();};}
}

这种方式定义 suppelier 会 默认1000ms 发送一次记录。可以修改 spring.cloud.stream.poller.fixedDelay 设置延迟毫秒值。

4.2. 手动发送消息

通过 StreamBridge 手动发送消息。

package com.demo.controller;import com.demo.model.MsgModel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;/*** 消息controller*/
@RestController
@RequiredArgsConstructor
@RequestMapping("/msg")
@Slf4j
public class MsgController {private final StreamBridge streamBridge;/*** 发送消息*/@GetMapping("/send")public void sendMsg() {MsgModel msgModel = new MsgModel(System.currentTimeMillis(), "测试消息");log.info("producer1发送消息:" + msgModel);streamBridge.send("producer1-out-0", MessageBuilder.withPayload(entity).setHeader("MyHearder", "这是一个请求头").build());}
}

5. 消费消息

package com.demo;import com.demo.model.MsgModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;import java.util.function.Consumer;/*** 消息消费者类*/
@Configuration
@Slf4j
public class ReceiveMQ {/*** 消息消费者1*/@Beanpublic Consumer<Message<MsgModel>> consumer1(){return (message)->{MessageHeaders headers = message.getHeaders();MsgModel msgModel = message.getPayload();log.info("consumer1接收消息,消息头:" + headers.get("MyHeader"));log.info("consumer1接收消息,消息内容:" + msgModel);};}
}

二、配置解析

1. spring.cloud.stream.function.definition

进行生产者或消费者方法定义,在 rocketmq 初始时会加载这些方法以创建生产者或消费者列表。

不管是创建 Consumer 还是 Supplier 或者是 Function Stream 都会将其方法名称进行一个 topic 拆封和绑定。假设创建了一个 Consumer< String > myTopic 的方法,Stream 会将其 拆分成 In 和 out 两个通道:

  • 输入通道(消费者): [functionName]-in-[index]
    consumer1-in-0
  • 输出通道(生产者): [functionName]-out-[index]
    producer1-out-0

注意:这里的 functionName 需要和生产者或消费者方法名称以及 spring.cloud.stream.function.definition 下的名称保持一致。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/836932.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

spacy微调BERT-NER模型

数据准备 加载数据集 from tqdm.notebook import tqdm import osdataset [] with open(train_file, r) as file:for line in tqdm(file.readlines()):data json.loads(line.strip())dataset.append(data)你可以按照 CLUENER 的格式准备训练数据&#xff0c; 例如&#xff1…

(done) Beam search

参考视频1&#xff1a;https://www.bilibili.com/video/BV1Gs421N7S1/?spm_id_from333.337.search-card.all.click&vd_source7a1a0bc74158c6993c7355c5490fc600 &#xff08;beam search 视频&#xff09; 参考博客1&#xff1a;https://jasonhhao.github.io/2020/06/19/…

在word中创建宏来多级列表的编号不显示的bug

出现问题的示意图如下&#xff0c;可以看出标题前面1.1消失了 第一步&#xff1a;选择开发工具 第二步&#xff1a; 第三步&#xff1a;选择当前文件&#xff08;创建宏后&#xff0c;方便查找&#xff09; 第四步&#xff1a; 第五步&#xff1a;打卡VB 第七步&#xf…

ONVIF系列一:ONVIF介绍

感谢博主OceanStar的学习笔记&#xff0c;ONVIF系列二和系列三中安装操作过程及代码实现参考了这位博主的博客。 ONVIF系列&#xff1a; ONVIF系列一&#xff1a;ONVIF介绍 ONVIF系列二&#xff1a;Ubuntu安装gSOAP、生成ONVIF代码框架 ONVIF系列三&#xff1a;ONVIF客户端实现…

机器人开发项目实现过程

比赛项目实现过程 第一步&#xff1a;设置远程桌面连接 登录机器人系统&#xff0c;设置网络&#xff0c;参考远程桌面连接20230525.mp4 外接显示器、鼠标和键盘 登录系统 账户&#xff1a;robuster 密码&#xff1a;123456 建议&#xff0c;手机开热点&#xff0c;机器人…

消费新纪元:探索消费增值的财富之旅

你是否曾对日常消费感到一丝无奈&#xff0c;觉得钱一旦花出去就如同流水般逝去&#xff0c;再也无法追回&#xff1f;现在&#xff0c;让我为你揭示一种革命性的消费观念——消费增值&#xff0c;它不仅能满足你的物质需求&#xff0c;还能让你的资金像滚雪球般持续增长&#…

鸿蒙ArkUI开发:常用布局【弹性布局方向图】

弹性布局方向图 Flex({ direction: FlexDirection.Row }) FlexDirection.Row&#xff08;默认值&#xff09;&#xff1a;主轴为水平方向&#xff0c;子组件从起始端沿着水平方向开始排布FlexDirection.RowReverse&#xff1a;主轴为水平方向&#xff0c;子组件从终点端沿着F…

vscode 实现本地服务器部署小结

在查阅 MDN 网站的时候&#xff0c;偶然发现的原来 vscode 也可以实现本地化服务器部署&#xff0c;来模拟服务器的运行。 安装插件 在VSCode的插件市场搜索并安装以下插件&#xff1a; – Live Server&#xff08;用于开启本地服务器&#xff09; – Debugger for Chrome&a…

算法设计与分析(超详解!) 第三节 贪婪算法

1.贪心算法基础 1.贪心算法的基本思想 贪心算法是从问题的某一个初始解出发&#xff0c;向给定的目标推进。但它与普通递推求解过程不同的是&#xff0c;其推动的每一步不是依据某一固定的递推式&#xff0c;而是做一个当时看似最佳的贪心选择&#xff0c;不断地将问题实例归…

【C++】string类的使用④(字符串操作String operations || 常量成员Member constants)

&#x1f525;个人主页&#xff1a; Forcible Bug Maker &#x1f525;专栏&#xff1a; STL || C 目录 前言&#x1f525;字符串操作&#xff08;String operations&#xff09;c_strdataget_allocatorcopyfindrfindfind_first_offind_last_offind_first_not_offind_last_not…

11、FreeRTOS 队列、队列集,邮箱的使用

文章目录 一、队列的特性1.1 队列常规操作1.2 传输数据的两种方法1.3 队列的阻塞访问 二 队列函数2.1创建2.2 复位2.3 删除2.4 写队列2.5 读队列2.6 查询2.7 覆盖/偷看 三、示例3.1示例 队列的基本使用3.2 示例: 分辨数据源3.3 示例: 传输大块数据3.4 : 邮箱(Mailbox) 四、队列…

白盒测试:覆盖测试及测试用例设计

白盒测试&#xff1a;覆盖测试及测试用例设计 一、实验目的 1、掌握白盒测试的概念。 2、掌握逻辑覆盖法。 二、实验任务 某工资计算程序功能如下&#xff1a;若雇员月工作小时超过40小时&#xff0c;则超过部分按原小时工资的1.5倍的加班工资来计算。若雇员月工作小时超过…

ansible -playbook运维工具、语法、数据结构、命令用法、触发器、角色

目录 配置文件 基本语法规则&#xff1a; YAML支持的数据结构 playbook核心元素 ansible-playbook用法&#xff1a; 触发器 特点&#xff1a; 角色&#xff1a; 习题&#xff1a; 配置文件 playbook配置文件使用yaml语法&#xff0c;YAML 是一门标记性语言,专门用来写配…

windows和Linux卸载移动磁盘

文章目录 Linux卸载磁盘target is busy.window卸载磁盘打开事件查看器 Linux卸载磁盘target is busy. #查看有哪些进程访问挂载点 lsof /media/lei/repository/#杀死进程 pkill node window卸载磁盘 #提示 #该设备正在使用中. 请关闭可能使用该设备的所有程序或窗口,然后重试…

Vue 局部布局 Layout 内部布局 [el-row]、[el-col]

之前的布局容器是一个整体的框架&#xff0c;layout里面的布局其实就是el-row和el-col的组合。 基础布局 使用单一分栏创建基础的栅格布局。 通过 ​row ​和 ​col ​组件&#xff0c;并通过 ​col ​组件的 ​span ​属性我们就可以自由地组合布局。 这种最简单&#xff0c;…

2024年可以做的网上兼职有哪些?10个正规赚钱软件平台分享

在数字化浪潮席卷全球的今天&#xff0c;兼职工作早已不再局限于传统的线下模式。只要有一部手机或电脑&#xff0c;你就能轻松开启兼职之旅&#xff0c;实现躺着也能赚钱的梦想&#xff01; 接下来&#xff0c;就让我们一起看看2024年那些靠谱又有趣的网上兼职项目吧&#xff…

STK12 RPO模块学习 (1)

一、背景介绍 在STK12中&#xff0c;在Astrogator的模块上开发了新的模块&#xff08;Rendezvous and proximity operations)。轨道交会接近通常来说是一个很复杂的过程。RPO实现需要对轨道动力学有一个清晰的理解&#xff0c;并且对于Astrogator模块具备很强的背景和经验&…

2024最新软件测试【测试理论+ Linux】面试题(内附答案)

一、测试理论 3.1 你们原来项目的测试流程是怎么样的? 我们的测试流程主要有三个阶段&#xff1a;需求了解分析、测试准备、测试执行。 1、需求了解分析阶段 我们的 SE 会把需求文档给我们自己先去了解一到两天这样&#xff0c;之后我们会有一个需求澄清会议&#xff0c; …

Kaspa是潜力金吗?那么如何获取,以bitget钱包为例

$KAS 这个币比较小众&#xff0c;华语区知道的不多&#xff0c;目前挖这个币的矿工也不多&#xff0c;但是大家都有一个默契&#xff0c;就是尽量不要宣传&#xff0c;先建设生态&#xff0c;自私的心理就是&#xff1a;自己先多挖些币。 简单介绍一下 #KASPA 这个项目&#x…

[算法][BFS][leetcode]994. 腐烂的橘子

题目地址 https://leetcode.cn/problems/rotting-oranges/description/ 错误解法 class Solution {public int orangesRotting(int[][] grid) {//层序遍历int ans 0;for (int i 0;i<grid.length;i) {for(int j 0;j<grid[0].length;j){boolean flag false;if(grid[i][j…