深入学习 Kafka(3)- SpringBoot 整合 Kafka

1. 引入 jar

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

2. yml 配置

spring:kafka:bootstrap-servers: localhost:9092,localhost:9093,localhost:9094 # kafka 服务地址,多个以逗号隔开producer:retries: 0 #若设置大于0的值,客户端会将发送失败的记录重新发送acks: 1 #只要Leader副本确认接收就算成功batch-size: 16384 #当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置buffer-memory: 33554432 #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置key-serializer: org.apache.kafka.common.serialization.StringSerializer #关键字的序列化类value-serializer: org.apache.kafka.common.serialization.StringSerializer #值的序列化类consumer:enable-auto-commit: false # 关闭自动提交 ackauto-commit-interval: 100 # 消费者自动提交偏移量的时间间隔,enable-auto-commit=false时,次配置不生效auto-offset-reset: earliestmax-poll-records: 500 # 每次拉取的最大记录数key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: ${APP_NAME}listener:ack-mode: manual_immediate # 手动ackpoll-timeout: 500ms # 每次调用poll()时,如果没有消息可消费,将等待最多500ms

3. 实现代码

生产者:
@Slf4j
@Component
public class KafkaProducer {private final ExecutorService executorService = Executors.newFixedThreadPool(10);@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;/*** @param topic 要发送到的Kafka主题名称* @param msg   要发送的消息内容*/public void send(String topic, String msg) {executorService.submit(() -> kafkaTemplate.send(topic, msg).addCallback(result -> {if (result != null && result.getRecordMetadata() != null) {log.info("消息发送成功,offset = {}", result.getRecordMetadata().offset());} else {log.warn("消息发送完成,但结果或其元数据为空");}}, throwable -> log.error("消息发送失败,原因 = {}", throwable.getMessage())));}@PreDestroypublic void shutdown() {executorService.shutdown();try {if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {executorService.shutdownNow();}} catch (InterruptedException e) {executorService.shutdownNow();Thread.currentThread().interrupt();}}
}
消费者:
@Component
public class KafkaConsumer {@KafkaListener(topics = "myTopic",groupId = "xxx", properties = "max.poll.records:5", concurrency = "3")  public void listen(ConsumerRecord<?, ?> record) {  // 处理消息  ack.acknowledge();}
}// topics:可以消费多个topic
// groupId:不同的groupId可以独立消费topic
// properties = "max.poll.records:5":每次轮询时消费者从Kafka服务器拉取的最大记录数(即消息数)
// concurrency:并发消费者的数量

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/38663.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大数据开发助手:Coze平台上一款致力于高效解决大数据开发问题的智能Bot!

大数据开发助手&#xff1a;Coze平台上一款致力于高效解决大数据开发问题的智能Bot 核心技术揭秘1. **自然语言处理&#xff08;NLP&#xff09;**2. **知识图谱构建**3. **个性化推荐算法** 功能特色概览1. **即时问题解答**2. **最佳实践分享**3. **个性化学习路径**4. **社区…

哪个牌子的超声波清洗器好?精选四大超强超声波清洗机力荐

生活中戴眼镜的人群不在少数&#xff0c;然而要维持眼镜的干净却不得不每次都需要清洗&#xff0c;只是通过手洗的方式实在太慢并且容易操作不当让镜片磨损更加严重&#xff01;所以超声波清洗机就诞生了&#xff01;超声波清洗机能够轻松清洗机眼镜上面的油脂污渍&#xff0c;…

使用Java构建可伸缩的云原生应用架构

使用Java构建可伸缩的云原生应用架构 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 什么是云原生应用架构&#xff1f; 云原生应用架构是一种利用云计算平台…

synchronized 锁优化原理

目录 一、轻量级锁 二、锁膨胀 三、自旋优化 四、偏向锁 五、锁消除 一、轻量级锁 1. 会创建一个锁记录 Lock Record&#xff08;保存在线程栈中&#xff09;&#xff0c;尝试 CAS 修改 Mark Word 中的对象头&#xff0c;是一种乐观锁的思想&#xff0c;而不是将 Java 对…

【多线程开发 4】从源码学习LockSupport

从源码学习LockSupport 2024年6月30日 大家好啊&#xff0c;好久没写博客了&#xff0c;今天打算写一下&#xff0c;讲一下JUC里面LockSupport这个类。 这个是一个工具类&#xff0c;实际上也是为了线程通信开发的。它的源码比较短&#xff0c;也只引用了Unsafe一个类。所以…

机器学习——强化学习状态值函数V和动作值函数Q的个人思考

最近在回顾《西瓜书》的理论知识&#xff0c;回顾到最后一章——“强化学习”时对于值函数部分有些懵了&#xff0c;所以重新在网上查了一下&#xff0c;发现之前理解的&#xff0c;包括网上的大多数对于值函数的描述都过于学术化、公式化&#xff0c;不太能直观的理解值函数以…

SQL常用经典语句大全

SQL经典语句大全 一、基础 1、说明&#xff1a;创建数据库 CREATE DATABASE database-name 2、说明&#xff1a;删除数据库 drop database dbname 3、说明&#xff1a;备份sql server — 创建 备份数据的 device USE master EXEC sp_addumpdevice ‘disk’, ‘testBack’, ‘c:…

macos Automator自动操作 app, 创建自定义 应用程序 app 的方法

mac内置的这个 自动操作 automator 应用程序&#xff0c;可以帮助我们做很多的重复的工作&#xff0c;可以创建工作流&#xff0c; 可以录制并回放操作&#xff0c; 还可以帮助我们创建自定的应用程序&#xff0c;下面我们就以创建一个自定义启动参数的chrome.app为例&#xff…

C语言 求数列 S(n) = a + aa + aaa + …aa…a (n 个 a)的和

求数列S(n)aaaaaa…aa…a(n个a)之值&#xff0c;其中a是一个数字&#xff0c;n表示a的位数&#xff0c;n由键盘输入。例如222222222222222&#xff08;此时n5&#xff09; 这个程序读取用户输入的一个数字 a 和一个正整数 n&#xff0c;计算并输出数列 S(n) 的值。 #include …

cube-studio 开源一站式云原生机器学习/深度学习/大模型训练推理平台介绍

全栈工程师开发手册 &#xff08;作者&#xff1a;栾鹏&#xff09; 一站式云原生机器学习平台 前言 开源地址&#xff1a;https://github.com/tencentmusic/cube-studio cube studio 腾讯开源的国内最热门的一站式机器学习mlops/大模型训练平台&#xff0c;支持多租户&…

绘图黑系配色

随便看了几篇小论文&#xff0c;里面的黑配色挺喜欢的&#xff0c;虽然平时SCI系配色用的多&#xff0c;但看到纯黑配色与黑加蓝配色&#xff0c;那就是我最心上的最优style。

一文了解IP地址冲突的起因与解决方案

IP 地址冲突是困扰网络管理员影响网络的正常运行的常见因素。深入理解并有效解决 IP 地址冲突故障对于维护网络的高效稳定运行具有重要意义。 一、IP 地址冲突的原因 &#xff08;一&#xff09;人为配置错误 网络用户在手动配置 IP 地址时&#xff0c;对网络配置了解不多用户…

OpenGL3.3_C++_Windows(23)

伽ga马校正 物理亮度 光子数量 线性空间&#xff1a;光子数(亮度&#xff09;和颜色值的线性关系人眼感知的亮度&#xff1a;对比较暗的颜色变化更敏感&#xff0c;感知亮度基于人的感觉非线性空间&#xff1a;光子数(亮度&#xff09;和 颜色值^2.2&#xff0c;恰好符合屏幕…

一些项目的说明

这是一个管理系统&#xff0c;比较缝合&#xff0c;可能想到什么有用的功能就写&#xff0c;也没太多的针对性&#xff0c;需要的功能可以自己拆解去用&#xff0c;也欢迎往上添加新功能。 业余玩家&#xff0c;代码有空就写。 项目相关的业务设计写在CSDN博客里。用户IDYuboc…

为什么我学个 JAVA 就已经耗尽所有而有些人还能同时学习多门语言

在开始前刚好我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「JAVA的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#xff01;我的入门语言是C&#xff0c…

Android InputChannel连接

InputChannel是InputDispatcher 和应用程序 (InputTarget) 的通讯桥梁&#xff0c;InputDispatcher 通知应用程序有输入事件&#xff0c;通过InputChannel中的socket进行通信。 连接InputDispatcher和窗口 WinodwManagerService:addwindow: WMS 添加窗口时&#xff0c;会创建…

互联网场景下人脸服务基线方案总结

1.简介 1.1目的 在过去的一段时间里&#xff0c;因为听见业务对人脸服务方案的需求&#xff0c;针对网络视频中关键人物定位的检索任务&#xff0c;完成了基于互联网场景的人脸基线服务的构建。本文档是对当前基线服务以后之后解决方案的优化进行总结。 1.2范围 本文档描述的人…

c++读取文件时出现中文乱码

原因&#xff1a;UTF-8格式不支持汉字编码 解决&#xff1a;改成ANSI&#xff0c;因为ANSI编码支持汉字编码

Python学习路线图(2024最新版)

这是我最开始学Python时的一套学习路线&#xff0c;从入门到上手。&#xff08;不敢说精通&#xff0c;哈哈~&#xff09; 一、Python基础知识、变量、数据类型 二、Python条件结构、循环结构 三、Python函数 四、字符串 五、列表与元组 六、字典与集合 最后再送给大家一套免费…

FFmpeg开发笔记(四十二)使用ZLMediaKit开启SRT视频直播服务

《FFmpeg开发实战&#xff1a;从零基础到短视频上线》一书在第10章介绍了轻量级流媒体服务器MediaMTX&#xff0c;通过该工具可以测试RTSP/RTMP等流媒体协议的推拉流。不过MediaMTX的功能实在是太简单了&#xff0c;无法应用于真实直播的生产环境&#xff0c;真正能用于生产环境…