SpringCloud Stream消息驱动

为啥有这个技术???

1. 这个stream是操作消息队列的,简化,学习消息队列的成本降低。
2. 可操作rabbitMQ兔子message queue,kafaka,可理解为jdbc可操作oracle, mysql..
3. spring家的技术学就完了。。

stream

    • 对消息驱动需要了解的概念
    • 消息驱动生产者,消费者
    • 再建一个服务,消息消费者,问题

对消息驱动需要了解的概念

(1) 网站 文档
https://spring.io/projects/spring-cloud-stream#overview

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/


(2) 介绍

什么是SpringCloudStream
官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。
应用程序通过inputs或者outputs与Spring Cloud Stream中binder对象交互。
通过我们配置来binding(绑定),
而Spring Cloud Stream的binder对象负责与消息中间件交互。
所以,我们只需要搞清楚如何与Spring Cloud Stream
交互就可以访便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,
引用了发布-订阅、消费组、分区的三个核心概念。
目前仅支持RabbitMQ、Kafka.

(3) 标准mq

  • Message

    • 生产者/消费者之间靠消息媒介传递信息内容
  • 消息通道MessageChannel

    • 消息必须走特定的通道
  • 消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器订阅

    • 消息通道里的消息如何被消费呢,谁负责收发处理

(4) 为什么用Cloud Stream

1 绑定stream凭什么可以统一底层差异

  • 在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,
    于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性
    通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
    通过向应用程序暴露统- -的Channel通道, 使得应用程序不需要再考虑各种不同的消息中间件实现。

2 binder架构

在这里插入图片描述

INPUT对应于消费者
OUTPUT对应于生产者

(5) Stream中的消息通信方式遵循了发布-订阅模式

Topic主题进行广播
在RabbitMQ就是Exchange 交换机
在kafka中就是Topic

(6) 常用api,注解

在这里插入图片描述

消息驱动生产者,消费者

  1. 生产者
    pom
        <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

ymal
server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit  # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: receive-8802.com  # 在信息列表时显示主机名称prefer-ip-address: true     # 访问的路径变为IP地址

发送消息接口,实现

public interface IMessageProvider
{public String send();
}/// 实现 
import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.integration.support.MessageBuilder;
import javax.annotation.Resource;
import org.springframework.cloud.stream.messaging.Source;import javax.annotation.Resource;
import java.util.UUID;@EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider
{@Resourceprivate MessageChannel output; // 消息发送管道@Overridepublic String send(){String serial = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());System.out.println("*****serial: "+serial);return null;}
}
  1. 消费者
    yaml, pom同时,yaml要改端口。
    定义controller接收消息
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String> message) {System.out.println("消费者1号,接受:"+message.getPayload()+"\t port:"+serverPort);}}

再建一个服务,消息消费者,问题

1. 消息重复。发送者发送消息,两个消费者都会接收到消息,如果是支付多个模块,
收到一条消息,多个模块会收到坏账,需要分组,只对一个支付模块发消息.
2. 消息持久化。当关掉消费者,消息丢失。

a. 新增group配置,自定义group

group: damn

b. 持久化,服务挂了,保证消息不丢失

页数配置分组解决

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/420436.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端学习(1291):nodejs的系统模块文件读取操作

//通过模块对模块进行引入 const fs require(fs); //读取文件 fs.readFile(./demo01.js, utf8, (err, doc) > {console.log(err);console.log(doc); }) 运行结果

解决MySQL忘记root密码

网上有很多关于忘记MySQL root密码的一些文章&#xff0c;里面都有写怎么去解决&#xff0c;但有时觉得写得太恶心&#xff0c;要么一字不漏的抄别人的&#xff0c;要么就说得不清不楚&#xff0c;好了&#xff0c;不吐槽了&#xff0c;以下是解决的整个过程。 首先我们要知道忘…

SpringCloud Sleuth分布式请求链路追踪

概念 1. 为什么需要链路追踪&#xff1f; 在微服务框架中&#xff0c;一个由客户端发起的请求在后端系统中 会经过多个不同的的服务节点调用来协同产生最后的请求结果, 每一个前段请求都会形成一复杂的分布式服务调用链路, 链路中的任何一环出现高延时或错误都会引起整个请求最…

前端学习(1292):文件写入操作

const fs require(fs);fs.writeFile(./demo.txt, 即将要写入的内容, err > {if (err ! null) {console.log(err);return;}console.log(文件内容写入成功); }) 运行结果

Android中如何下载文件并显示下载进度

原文地址&#xff1a;http://jcodecraeer.com/a/anzhuokaifa/androidkaifa/2014/1125/2057.html 这里主要讨论三种方式&#xff1a;AsyncTask、Service和使用DownloadManager。 一、使用AsyncTask并在进度对话框中显示下载进度 这种方式的优势是你可以在后台执行下载任务的同时…

前端学习(1293):系统模块path路径操作

//导入path模块 const path require(path); //路径拼接 const finaPath path.join(public, uploads, avater); console.log(finaPath); 运行结果

nacos作为服务注册中心

nacosnacos简介nacos 作为服务注册中心demo基于Nacos的服务提供者基于Nacos的服务消费者nacos切换ap和cp 模式nacos简介 为什么叫nacos 前四个字母分别为Naming和Configuration的前两个字母&#xff0c;最后的s为Service。是什么 一个更易于构建云原生应用的动态服务发现&am…

前端学习(1294):相对路径和绝对路径

const fs require(fs); const path require(path); console.log(__dirname); console.log(path.join(__dirname, ./demo01.js)); fs.readFile(path.join(__dirname, ./demo01.js), utf8, (err, doc) > {console.log(err);console.log(doc); }) 运行结果

layui 数据表格代码

一套增删改查&#xff0c;打完收工。 layui版本&#xff1a;2.4.5 默认请求,分页。 /rest_address?page1&limit1json数据格式要求. 参数说明文档 https://www.layui.com/doc/modules/table.html#cols <!DOCTYPE html> <html lang"en" xmlns:th"…

[BZOJ 1085] [SCOI2005] 骑士精神 [ IDA* 搜索 ]

题目链接 : BZOJ 1085 题目分析 : 本题中可能的状态会有 (2^24) * 25 种状态&#xff0c;需要使用优秀的搜索方式和一些优化技巧。 我使用的是 IDA* 搜索&#xff0c;从小到大枚举步数&#xff0c;每次 DFS 验证在当前枚举的步数之内能否到达目标状态。 如果不能到达&#xff0…

nacos服务配置中心演示

config centerNacos作为配置中心-基础配置Nacos作为配置中心-分类配置nacos将配置持久化到mysql新型技术&#xff0c;替代spring config center & bus Nacos作为配置中心-基础配置 ⑴ module cloudalibaba-config-nacos-client3377 (2) pom <dependencies><!--n…

前端学习(1296):第三方模块nodemon

修改保存重新执行 如何断开ctrlc

core java 8~9(GUI AWT事件处理机制)

MODULE 8 GUIs--------------------------------GUI中的包&#xff1a; java.awt.*; javax.swing.*; java.awt.event.*; 要求:1)了解GUI的开发流程&#xff1b;2&#xff09;掌握常用的布局管理器 开发GUI图形界面的步骤-------------------------------1.选择容器 1&#xff0…

note.. redis五大数据类型

redis 五大数据类型使用nosql介绍&#xff0c;由来什么是nosql阿里巴巴的架构nosql 四大分类redis入门概述redis 安装 &#xff08;docker&#xff09;基础的知识redis五大数据类型Redis-KeyStringList (列表)Set &#xff08;集合&#xff09;Hash(哈希)Zset 有序集合nosql介绍…

Arcengine 基本操作(待更新)

/// <summary>/// 删除fieldName属性值为1的弧段/// </summary>/// <param name"fieldName"></param>/// <param name"t"></param>public void DelectPolyline(string fieldName, int t){ILayer pLayer axMapControl…

redis 三种特殊数据类型

三种特性数据类型 geospatial 定位&#xff0c;附近的人&#xff0c;打车距离计算。 redis的geo在redis3.2版本就推出了。可推算地理位置的信息&#xff0c;两地之间的距离&#xff0c;方圆几里的人。 6个命令。 GEOADD GEODIST GEOHASH GEOPOS GEORADIUS GEORADIUSBYMEMB…

前端学习(1298):gulp使用

第一步安装 第二步建立文件夹 第三部 src放源代码 第四步 输入代码 执行