spring-cloud-stream

系列文章目录

第一章 Java线程池技术应用
第二章 CountDownLatch和Semaphone的应用
第三章 Spring Cloud 简介
第四章 Spring Cloud Netflix 之 Eureka
第五章 Spring Cloud Netflix 之 Ribbon
第六章 Spring Cloud 之 OpenFeign
第七章 Spring Cloud 之 GateWay
第八章 Spring Cloud Netflix 之 Hystrix
第九章 代码管理gitlab 使用
第十章 SpringCloud Alibaba 之 Nacos discovery
第十一章 SpringCloud Alibaba 之 Nacos Config
第十二章 Spring Cloud Alibaba 之 Sentinel
第十三章 JWT
第十四章 RabbitMQ应用
第十五章 RabbitMQ 延迟队列
第十六章 spring-cloud-stream

在这里插入图片描述


文章目录

  • 系列文章目录
    • @[TOC](文章目录)
  • 前言
  • 1、stream设计思想
  • 2、编码常用的注解
  • 3、编码步骤
    • 3.1、添加依赖
    • 3.2、修改配置文件
    • 3.3、生产
    • 3.4、消费
    • 3.5、延迟队列
      • 3.5.1、修改配置文件
      • 3.5.2、生产端
      • 3.5.2、消息确认机制 消费端
  • 总结

前言

https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。

SpringCloud stream通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

Stream让我们不再关注具体MQ的细节我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换,总的来说Stream能够屏蔽底层消息中间件的差异、降低切换成本,是统一消息的编程模型。

1、stream设计思想

在这里插入图片描述
在这里插入图片描述

  • Binder:很方便的连接中间件,屏蔽差异
  • Channel:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

2、编码常用的注解

在这里插入图片描述

组成说明
Middleware中间件,目前只支持RabbitMQ和Kafka
BinderBinder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过BInder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
@Input注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener监听队列,用于消费者的队列的消息接收
@EnableBinding指信道channel和exchange绑定在一起

3、编码步骤

3.1、添加依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

3.2、修改配置文件

server:port: 8088spring:cloud:stream:binders: #需要绑定的rabbitmq的服务信息defaultRabbit:  #定义的名称,用于bidding整合type: rabbit  #消息组件类型environment:  #配置rabbimq连接环境spring:rabbitmq:host: localhost   #rabbitmq 服务器的地址port: 5672           #rabbitmq 服务器端口username: tiger       #rabbitmq 用户名password: tiger       #rabbitmq 密码virtual-host: tiger_vh  #虚拟路径bindings:        #服务的整合处理saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json      #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup               #分组saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json      #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup               #分组

3.3、生产

/*** 订单消息输出通道处理器*/
@Component
public interface OrderOutputChannelProcesor {@Output("saveOrderOutput")MessageChannel saveOrderOutput();
}
@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {@Autowired@Output("saveOrderOutput")private MessageChannel messageChannel;public void sentMsg(UserInfo userInfo){messageChannel.send(MessageBuilder.withPayload(userInfo).build());log.info("消息发送成功:" + userInfo);}
}

3.4、消费

/*** 订单消息输入通道处理器*/
@Component
public interface OrderInputChannelProcesor {@Input("saveOrderInput")SubscribableChannel saveOrderInput();
}
@Slf4j
@EnableBinding(OrderInputChannelProcesor.class)
public class OrderMessageConsumer {@StreamListener("saveOrderInput")public void receiveMsg(Message<UserInfo> userInfoMessage){log.info("接收消息成功:" + userInfoMessage.getPayload());}
}

3.5、延迟队列

安装延迟队列插件:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
下载解压,到plugins目录,执行以下的命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.5.1、修改配置文件

server:port: 8088spring:cloud:stream:binders: #需要绑定的rabbitmq的服务信息defaultRabbit:  #定义的名称,用于bidding整合type: rabbit  #消息组件类型environment:  #配置rabbimq连接环境spring:rabbitmq:host: localhost   #rabbitmq 服务器的地址port: 5672           #rabbitmq 服务器端口username: tiger       #rabbitmq 用户名password: tiger       #rabbitmq 密码virtual-host: tiger_vh  #虚拟路径bindings:        #服务的整合处理saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json      #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup               #分组saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。content-type: application/json      #设置消息的类型,本次为jsondefault-binder: defaultRabbitgroup: saveOrderGroup               #分组rabbit:bindings: #服务的整合处理saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道producer:delayed-exchange: truesaveOrderInput:consumer:delayed-exchange: true

3.5.2、生产端

@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {@Autowired@Output("saveOrderOutput")private MessageChannel messageChannel;public void sentMsg(UserInfo userInfo){messageChannel.send(MessageBuilder.withPayload(userInfo).setHeader("x-delay", 5000).build());log.info("消息发送成功:" + userInfo);}
}

3.5.2、消息确认机制 消费端

rabbit:bindings: #服务的整合处理saveOrderInput:consumer:acknowledge-mode: MANUAL #手动确认
@StreamListener("saveOrderInput")
public void receiveMsg(Message<UserInfo> userInfoMessage){log.info("接收消息成功:" + userInfoMessage.getPayload());Channel channel = (Channel) userInfoMessage.getHeaders().get(AmqpHeaders.CHANNEL);Long delieverTag = (Long) userInfoMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG);/** deliveryTag:Channel的消息投递的唯一标识符。* multiple:是否否定应答多条消息。如果设置为true,则否定应答带指定deliveryTag的消息及该deliveryTag之前的多条消息;* 如果设置为false,则仅否定应答带指定deliveryTag的单条消息。* requeue:被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;* 如果设置为false,则消息被丢弃或发送到死信Exchange。*/try {channel.basicAck(delieverTag,true);} catch (IOException e) {e.printStackTrace();}
}

定义交换机类型为direct

rabbit:bindings: #服务的整合处理saveOrderInput:consumer:bindingRoutingKey: orderRoutingKeybindQueue: trueexchangeType: directsaveOrderOutput:producer:routingKeyExpression: orderRoutingKeyexchangeType: direct

总结

spring-cloud-stream目前支持RabbitMQ和Kafka,与spring-cloud无缝集成,非常方便。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/139423.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Gradio App生产环境部署教程

如果机器学习模型没有投入生产供人们使用&#xff0c;就无法充分发挥其潜力。 根据我们的经验&#xff0c;将模型投入生产的最常见方法是为其创建 API。 然而&#xff0c;我们发现这个过程对于 ML 开发人员来说可能相当令人畏惧&#xff0c;特别是如果他们不熟悉 Web 开发的话。…

汽车ECU的虚拟化技术初探(一)

目录 1.为什么要提汽车ECU的虚拟化&#xff1f; 2.虚拟化技术分类 2.1 硬件虚拟化 2.2 操作系统虚拟化 问题引入&#xff1a; Hypervisor是如何来管理和隔离硬件资源&#xff0c;保证各个不同功能的应用程序的资源使用安全和资源调度&#xff1f;没有MMU就做不了虚拟化&am…

4.HTML网页开发的工具

4. 网页开发的工具 4.1 快捷键 4.1.1 快速复制一行 快捷键&#xff1a;shiftalt下箭头&#xff08;上箭头&#xff09; 或者ctrlc 然后 ctrlv 4.1.2 选定多个相同的单词 快捷键&#xff1a; ctrld 4.1.3 添加多个光标 快捷键&#xff1a;ctrlalt上箭头&#xff08;下箭头&…

CS224W6.2——深度学习基础

在本文中&#xff0c;我们回顾了深度学习的概念和技术&#xff0c;这些概念和技术对理解图神经网络至关重要。从将机器学习表述为优化问题开始&#xff0c;介绍了目标函数、梯度下降、非线性和反向传播的概念。 文章目录 1. 大纲2. 优化问题2.1 举例损失函数 3. 如何优化目标函…

MySQL on duplicate key update用法

基本使用方法 public static final String SQL_TQI_SINK "insert into " ConfigureContext.get(ConfigKeyConstants.MYSQL_TABLE_TQI) " \n" "(mile_km, mile_start_km, mile_start_m, is_out, tqi_alig_l, \n" "tqi_alig_r, tqi_surf_l…

Redis系列-Redis性能优化与安全【9】

目录 Redis系列-Redis性能优化与安全【9】Redis性能优化策略Redis安全设置与防护措施Redis监控与诊断工具介绍 七、Redis应用案例与实战八、Redis未来发展与趋势 个人主页: 【⭐️个人主页】 需要您的【&#x1f496; 点赞关注】支持 &#x1f4af; Redis系列-Redis性能优化与安…

2023年第十六届山东省职业院校技能大赛高职组“信息安全管理与评估”赛项规程

第十六届山东省职业院校技能大赛 高职组“信息安全管理与评估”赛项规程 一、赛项名称 赛项名称&#xff1a;信息安全管理与评估 英文名称&#xff1a;Information Security Management and Evaluation 赛项组别&#xff1a;高职组 赛项归属&#xff1a;电子与信息大类 二…

运行npm install卡住不动的几种解决方案

在前端开发经常会遇到运行npm install 来安装工具包一直卡住不动&#xff0c;为此这里提供几种解决方案&#xff0c;供大家参考学习&#xff0c;不足之处还请指正。 第一种方案、首先检查npm代理&#xff0c;是否已经使用国内镜像 // 执行以下命令查看是否为国内镜像 npm con…

虚假内容检测,谣言检测,不实信息检测,事实核查;纯文本,多模态,多语言;数据集整理

本博客系博主个人理解和整理所得&#xff0c;包含内容无法详尽&#xff0c;如有补充&#xff0c;欢迎讨论。 这里只提供数据集相关介绍和来源出处&#xff0c;或者下载地址等&#xff0c;因版权原因不提供数据集所含的元数据。如有需要&#xff0c;请自行下载。 “Complete d…

深度学习之基于Django+Tensorflow商品识别管理系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 项目简介 本系统是一个基于DjangoTensorflow的商品识别管理系统。通过深度学习技术&#xff0c;实现商品的自动识别…

arduino 简易智能花盆

编辑器&#xff1a;arduino IDE 主板&#xff1a;arduino uno 传感器&#xff1a; 0.96寸的OLED屏&#xff08;四脚&#xff09; 声音模块 土壤温湿度模块 DS18B20温度模块&#xff08;这里用到防水的&#xff09; 光敏电阻模块&#xff08;买成三脚的了只能显示高低&#x…

el-table实现展开当前行时收起上一行的功能

<el-tableref"tableRef":data"tableData":expand-row-keys"expandRowKeys":row-key"handleRowKey" // 必须指定 row-keyexpand-change"handleExpandChange" // 当用户对某一行展开或者关闭的时候会触发该事件> <…

【算法专题】双指针—三数之和

力扣题目链接&#xff1a;三数之和 一、题目解析 二、算法原理 解法一&#xff1a;排序暴力枚举利用set去重 代码就不写了&#xff0c;你们可以试着写一下 解法二&#xff1a;排序双指针 这题和上一篇文章的两数字和方法类似 排序固定一个数a在这个数的后面区间&#xff0…

《詩經别解》——國風·周南·雎鳩​​​​​​​

一、关于古文的一个认识 目前可以阅读的古文经典&#xff0c;大多是经历了几千年的传承。期间的武力战争、文化纷争、宗教侵袭、官僚介入及文人的私人恩怨与流派桎梏&#xff0c;印刷与制作技术&#xff0c;导致这些古文全部都已经面目全非。简单地说&#xff0c;你读到的都是…

2022最新版-李宏毅机器学习深度学习课程-P46 自监督学习Self-supervised Learning(BERT)

一、概述&#xff1a;自监督学习模型与芝麻街 参数量 ELMO&#xff1a;94MBERT&#xff1a;340MGPT-2&#xff1a;1542MMegatron&#xff1a;8BT5&#xff1a;11BTuring NLG&#xff1a;17BGPT-3&#xff1a;175BSwitch Transformer&#xff1a;1.6T 二、Self-supervised Lear…

[HXPCTF 2021]includer‘s revenge

文章目录 方法一前置知识Nginx 在后端 Fastcgi 响应过大产生临时文件竞争包含绕过include_once限制 解题过程 方法二前置知识Base64 Filter 宽松解析iconv filter 解题过程 方法一 NginxFastCGI临时文件 前置知识 Nginx 在后端 Fastcgi 响应过大产生临时文件 www-data用户在n…

SharePoint 页面中插入自定义代码

我们都知道 SharePoint 是对页面进行编辑的。 对于一些有编程基础的人来说&#xff0c;可能需要对页面中插入代码&#xff0c;这样才能更好的对页面进行配置。 但是在新版本的 SharePoint modern 页面来说&#xff0c;虽然我们可以插入 Embed 组件。 但是 Embed 组件中是不允…

【C++基础 】类和对象(上)

C基础 类和对象&#xff08;上&#xff09; 1.面向过程和面向对象初步认识2.类的引入3.类的定义4.类的访问限定符及封装4.1 访问限定符4.2 封装 5.类的作用域6.类的实例化7.类对象模型7.1 如何计算类对象的大小7.2 类对象的存储方式猜测7.3 结构体内存对齐规则 8.this指针8.1 t…

Kubernetes实战(四)-部署docker harbor私有仓库

1 Docker原生私有仓库Registry 1.1 原生私有仓库Registry概述 Docker的仓库主要分两类&#xff1a; 私有仓库公有仓库 共有仓库只要在官方注册用户&#xff0c;登录即可使用。但对于仓库的使用&#xff0c;企业还是会有自己的专属镜像&#xff0c;所以私有库的搭建也是很有…

分享vmware和Oracle VM VirtualBox虚拟机的区别,简述哪一个更适合我?

VMware和Oracle VM VirtualBox虚拟机的区别主要体现在以下几个方面&#xff1a; 首先两种软件的安装使用教程如下&#xff1a; 1&#xff1a;VMware ESXI 安装使用教程 2&#xff1a;Oracle VM VirtualBox安装使用教程 商业模式&#xff1a;VMware是一家商业公司&#xff0c;而…