【实战】Spring Cloud Stream 3.1+整合Kafka

文章目录

    • 前言
    • 新版版本优势
    • 实战演示
      • 增加maven依赖
      • 增加applicaiton.yaml配置
      • 新增Kafka通道消费者
      • 新增发送消息的接口
    • 实战测试
      • postman发送一个正常的消息
      • postman发送异常消息

前言

之前我们已经整合过Spring Cloud Stream 3.0版本与Kafka、RabbitMQ中间件,简直不要太好,直接让我们不用再关心底层MQ如何集与消息收发。但是从Spring Cloud 2020版本开始,Spring Cloud Stream的版本升级至3.1.0以上版本,自此版本开始@StreamListener上面就增加@Deprecated注解,不赞成使用,有可能接下来的版本会删除掉。传说是有利于使用Project Reactor提供的事件流抽象(如Flux和Mono),命令函数在每个单独的事件上触发,而reactive函数只触发一次。故今天我们分享一期Spring Cloud Stream 3.1+整合Kafka,各位看官敬请鉴赏。

在这里插入图片描述

新版版本优势

新版提倡用函数式进行发送和消费信息

定义返回类型为Supplier, Function or Consumer的bean提供消息发送和消费的bean 看看绑定名称命名规则
input - + -in- +
output - + -out- +

在配置文件中指定spring.cloud.function.definition/spring.cloud.stream.function.definition的名称后会把这个bean绑定到对应的消费者和提供者上。

比如 inputChannel bean绑定了inputChannel-in-0通道,outputChannel bean绑定了outputChannel-out-0通道:

spring:kafka:bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092cloud:stream:kafka:binder:brokers: ${spring.kafka.bootstrap-servers}binders:kafkahub:type: kafkaenvironment:spring:cloud:stream:kafka: ${spring.cloud.stream.kafka.binder}default-binder: kafkahub function:definition: inputChannel,outputChannelbindings:inputChannel-in-0:binder: kafkahubdestination: test-kafka-topicgroup: test-kafka-groupcontent-type: text/plainoutputChannel-out-0:binder: kafkahubdestination: test-kafka-topiccontent-type: text/plainproducer:partition-count: 3 #分区数目

此时消息生产者为:

@Resource
private StreamBridge streamBridge;@GetMapping("/send")
public Boolean sendMessageToKafka(String msg){boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());return send;
}

此时消息消费者为:

@Configuration
public class KafkaChannel {@Resourceprivate StreamBridge streamBridge;/*** inputChannel 消费者* @author senfel* @date 2024/6/18 15:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> inputChannel(){return message -> {System.out.println("接收到消息Payloa:" + message.getPayload());System.out.println("接收到消息Header:" + message.getHeaders());};}

}

实战演示

我们简单进行一下演示即可,kafka环境可以看我之前的博文搭建。
主要演示功能:

正常情况下生产者发送消息到kafka,消费者监听消息并消费成功
异常情况下消费者消费失败,立即将异常消息投递到另一个topic上,兜底topic消费者消费

本次全部采用自动ack模式,如果需要手动ack参照之前的博文配置即可,注意在消费者端加上手动ack逻辑。

增加maven依赖

 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.12.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>cce-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>seata-demo-order</name>
<description>Demo project for Spring Boot</description>
<properties><java.version>8</java.version><spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.2.4</version></dependency>
</dependencies>
<dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>

增加applicaiton.yaml配置

spring:#kafkakafka:bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092cloud:stream:kafka:  # kafka配置binder:brokers: ${spring.kafka.bootstrap-servers}auto-add-partitions: true #自动分区auto-create-topics: true #自动创建主题replication-factor: 3 #副本min-partition-count: 3 #最小分区bindings:outputChannel-out-0:producer:# 无限制重发不产生消息丢失retries: Integer.MAX_VALUE#acks =0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,可靠性最低#acks =1:producer等待broker的ack,partition的leader刷盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据,可靠性中#acks = all 、 -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,可靠性高,但延迟时间长#可以设置的值为:all, -1, 0, 1acks: allmin:insync:replicas: 3 #感知副本数inputChannel-in-0:consumer:concurrency: 1 #消费者数量max-concurrency: 5 #最大消费者数量recovery-interval: 3000  #3s 重连auto-rebalance-enabled: true  #主题分区消费者组成员自动平衡auto-commit-offset: false   #手动提交偏移量enable-dlq: true  # 开启 dlq队列dlq-name: test-kafka-topic.dlqdeserializationExceptionHandler: sendToDlq #异常加入死信binders: # 与外部mq组件绑定kafkahub:type: kafkaenvironment:spring:cloud:stream:kafka: ${spring.cloud.stream.kafka.binder}default-binder: kafkahub #默认绑定function: # 定义channel名字,每个channel又可以作为生产者(in)与消费者(out)definition: inputChannel;outputChannel;dlqChannelbindings: # 通道绑定inputChannel-in-0:binder: kafkahubdestination: test-kafka-topicgroup: test-kafka-groupcontent-type: text/plainconsumer:maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10soutputChannel-out-0:binder: kafkahubdestination: test-kafka-topiccontent-type: text/plainproducer:partition-count: 3 #分区数目dlqChannel-in-0:binder: kafkahubdestination: test-kafka-topic.dlqgroup: test-kafka-groupcontent-type: text/plainconsumer:maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10sdlqChannel-out-0:binder: kafkahubdestination: test-kafka-topic.dlqcontent-type: text/plainproducer:partition-count: 3 #分区数目

新增Kafka通道消费者

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.function.Consumer;/*** KafkaCustomer* @author senfel* @version 1.0* @date 2024/6/18 15:22*/
@Configuration
public class KafkaChannel {@Resourceprivate StreamBridge streamBridge;/*** inputChannel 消费者* @author senfel* @date 2024/6/18 15:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> inputChannel(){return message -> {System.out.println("接收到消息:" + message.getPayload());System.out.println("接收到消息:" + message.getHeaders());if(message.getPayload().contains("9")){boolean send = streamBridge.send("dlqChannel-out-0", MessageBuilder.withPayload("kafka异常消息发送到dlq测试:"+message).build());System.err.println("向dlqChannel发送消息:"+send);}};}/*** dlqChannel 死信消费者* @author senfel* @date 2024/6/18 15:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> dlqChannel(){return message -> {System.out.println("死信dlqChannel接收到消息:" + message.getPayload());System.out.println("死信dlqChannel接收到消息:" + message.getHeaders());};}
}

新增发送消息的接口

@Resource
private StreamBridge streamBridge;@GetMapping("/send")
public Boolean sendMessageToKafka(String msg){boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());return send;
}

实战测试

postman发送一个正常的消息

在这里插入图片描述

postman发送异常消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/30032.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为鸿蒙 使用router跳转页面 和 router.getParams接收参数并使用参数 [最简单 最直接 的详细教程 ]

1, 准备两个页面 1. pages/DetailPage.ets 2. pages/Index.ets 2, 代码直接 cv 页面 // pages/Index.ets import router from ohos.router// 参数类型 class User {name: stringage: number }Entry Component struct Index {// 要传的参数Stateuser: User {name: John,…

避免指针成员重复释放

类中有指针成员时&#xff0c;如何避免重复释放 在下面的代码中&#xff0c;类A保存有1个指针&#xff0c;并且理所当然的在构造函数中分配内存&#xff0c;析构中释放内存&#xff0c;但会发生重复释放的问题。 原因&#xff1a;编译器自动生成如下的拷贝构造(析构函数不影响拷…

学习分享-Tomcat 的线程池在工作方式上与普通的 Java 线程池的区别

前言 最近在学习过程中遇到在某个场景下&#xff1a;修改某条数据时&#xff0c;给该线程上分布式写锁&#xff0c;然后引入延迟队列处理其他请求&#xff1b;这个方案有一定的缺点&#xff0c;因为在用到消息队列时&#xff0c;不存在占用过多线程从而导致OOM的问题&#xff…

【解决方案】Java 互联网项目中消息通知系统的设计与实现

前言 消息通知系统&#xff08;notification-system&#xff09;作为一个独立的微服务&#xff0c;完整地负责了 App 端内所有消息通知相关的后端功能实现。该系统既需要与文章系统、订单系统、会员系统等相关联&#xff0c;也需要和其它业务系统相关联&#xff0c;是一个偏底层…

docker-compose设置永久启动、自动重启

步骤一 找到 docker-compose.yml 文件 步骤二 vim 打开文件 找到 image: PS&#xff1a;就是为了对齐格式 步骤三 在其下方添加&#xff1a; restart: always而后保存即可

注意力机制简介

为了减少计算复杂度&#xff0c;通过借鉴生物神经网络的一些机制&#xff0c;我们引入了局部连接、权重共享以及汇聚操作来简化神经网络结构。神经网络中可以存储的信息量称为网络容量。一般来讲&#xff0c;利用一组神经元来存储信息的容量和神经元的数量以及网络的复杂度成正…

js算数数据失真

起因 数字的运算失真问题在每个语言中都有体现&#xff0c;在java中使用BigDecimal就可以很好的避免这种情况。前端中没有这样一种类型来处理这种情况 引入别人写好的包 一开始准备自己写&#xff0c;但是觉得太麻烦可以使用 BigNumber.js 这样的库来执行精确的除法运算&#x…

表面声波滤波器——工艺 (5)

制作工艺流程 声表面波器件制作采用半导体集成电路的平面工艺,首先在压电衬底上通过光刻、镀膜、剥离或刻蚀等工艺制备出叉指换能器&#xff0c;然后经过划片、粘片、压丝、封焊等后续封装工艺得到最后的器件。 整个工艺过程中需要操作使用各种机台 清洗机光刻机涂胶显影台全…

京东商品详情API:解锁电商数据的金钥匙

引言 京东开放平台为开发者提供了丰富的API资源&#xff0c;其中商品详情API尤其受到关注。它允许第三方应用和服务直接获取京东商城内商品的详尽信息&#xff0c;这对于电商平台、价格比较网站、数据分析公司以及移动应用开发商来说&#xff0c;都是一个宝贵的工具。本文将深…

Clickhouse副本和分片的概念

副本 https://clickhouse.com/docs/zh/engines/table-engines/mergetree-family/replication 副本是表级别的&#xff0c;不是整个服务器级的。所以&#xff0c;服务器里可以同时有复制表和非复制表。 副本不依赖分片。每个分片有它自己的独立副本。 ClickHouse 使用 Apache Zo…

专业和学校到底怎么选,兴趣和知名度到底哪个重要?

前言 2024高考已经落下帷幕&#xff0c;再过不久就到了激动人心的查分和填报志愿的时刻&#xff0c;在那天到来&#xff0c;小伙伴们就要根据自己的分数选取院校和专业&#xff0c;接下来我就以参加22年(破防年)河南高考的大二生来讲述一下我自己对于如何选取院校和专业的看法以…

香港电讯高可用网络助力企业变革金融计算

客户背景 客户是一家金融行业知名的量化私募对冲基金公司&#xff0c;专注于股票、期权、期货、债券等主要投资市场&#xff0c;在量化私募管理深耕多年&#xff0c;目前资管规模已达数百亿级&#xff0c;在国内多个城市均设有办公地点。 客户需求 由于客户业务倚重量化技术…

从“野人饭”走红,探索品牌户外化营销趋势丨小红书内容分析

wildeat&#xff0c;户外是人的天性的回归 近来&#xff0c;“wildeat&#xff08;户外野吃&#xff09;”的风潮在小红书逐渐兴起。越来越多的人选择到户外吃一顿&#xff0c;做一次“野人”&#xff0c;主打一个只要氛围到了&#xff0c;就地开饭&#xff0c;不愁吃什么&…

Ubuntu中防火墙的使用 和 开放 关闭 端口

目录 1.查看防火墙的状态 2.开启ufw防火墙 3.重启ufw防火墙 4.关闭ufw防火墙 5.设置外来访问默认权限 6.开放普通端口 7.关闭普通端口 8.开放规定协议的端口 9.关闭指定协议端口 10.重启防火墙&#xff0c;是配置生效 1.查看防火墙的状态 sudo ufw status 2.开启uf…

屏蔽房是做什么用的?为什么需要定期检测?

屏蔽房对于不了解的人来说&#xff0c;可能光看名字不知道是做什么的&#xff0c;但是对于一些企业或者机构&#xff0c;却是再熟悉不过的了。和名字一样&#xff0c;屏蔽房是对空间内的信号以及一些外界环境条件进行隔绝&#xff0c;在一些有特殊要求的企业机构中&#xff0c;…

睿治数据治理平台焕新升级,推出全新建模与调度平台

在数据治理的浩瀚征途中&#xff0c;企业常常面临着数据冗余如同连绵山峦&#xff0c;使得关键信息的获取变得困难重重&#xff1b;在数据检索的海洋中&#xff0c;有时迷失方向&#xff0c;消耗大量时间精力&#xff0c;严重影响了运营效率&#xff1b;特别是在处理大规模数据…

assertJ-db 科普

前言 今日我们看看 java 大名鼎鼎的 assertj 是怎么做断言的 数据库断言 在实际的测试中我们总是跟业务打交道的。跟业务打交道一般很难避免验证数据库中的东西。尤其在接口测试中&#xff0c;一个常见的例子是你测试一个下单的接口。 接口返回可能就是成功过或者失败。你无…

AI换脸实践

1.windows版本 解压即用&#xff0c;2024最简单好用AI开源换脸应用&#xff0c;整合包已备好&#xff0c;快试试吧&#xff01;_哔哩哔哩_bilibili 2.linux版本 1&#xff09;克隆roop项目 git clone https://github.com/s0md3v/roop 创建虚拟环境 python -m venv venv 激…

安装 Fedora CoreOS 操作系统

首发日期 2024-06-16, 以下为原文内容: 有一台吃灰几年的 e5-26v3 古老机器, 最近翻出来用一下. 首先从安装操作系统开始. 目录 1 FCOS 简介2 安装过程 2.1 下载 iso 镜像文件并制作安装 U 盘2.2 编写安装配置文件2.3 编译安装配置文件2.4 从 U 盘启动并安装 3 SSH 连接并测试…

直播平台美颜技术分析:视频美颜SDK功能实现原理

本篇文章&#xff0c;笔者将深入分析视频美颜SDK的功能实现原理&#xff0c;探讨其在直播平台中的应用。 一、视频美颜技术概述 通过这些功能&#xff0c;用户可以在直播过程中呈现更加理想的自己&#xff0c;从而提高观众的观看体验和互动积极性。 二、视频美颜SDK的功能 1…