【实战】Spring Cloud Stream 3.1+整合Kafka

文章目录

    • 前言
    • 新版版本优势
    • 实战演示
      • 增加maven依赖
      • 增加applicaiton.yaml配置
      • 新增Kafka通道消费者
      • 新增发送消息的接口
    • 实战测试
      • postman发送一个正常的消息
      • postman发送异常消息

前言

之前我们已经整合过Spring Cloud Stream 3.0版本与Kafka、RabbitMQ中间件,简直不要太好,直接让我们不用再关心底层MQ如何集与消息收发。但是从Spring Cloud 2020版本开始,Spring Cloud Stream的版本升级至3.1.0以上版本,自此版本开始@StreamListener上面就增加@Deprecated注解,不赞成使用,有可能接下来的版本会删除掉。传说是有利于使用Project Reactor提供的事件流抽象(如Flux和Mono),命令函数在每个单独的事件上触发,而reactive函数只触发一次。故今天我们分享一期Spring Cloud Stream 3.1+整合Kafka,各位看官敬请鉴赏。

在这里插入图片描述

新版版本优势

新版提倡用函数式进行发送和消费信息

定义返回类型为Supplier, Function or Consumer的bean提供消息发送和消费的bean 看看绑定名称命名规则
input - + -in- +
output - + -out- +

在配置文件中指定spring.cloud.function.definition/spring.cloud.stream.function.definition的名称后会把这个bean绑定到对应的消费者和提供者上。

比如 inputChannel bean绑定了inputChannel-in-0通道,outputChannel bean绑定了outputChannel-out-0通道:

spring:kafka:bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092cloud:stream:kafka:binder:brokers: ${spring.kafka.bootstrap-servers}binders:kafkahub:type: kafkaenvironment:spring:cloud:stream:kafka: ${spring.cloud.stream.kafka.binder}default-binder: kafkahub function:definition: inputChannel,outputChannelbindings:inputChannel-in-0:binder: kafkahubdestination: test-kafka-topicgroup: test-kafka-groupcontent-type: text/plainoutputChannel-out-0:binder: kafkahubdestination: test-kafka-topiccontent-type: text/plainproducer:partition-count: 3 #分区数目

此时消息生产者为:

@Resource
private StreamBridge streamBridge;@GetMapping("/send")
public Boolean sendMessageToKafka(String msg){boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());return send;
}

此时消息消费者为:

@Configuration
public class KafkaChannel {@Resourceprivate StreamBridge streamBridge;/*** inputChannel 消费者* @author senfel* @date 2024/6/18 15:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> inputChannel(){return message -> {System.out.println("接收到消息Payloa:" + message.getPayload());System.out.println("接收到消息Header:" + message.getHeaders());};}

}

实战演示

我们简单进行一下演示即可,kafka环境可以看我之前的博文搭建。
主要演示功能:

正常情况下生产者发送消息到kafka,消费者监听消息并消费成功
异常情况下消费者消费失败,立即将异常消息投递到另一个topic上,兜底topic消费者消费

本次全部采用自动ack模式,如果需要手动ack参照之前的博文配置即可,注意在消费者端加上手动ack逻辑。

增加maven依赖

 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.12.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>cce-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>seata-demo-order</name>
<description>Demo project for Spring Boot</description>
<properties><java.version>8</java.version><spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.2.4</version></dependency>
</dependencies>
<dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>

增加applicaiton.yaml配置

spring:#kafkakafka:bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092cloud:stream:kafka:  # kafka配置binder:brokers: ${spring.kafka.bootstrap-servers}auto-add-partitions: true #自动分区auto-create-topics: true #自动创建主题replication-factor: 3 #副本min-partition-count: 3 #最小分区bindings:outputChannel-out-0:producer:# 无限制重发不产生消息丢失retries: Integer.MAX_VALUE#acks =0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,可靠性最低#acks =1:producer等待broker的ack,partition的leader刷盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据,可靠性中#acks = all 、 -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,可靠性高,但延迟时间长#可以设置的值为:all, -1, 0, 1acks: allmin:insync:replicas: 3 #感知副本数inputChannel-in-0:consumer:concurrency: 1 #消费者数量max-concurrency: 5 #最大消费者数量recovery-interval: 3000  #3s 重连auto-rebalance-enabled: true  #主题分区消费者组成员自动平衡auto-commit-offset: false   #手动提交偏移量enable-dlq: true  # 开启 dlq队列dlq-name: test-kafka-topic.dlqdeserializationExceptionHandler: sendToDlq #异常加入死信binders: # 与外部mq组件绑定kafkahub:type: kafkaenvironment:spring:cloud:stream:kafka: ${spring.cloud.stream.kafka.binder}default-binder: kafkahub #默认绑定function: # 定义channel名字,每个channel又可以作为生产者(in)与消费者(out)definition: inputChannel;outputChannel;dlqChannelbindings: # 通道绑定inputChannel-in-0:binder: kafkahubdestination: test-kafka-topicgroup: test-kafka-groupcontent-type: text/plainconsumer:maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10soutputChannel-out-0:binder: kafkahubdestination: test-kafka-topiccontent-type: text/plainproducer:partition-count: 3 #分区数目dlqChannel-in-0:binder: kafkahubdestination: test-kafka-topic.dlqgroup: test-kafka-groupcontent-type: text/plainconsumer:maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10sdlqChannel-out-0:binder: kafkahubdestination: test-kafka-topic.dlqcontent-type: text/plainproducer:partition-count: 3 #分区数目

新增Kafka通道消费者

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.function.Consumer;/*** KafkaCustomer* @author senfel* @version 1.0* @date 2024/6/18 15:22*/
@Configuration
public class KafkaChannel {@Resourceprivate StreamBridge streamBridge;/*** inputChannel 消费者* @author senfel* @date 2024/6/18 15:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> inputChannel(){return message -> {System.out.println("接收到消息:" + message.getPayload());System.out.println("接收到消息:" + message.getHeaders());if(message.getPayload().contains("9")){boolean send = streamBridge.send("dlqChannel-out-0", MessageBuilder.withPayload("kafka异常消息发送到dlq测试:"+message).build());System.err.println("向dlqChannel发送消息:"+send);}};}/*** dlqChannel 死信消费者* @author senfel* @date 2024/6/18 15:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> dlqChannel(){return message -> {System.out.println("死信dlqChannel接收到消息:" + message.getPayload());System.out.println("死信dlqChannel接收到消息:" + message.getHeaders());};}
}

新增发送消息的接口

@Resource
private StreamBridge streamBridge;@GetMapping("/send")
public Boolean sendMessageToKafka(String msg){boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());return send;
}

实战测试

postman发送一个正常的消息

在这里插入图片描述

postman发送异常消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/30032.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为鸿蒙 使用router跳转页面 和 router.getParams接收参数并使用参数 [最简单 最直接 的详细教程 ]

1, 准备两个页面 1. pages/DetailPage.ets 2. pages/Index.ets 2, 代码直接 cv 页面 // pages/Index.ets import router from ohos.router// 参数类型 class User {name: stringage: number }Entry Component struct Index {// 要传的参数Stateuser: User {name: John,…

【解决方案】Java 互联网项目中消息通知系统的设计与实现

前言 消息通知系统&#xff08;notification-system&#xff09;作为一个独立的微服务&#xff0c;完整地负责了 App 端内所有消息通知相关的后端功能实现。该系统既需要与文章系统、订单系统、会员系统等相关联&#xff0c;也需要和其它业务系统相关联&#xff0c;是一个偏底层…

docker-compose设置永久启动、自动重启

步骤一 找到 docker-compose.yml 文件 步骤二 vim 打开文件 找到 image: PS&#xff1a;就是为了对齐格式 步骤三 在其下方添加&#xff1a; restart: always而后保存即可

注意力机制简介

为了减少计算复杂度&#xff0c;通过借鉴生物神经网络的一些机制&#xff0c;我们引入了局部连接、权重共享以及汇聚操作来简化神经网络结构。神经网络中可以存储的信息量称为网络容量。一般来讲&#xff0c;利用一组神经元来存储信息的容量和神经元的数量以及网络的复杂度成正…

表面声波滤波器——工艺 (5)

制作工艺流程 声表面波器件制作采用半导体集成电路的平面工艺,首先在压电衬底上通过光刻、镀膜、剥离或刻蚀等工艺制备出叉指换能器&#xff0c;然后经过划片、粘片、压丝、封焊等后续封装工艺得到最后的器件。 整个工艺过程中需要操作使用各种机台 清洗机光刻机涂胶显影台全…

专业和学校到底怎么选,兴趣和知名度到底哪个重要?

前言 2024高考已经落下帷幕&#xff0c;再过不久就到了激动人心的查分和填报志愿的时刻&#xff0c;在那天到来&#xff0c;小伙伴们就要根据自己的分数选取院校和专业&#xff0c;接下来我就以参加22年(破防年)河南高考的大二生来讲述一下我自己对于如何选取院校和专业的看法以…

香港电讯高可用网络助力企业变革金融计算

客户背景 客户是一家金融行业知名的量化私募对冲基金公司&#xff0c;专注于股票、期权、期货、债券等主要投资市场&#xff0c;在量化私募管理深耕多年&#xff0c;目前资管规模已达数百亿级&#xff0c;在国内多个城市均设有办公地点。 客户需求 由于客户业务倚重量化技术…

从“野人饭”走红,探索品牌户外化营销趋势丨小红书内容分析

wildeat&#xff0c;户外是人的天性的回归 近来&#xff0c;“wildeat&#xff08;户外野吃&#xff09;”的风潮在小红书逐渐兴起。越来越多的人选择到户外吃一顿&#xff0c;做一次“野人”&#xff0c;主打一个只要氛围到了&#xff0c;就地开饭&#xff0c;不愁吃什么&…

屏蔽房是做什么用的?为什么需要定期检测?

屏蔽房对于不了解的人来说&#xff0c;可能光看名字不知道是做什么的&#xff0c;但是对于一些企业或者机构&#xff0c;却是再熟悉不过的了。和名字一样&#xff0c;屏蔽房是对空间内的信号以及一些外界环境条件进行隔绝&#xff0c;在一些有特殊要求的企业机构中&#xff0c;…

睿治数据治理平台焕新升级,推出全新建模与调度平台

在数据治理的浩瀚征途中&#xff0c;企业常常面临着数据冗余如同连绵山峦&#xff0c;使得关键信息的获取变得困难重重&#xff1b;在数据检索的海洋中&#xff0c;有时迷失方向&#xff0c;消耗大量时间精力&#xff0c;严重影响了运营效率&#xff1b;特别是在处理大规模数据…

assertJ-db 科普

前言 今日我们看看 java 大名鼎鼎的 assertj 是怎么做断言的 数据库断言 在实际的测试中我们总是跟业务打交道的。跟业务打交道一般很难避免验证数据库中的东西。尤其在接口测试中&#xff0c;一个常见的例子是你测试一个下单的接口。 接口返回可能就是成功过或者失败。你无…

安装 Fedora CoreOS 操作系统

首发日期 2024-06-16, 以下为原文内容: 有一台吃灰几年的 e5-26v3 古老机器, 最近翻出来用一下. 首先从安装操作系统开始. 目录 1 FCOS 简介2 安装过程 2.1 下载 iso 镜像文件并制作安装 U 盘2.2 编写安装配置文件2.3 编译安装配置文件2.4 从 U 盘启动并安装 3 SSH 连接并测试…

直播平台美颜技术分析:视频美颜SDK功能实现原理

本篇文章&#xff0c;笔者将深入分析视频美颜SDK的功能实现原理&#xff0c;探讨其在直播平台中的应用。 一、视频美颜技术概述 通过这些功能&#xff0c;用户可以在直播过程中呈现更加理想的自己&#xff0c;从而提高观众的观看体验和互动积极性。 二、视频美颜SDK的功能 1…

不“卷”低价,品牌如何让客户愿意“留下”?

天猫取消预售制度&#xff0c;满减力度大于往年&#xff0c;京东直接将“又便宜又好”定为大促主题。今年的618&#xff0c;离不开两大关键词&#xff1a;拼低价 和 回归用户。 价格“内卷”&#xff0c;消费者可以花更少的钱买到商品&#xff0c;但对商家来说&#xff0c;意味…

6月19日(周三)A股行情总结:A股震荡收跌,恒生科技指数大涨3%,10年期国债期货转涨续创新高

内容提要 车路云概念延续昨日涨势&#xff0c;华铭智能20CM 3连板。贵金属及PEEK材料概念全日走强&#xff1b;港股有色金属及能源股走强&#xff0c;紫金矿业涨超3%&#xff0c;中石油涨超3%。国债期货午后全线转涨&#xff0c;10年期主力合约涨0.05%报104.925元&#xff0c;…

6.17继承

面向对象的特征&#xff1a;封装&#xff0c;继承&#xff0c;多态 使用背景&#xff1a;比如说在动物类底下可以有带毛的动物&#xff0c;带毛的动物符合所有的动物的特征&#xff0c;只是在这个基础上再继续添加一些特征 命名&#xff1a;原有类型称为“基类”或“父类”&a…

计算机网络(1) OSI七层模型与TCP/IP四层模型

一.OSI七层模型 OSI 七层模型是国际标准化组织ISO提出的一个网络分层模型&#xff0c;它的目的是使各种不同的计算机和网络在世界范围内按照相同的标准框架实现互联。OSI 模型把网络通信的工作分为 7 层&#xff0c;从下到上分别是物理层、数据链路层、网络层、传输层、会话层、…

类加载器、反射、注解

1、类加载器 1.1类加载器作用 负责将.class文件&#xff08;存储的物理文件&#xff09;加载在到内存中。 1.2类加载的过程 类加载时机 创建类的实例&#xff08;对象&#xff09;调用类的类方法访问类或者接口的类变量&#xff0c;或者为该类变量赋值使用反射方式来强制创建…

Java学习 (一) 环境安装及入门程序

一、安装java环境 1、获取软件包 https://www.oracle.com/java/technologies/downloads/ .exe 文件一路装过去就行&#xff0c;最好别装c盘 &#xff0c;我这里演示的时候是云主机只有C盘 2、配置环境变量 我的电脑--右键属性--高级系统设置--环境变量 在环境变量中添加如下配…

数字孪生涉及到的9大技术栈,都是难啃骨头呀。

数字孪生涉及到多个技术栈&#xff0c;包括但不限于以下几个方面&#xff1a; 数据采集和传感器技术&#xff1a; 数字孪生需要实时获取物理世界的数据&#xff0c;因此需要使用各种传感器技术&#xff08;如温度传感器、压力传感器、运动传感器等&#xff09;来采集数据&…