Spring Boot集成kafka的相关配置

引入依赖:

额外依赖只需要这一个,kafka-client 不是springboot 的东西,那是原生的 kafka 客户端, kafka-test也不需要,是用代码控制broker的东西。

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

yml配置:

也可以用java类Config 方式配置,如果没有特殊要求,可以只用spring配置的方式

server:port: 8080
spring:kafka:# Kafka服务器,支持集群bootstrap-servers: 127.0.0.1:9092,127.0.0.2:9092# 生产者配置producer:# 消息发送重试次数,注意会引起重复消费,消费者需要做幂等retries: 3# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。# 开启事务时,必须设置为allacks: all# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 生产者内存缓冲区的大小。buffer-memory: 1024000# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# 消费者配置consumer:# 消费组ID,同一个消费组不会重复消费数据group-id: testGroup# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D#auto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)# none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。# 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,# 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,# 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。# 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数# 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况max-poll-records: 3properties:# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancemax:poll:interval:ms: 600000# 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10ssession:timeout:ms: 10000listener:# 在侦听器容器中运行的线程数,一般设置为 机器数*分区数concurrency: 4# 自动提交关闭,需要设置手动消息确认ack-mode: manual_immediate# 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误missing-topics-fatal: false# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancepoll-timeout: 600000#批量提交#      type: batch

业务代码:

  1. 简单生产

    @RestController
    public class kafkaProducer {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/normal/{message}")public void sendNormalMessage(@PathVariable("message") String message) {kafkaTemplate.send("testTopic", message);}
    }
    
  2. 简单消费

注意加上@Component,被spring管理监听才有效

@Component
public class KafkaConsumer {//监听消费@KafkaListener(topics = {"testTopic"})public void onNormalMessage(ConsumerRecord<String, Object> record) {System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +record.value());}}

主题使用配置的方式:

  1. 在yml配置里加上topic

    server:port: 8080
    spring:kafka:# Kafka服务器,支持集群bootstrap-servers: 127.0.0.1:9092,127.0.0.2:9092# 自定义配置myconfig:topic: testTopic
  2. 简单生产+配置主题

    @RestController
    public class kafkaProducer {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@Value("${spring.kafka.myconfig.topic}")private String topic;@GetMapping("/kafka/normal/{message}")public void sendNormalMessage(@PathVariable("message") String message) {kafkaTemplate.send(topic, message);}
    }
  3. 简单消费+配置主题

注意这里不能用@Value注解的方式,会报错

@Component
public class KafkaConsumer {//监听消费@KafkaListener(topics = {"${spring.kafka.myconfig.topic}"})public void onNormalMessage(ConsumerRecord<String, Object> record) {System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +record.value());}}

如果是多个主题需要消费, 可以使用Spring的SpEl表达式

server:port: 8080
spring:kafka:# Kafka服务器,支持集群bootstrap-servers: 127.0.0.1:9092,127.0.0.2:9092# 自定义配置myconfig:topic: testTopic1,testTopic2
@KafkaListener(topics = {"#{'${spring.kafka.myconfig.topic}'.split(',')}"})
@Component
public class KafkaConsumer {//监听消费@KafkaListener(topics = {"#{'${spring.kafka.myconfig.topic}'.split(',')}"})public void onNormalMessage(ConsumerRecord<String, Object> record) {System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +record.value());}}

 Kafka数据重复和数据丢失的解决方案_kafka相同数据_谢小涛的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/104135.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Effect的组件设计 | 京东云技术团队

Effect的概念起源 从输入输出的角度理解Effect https://link.excalidraw.com/p/readonly/KXAy7d2DlnkM8X1yps6L 编程中的Effect起源于函数式编程中纯函数的概念 纯函数是指在相同的输入下&#xff0c;总是产生相同的输出&#xff0c;并且没有任何副作用(side effect)的函数。…

flutter 开发中的问题与技巧

一、概述 刚开始上手 flutter 开发的时候&#xff0c;总会遇到这样那样的小问题&#xff0c;而官方文档又没有明确说明不能这样使用&#xff0c;本文总结了一些开发中经常会遇到的一些问题和一些开发小技巧。 二、常见问题 1、Expanded 组件只能在 Row、Column、Flex 中使用 C…

缓存设计的创新之旅:架构的灵魂之一

缓存在架构设计中占有重要地位。缓存在提升性能中也扮演重要的角色。常见的有对资源的缓存&#xff0c;比如数据库连接池、http连接池&#xff0c;还有对数据的缓存等。缓存的设计可复杂也可简单&#xff0c;但是需要考虑的点却很多。 缓存对象 设计缓存的时候一定要考虑的是&…

行业追踪,2023-10-13

自动复盘 2023-10-13 凡所有相&#xff0c;皆是虚妄。若见诸相非相&#xff0c;即见如来。 k 线图是最好的老师&#xff0c;每天持续发布板块的rps排名&#xff0c;追踪板块&#xff0c;板块来开仓&#xff0c;板块去清仓&#xff0c;丢弃自以为是的想法&#xff0c;板块去留让…

3D 生成重建007-Fantasia3D和Magic3d两阶段玩转文生3D

3D生成重建3D 生成重建007-Fantasia3D和magic3d 文章目录 0 论文工作1 论文方法1.1 magic3d1.2 Fantasia3D 2 效果2.1 magic3d2.2 fantasia3d 0 论文工作 两篇论文都是两阶段法进行文生3d&#xff0c;其中fantasia3D主要对形状和外表进行解耦&#xff0c;然后先对geometry进行…

微服务11-Sentinel中的授权规则以及Sentinel服务规则持久化

文章目录 授权规则自定义异常结果规则持久化实现Push模式 授权规则 根据来源名称对请求进行拦截 ——>我们需要解析来源名称&#xff08;RequestOriginParser默认解析都为default&#xff09;&#xff0c;所以我们要自定义一个实现类&#xff08;根据请求头解析&#xff0c…

springcloud笔记(7)-限流降级Sentinel

官方文档&#xff1a;概述 | Spring Cloud Alibaba basic-api-resource-rule | Sentinel (sentinelguard.io) Sentinel是SpringCloudAlibaba的组件。 sentinel的功能 introduction | Sentinel 流量控制 熔断降级&#xff1a;降低调用链路中的不稳定资源 系统负载保护&am…

private key ssh连接服务器

这里用到的软件是PuTTY。 https://www.chiark.greenend.org.uk/~sgtatham/putty/latest.html 保存本地rsa文件后&#xff0c;打开软件PuTTYgen&#xff0c;点击Load导入文件&#xff0c;输入Key passphrase即密码&#xff0c;保存至本地。 随后在PuTTY配置ssh的用户名 来Cred…

单调栈总结

自底向上的单调递减栈&#xff0c;一般用来寻找下一个更大的元素。 常见做法是从后往前遍历数组&#xff0c;栈顶元素如果小于当前元素就出栈&#xff0c;保持单调性 def nextGreaterElements(self, nums):""":type nums: List[int]:rtype: List[int]"&qu…

纽带|MaaS,连接AI与商业落地,重塑行业版图

熟悉人工智能的你可曾听过“MaaS”&#xff1f;MaaS实际上便是模型即服务&#xff08;Model as a Service&#xff09;的英文首字母组合。 它代表着一种全新的AI应用思维方式&#xff0c;将AI模型封装成可调用的云服务&#xff0c;通过云平台呈现给广大用户&#xff0c;其核心…

系统架构设计:17 论信息系统的安全性与保密性设计

目录 一 信息安全基础 1 信息安全的基本要素 2 信息安全的范围 3 网络安全

文件介绍---C语言编程

0 Preface/Foreword 1 C文件概述 文件&#xff08;File&#xff09;是程序设计中的一个重要的概念。所谓“文件”一般指存储在外部介质上数据的集合。一批数据是以文件的形式存放在外部介质&#xff08;如磁盘&#xff09;上。操作系统是以文件为单位对数据进行管理&#xff0c…

14SpringMVC中的默认的异常解析器,以及如何基于XML和注解的方式配置自定义异常解析器

异常处理器 默认异常解析器 控制器方法执行过程中如果出现了异常,此时就可以使用SpringMVC提供的HandlerExceptionResolver接口帮助我们处理出现的异常 HandlerExceptionResolver接口有DefaultHandlerExceptionResolver(默认)和SimpleMappingExceptionResolver(自定义)两种实…

Rocket Typist pro for mac 「Macos文本快速输入工具」

Rocket Typist Pro是一款在Mac上使用的文本快速输入工具&#xff0c;它可以帮助用户更快速、更准确地输入文本。 这款软件的设计非常简单、高效&#xff0c;它通过使用短语或宏&#xff0c;可以快速插入文本&#xff0c;减少重复性工作&#xff0c;提高工作效率。 Rocket Typ…

【大数据】Hadoop MapReduce与Hadoop YARN(学习笔记)

一、Hadoop MapReduce介绍 1、设计构思 1&#xff09;如何对付大数据处理场景 对相互间不具有计算依赖关系的大数据计算任务&#xff0c;实现并行最自然的办法就是采取MapReduce分而治之的策略。 不可拆分的计算任务或相互间有依赖关系的数据无法进行并行计算&#xff01; …

Spring实战 | Spring AOP核心功能分析之葵花宝典

国庆中秋特辑系列文章&#xff1a; 国庆中秋特辑&#xff08;八&#xff09;Spring Boot项目如何使用JPA 国庆中秋特辑&#xff08;七&#xff09;Java软件工程师常见20道编程面试题 国庆中秋特辑&#xff08;六&#xff09;大学生常见30道宝藏编程面试题 国庆中秋特辑&…

计算机毕业设计选什么题目好?springboot 试题库管理系统

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

学信息系统项目管理师第4版系列26_项目绩效域(下)

1. 项目工作绩效域 1.1. 涉及项目工作相关的活动和职能 1.2. 预期目标 1.2.1. 高效且有效的项目绩效 1.2.2. 适合项目和环境的项目过程 1.2.3. 干系人适当的沟通和参与 1.2.4. 对实物资源进行了有效管理 1.2.5. 对采购进行了有效管理 1.2.6. 有效处理了变更 1.2.7. 通…

css3 table表格

使用CSS3来美化HTML表格&#xff08;table&#xff09;可以提高表格的外观和可读性 表格样式&#xff1a; table { width: 100%; border-collapse: collapse; } width: 100%; 使表格宽度充满其容器。border-collapse: collapse; 合并相邻的表格边框&#xff0c;使表格看起来更整…

互联网Java工程师面试题·Java 并发编程篇·第五弹

目录 52、什么是线程池&#xff1f; 为什么要使用它&#xff1f; 53、怎么检测一个线程是否拥有锁&#xff1f; 54、你如何在 Java 中获取线程堆栈&#xff1f; 55、JVM 中哪个参数是用来控制线程的栈堆栈小的? 56、Thread 类中的 yield 方法有什么作用&#xff1f; 57、…