springCloud之Stream

1、简介

Spring Cloud Stream是一个用来为微服务应用构建 消息驱动 能力的框架。通过使用 Spring Cloud Strea m ,可以有效简化开发人员对消息中间件的使用复杂度,降低代码与消息中间件间的耦合度,屏蔽消息中间件 之 间的差异性,让开发人员可以有更多的精力关注于核心业务逻辑的处理。

主要有以下几个组件:

1)、目的地绑定器(Destination Binders):负责提供与外部消息系统集成的组件。

2)、固定器(Bindings):介于外部消息系统与应用程序间的桥梁 ,这个应用程序提供了生产者和消费者的消息 (由 Destination Binders 创建)。

3)、输入管道(Input Bindings):消费者通过Input Bindings 连接 Binder ,而 Binder 与 MQ 连接,即消费者通过 Input Bindings 从 MQ 读取数据。

4)、输出管道(Output Bindings):生产者通过Output Bindings 连接 Binder ,而 Binder 与 MQ 连接,即生产者通过 Output Bindings 向 MQ 写入数据。

5)、消息(Message):生产者和消费者使用的规范数据结构,用于与 Binders 通信(从而通过外部消息系统与其他应用程序通信)。

2、具体应用示例1(MQ使用kafka)

引入依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
2.1、生产者

配置文件

server:port: 8090
spring:cloud:stream:kafka:binder:brokers: 192.168.30.88:9092,192.168.30.89:9092bindings:producer-out-0:destination: topic1content-type: application/json

代码实现

@Autowired
private StreamBridge streamBridge;@GetMapping("/test/send")
public String sendMsg(@RequestParam("msg") String msg){Map<String , Object> map = new HashMap<>();map.put("tag", "tags");MessageHeaders headers = new MessageHeaders(map);// 封装消息Message<String> message = MessageBuilder.createMessage(msg, headers);//发送消息streamBridge.send("producer-out-0", message);return msg;
}
2.2、消费者

配置文件

server:port: 8091
spring:cloud:stream:kafka:binder:brokers: 192.168.30.88:9092,192.168.30.89:9092function:definition: consumer # 这个名称要和下面bindings的consumer-in-0第一个单词一样bindings:consumer-in-0:destination: topic1content-type: application/json

代码实现

// 向容器中添加Consumer<Message<String>>类型的bean即可
@Bean
public Consumer<Message<String>> consumer(){return msg -> {System.out.println("接收到消息:" + msg.getPayload());};
}
3、具体应用示例2(MQ使用Rocketmq)

引入依赖

<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

代码实现

@Autowired
private StreamBridge streamBridge;@GetMapping("/test/send")
public String sendMsg(@RequestParam("msg") String msg){Map<String , Object> map = new HashMap<>();map.put(MessageConst.PROPERTY_TAGS, "tags");MessageHeaders headers = new MessageHeaders(map);// 封装消息Message<String> message = MessageBuilder.createMessage(msg, headers);//发送消息streamBridge.send("producer-out-0", message);return JSON.toJSONString(message);
}
3.2、消费者

配置文件:

server:port: 8091
spring:cloud:stream:rocketmq:binder:name-server: 192.168.30.88:9876function:definition: consumer # 这个名称要和下面bindings的consumer-in-0第一个单词一样bindings:consumer-in-0:destination: topic1content-type: application/json

代码实现:

// 向容器中添加Consumer<Message<String>>类型的bean即可
@Bean
public Consumer<Message<String>> consumer(){return msg -> {System.out.println("接收到消息:" + msg.getPayload());};
}

注:

1、在spring-cloud-stream 3.1.0之前的版本,还有采用定义Source、Sink等方式编写消息生产者和消费者,在3.1.0以后的版本中弃用@StreamListener的方式,而采用函数式编程的方式接入,使用StreamBrige来进行发送。

2、注意binding的名称命名规则

例如:上面的代码中定义的consumer。

# 输入:    <方法名> + -in- + <index>
# 输出:    <方法名> + -out- + <index>

总结:本文介绍Stream统一消息中间件的模型,给出基于kafka和Rocketmq两种消息中间件模型下的使用案例,以及给出废弃使用老版本的Source、Sink模式解释。帮助大家快速上手Stream的使用。

       本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:上了年纪的小男孩。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/593924.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

stm32学习总结:5、Proteus8+STM32CubeMX+MDK仿真串口并使用串口打印日志(注意重定向printf到串口打印的问题)

stm32学习总结&#xff1a;5、Proteus8STM32CubeMXMDK仿真串口并使用串口打印日志&#xff08;注意重定向printf到串口打印的问题&#xff09; 文章目录 stm32学习总结&#xff1a;5、Proteus8STM32CubeMXMDK仿真串口并使用串口打印日志&#xff08;注意重定向printf到串口打印…

软碟通UltraISO制作U盘安装Ubuntu

清华大学开源软件镜像站https://mirrors.tuna.tsinghua.edu.cn/ 从里面下载ubuntu-22.04-desktop-amd64.iso UltraISO是一款非常不错的U盘启动盘制作工具&#xff0c;一直被许多网友们所喜欢&#xff0c;使用简单、方便。 UltraISO官方下载地址&#xff1a;https://cn.ultrais…

魔改版小市值策略

策略思路 最近几年&#xff0c;小市值策略一直都收益不错&#xff08;当然&#xff0c;不包含17年和18年&#xff09;。小市值因子对收益的影响是很大的。特别是行情不好的时候&#xff0c;大家都忙着炒作热点&#xff0c;那么这时候符合题材的小市值更加符合炒作标准了。 为…

安装tensorrt环境在linux上

在linux上输入命令 bash cat /etc/os-release 命令查看系统版本 nvidia-smi命令后有内容弹出而没有报错,表明系统中安装了NVIDIA显卡驱动&#xff0c;并且该命令成功地显示了有关NVIDIA GPU的信息。 输入nvcc -V并且看到输出时,这表明您的系统中已经安装了NVIDIA的CUDA工具…

MySQL第三战:CRUD,函数1以及unionunion all

前言 在当今的数字化时代&#xff0c;数据库已经成为信息管理的重要工具。其中&#xff0c;MySQL作为一种流行的关系型数据库管理系统&#xff0c;已经广泛应用于各种业务场景。在本文中&#xff0c;我们将深入探讨MySQL中的核心概念&#xff0c;包括创建&#xff08;Create&a…

[每周一更]-(第51期):Go的调度器GMP

参考文献 https://learnku.com/articles/41728http://go.cyub.vip/gmp/gmp-model.html#g-m-phttps://blog.csdn.net/ByteDanceTech/article/details/129292683https://www.ququ123.top/2022/04/golang_gmp_principle/ 什么是GMP? GMP模型是Go语言并发模型的核心概念&#x…

ASP.NET Core基础之图片文件(一)-WebApi访问静态图片

阅读本文你的收获&#xff1a; 学会在WebApi项目中访问静态图片了解静态文件中间件UseStaticFiles的用法 系统中免不了要去处理图片文件&#xff0c;比如上传商品的图片、显示商品的图片&#xff0c;访问系统中的图片等等&#xff0c;根据微软官网描述&#xff1a; 静态文件&a…

自动化测试框架 —— pytest框架入门到入职篇

01、pytest框架介绍 pytest 是 python 的第三方单元测试框架&#xff0c;比自带 unittest 更简洁和高效&#xff0c;支持非常丰富的插件&#xff0c;同时兼容 unittest 框架。这就使得我们在 unittest 框架迁移到 pytest 框架的时候不需要重写代码。 pytest框架优点 1、简单…

Ubuntu Server 22.04 连接Wifi并配置静态IP

Ubuntu Server 22.04 连接Wifi并配置静态IP 前言&#xff1a;我家最近好几台电脑&#xff0c;我都想跑着Ubuntu Server做服务器&#xff0c;但是近几年的超级本已经不自带网口了&#xff0c;所以我就考虑用Wifi来联网&#xff0c;速度也还可以&#xff0c;但是既然是跑服务&…

Nginx 中的日志

目录 1.定制访问日志记录格式 1.1 全部日志记录 1.2 每个网站独属一份日志 2.日志路径 3.错误日志 1.定制访问日志记录格式 1.1 全部日志记录 该配置处于nginx.conf 文件中 log_format compression $remote_addr - $remote_user [$time_local] "$request" $sta…

技术扫盲:如何优雅的使用 java -jar

java -jar xxx.jar java -jar 是一个用于在命令行界面中执行 Java 可执行 JAR 文件的命令。它的语法如下&#xff1a; java -jar <JAR 文件路径> [参数]其中&#xff1a; java 是 Java 运行时环境的可执行文件。-jar 是一个选项&#xff0c;表示要执行的文件是一个 JA…

Postman版IDEA插件!免费!

Postman是大家最常用的API调试工具&#xff0c;那么有没有一种方法可以不用手动写入接口到Postman&#xff0c;即可进行接口调试操作&#xff1f;今天给大家推荐一款IDEA插件&#xff1a;Apipost Helper&#xff0c;写完代码就可以调试接口并一键生成接口文档&#xff01;而且还…

一文读懂Solana 上最正统的铭文通证$mash

早在 2023 年的 11 月&#xff0c;包括 Solana、Avalanche、Polygon、Arbitrum、zkSync 等生态正在承接比特币铭文生态外溢的价值。当然&#xff0c;因铭文赛道过于火爆&#xff0c;当 Avalanche、BNB Chain 以及 Polygon 等链上 Gas 飙升至极值&#xff0c;Arbitrum、zkSync 等…

C++学习(二)

我们是在学习过了C语言&#xff0c;基础上来看这篇文章的&#xff0c;如果你是直接学C&#xff0c;这篇文章不太适合你的&#xff0c;因为这里只讲C基础中与C语言不同之处。 一.main函数区别 在C语言中&#xff0c;我们写main函数是不是可以省略前面的int,但是在C中&#xff…

深度学习|5.2 偏差和方差

偏差和方差 Bias&#xff08;偏差&#xff09;&#xff1a;偏差是指对样本点的估计值和实际值的偏离程度。偏差越大&#xff0c;样本点越不符合实际值。偏差衡量单个数据点的偏离程度&#xff0c;如下图的第二行。 Variance&#xff08;方差&#xff09;&#xff1a;方差能代表…

希尔排序算法——交换法

希尔排序&#xff0c;又称缩小增量排序&#xff0c;是插入排序的改进版。它是因DL&#xff0e;Shell于1959年提出而得名。希尔排序的实质就是分组插入排序&#xff0c;该方法是非稳定的排序算法。 具体来说&#xff0c;希尔排序通过将待排序序列分成多个子序列&#xff0c;分别…

大数据Doris(四十九):Doris数据导出介绍

文章目录 Doris数据导出介绍 一、​​​​​​​使用示例

CMake入门教程【核心篇】引用子模块.cmake文件(include)

&#x1f608;「CSDN主页」&#xff1a;传送门 &#x1f608;「Bilibil首页」&#xff1a;传送门 &#x1f608;「本文的内容」&#xff1a;CMake入门教程 &#x1f608;「动动你的小手」&#xff1a;点赞&#x1f44d;收藏⭐️评论&#x1f4dd; 文章目录 include子模块举个例…

C#编程-使用构造函数和析构函数

使用构造函数和析构函数 如果想要在以创建对象时就初始化成员变量,可以创建名为构造函数的特殊函数。您可能还需要使用对象后从内存中删除这些对象。这可通过称为析构函数的函数实现。 实现构造函数 构造函数是在创建对象时自动调用的特殊方法。无须显式地调用构造函数。请…

使用MQTT.JS创建一个网页版的MQTT客户端

一、MQTT.JS介绍 MQTT.js 是一个开源的 MQTT 协议的客户端库&#xff0c;使用 JavaScript 编写&#xff0c;主要用于 Node.js 和 浏览器环境中。是JavaScript 环境下的 MQTT 客户端库。可以用于微信小程序、支付宝小程序等定制浏览器环境。 我们可以直接在HTML文件中进行调用…