【kafka实战】03 SpringBoot使用kafka生产者和消费者示例

本节主要介绍用SpringBoot进行开发时,使用kafka进行生产和消费

一、引入依赖

<dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.6</version></dependency>
</dependencies>

二、添加topic

使用offset explore软件,添加一个topic
在这里插入图片描述

三、配置文件

server:port: 8080
spring:kafka:consumer:# Kafka服务器bootstrap-servers: 192.168.56.201:9092# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D#auto-commit-interval: 2s# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)# none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式#key-deserializer: org.apache.kafka.common.serialization.StringDeserializerkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。# 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,# 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,# 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。# 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数# 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况max-poll-records: 100properties:# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancemax:poll:interval:ms: 600000# 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10ssession:timeout:ms: 10000listener:# 在侦听器容器中运行的线程数,一般设置为 机器数*分区数concurrency: 4# 自动提交关闭,需要设置手动消息确认ack-mode: manual_immediate# 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误missing-topics-fatal: false# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancepoll-timeout: 600000producer:# Kafka服务器bootstrap-servers: 192.168.56.201:9092# 发生错误后,消息重发的次数,开启事务必须设置大于0。retries: 3# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。# 开启事务时,必须设置为allacks: all# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 生产者内存缓冲区的大小。buffer-memory: 1024000# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer

四、生产者代码

@Slf4j
@RestController
@RequestMapping("/msg")
public class SendMessageController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("send")public String send() {for (int i = 0; i < 100; i++) {OrderInfo orderInfo = new OrderInfo();orderInfo.setAddress("成都市高新区");orderInfo.setOrderId(String.valueOf(i));orderInfo.setProductName("华为P60:" + i);kafkaTemplate.send("order.topic", "order:" + i, new Gson().toJson(orderInfo));}return "success";}
}

五、消费者代码

消费者代码采用手动ack的方式

@Slf4j
@Component
public class KafkaOrderConsumer {@KafkaListener(topics = "order.topic", groupId = "orderGroup")public void consumeOrder(ConsumerRecord<String, String> record, Acknowledgment ack) {log.info("消费订单消息key={},value={}", record.key(), record.value());ack.acknowledge();}
}

代码git仓库:https://gitee.com/syk1234/mqdmo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/88026.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智能热水器丨打造智能家居新体验

随着科学技术的不断发展&#xff0c;智能电器越来越被大众所采纳&#xff0c;如智能扫地机&#xff0c;智能洗衣机&#xff0c;智能微波炉等等&#xff0c;越来越智能的电器为人们的生活带来了许多便利。以往的热水器一般都是只有按键/机械的控制方式&#xff0c;没有其他无线控…

2015年蓝桥杯省赛C/C++ A组 灾后重建题解(100分)

10. 灾后重建 Pear市一共有N&#xff08;<50000&#xff09;个居民点&#xff0c;居民点之间有M&#xff08;<200000&#xff09;条双向道路相连。这些居民点两两之间都可以通过双向道路到达。这种情况一直持续到最近&#xff0c;一次严重的地震毁坏了全部M条道路。 震后…

操作系统级 ChatGPT 爆火!

本期推荐开源项目目录&#xff1a; 1. 操作系统贾维斯&#xff1f; 2. 开源翻译模型 3. 整理的 AI 技术资料 01 操作系统贾维斯&#xff1f; 让你的操作系统额变得智能&#xff0c;通过在终端输入自然语言 来让计算机执行一些通用的功能&#xff0c;比如创建、编辑照片、文件夹…

根据excel批量修改文件夹及其文件名称

简介 表哥公司电脑上有一大批文件夹&#xff0c;用于存放一些pdf。他希望对这些文件进行整理。文件夹批量重命名为好记一些的名字&#xff0c;文件夹下的pdf改成的名字格式为&#xff1a;文件夹名序号。 例如&#xff1a;文件夹从“1234”&#xff0c;改成“案件001”&#xf…

亚马逊投资Anthropic; OpenAI将推出新版ChatGPT

&#x1f989; AI新闻 &#x1f680; 亚马逊投资Anthropic获得可靠AI基础模型开发合作 摘要&#xff1a;亚马逊投资Anthropic至多40亿美元&#xff0c;将共同开发可靠高性能的基础模型&#xff0c;并能提前使用Anthropic技术。Anthropic将主要依赖亚马逊的云服务来训练未来的…

如何使用show profile 查看sql的执行周期

修改配置文件/etc/my.cnf 新增一行&#xff1a;query_cache_type1 重启mysql 先开启 show variables like %profiling%; set profiling1;select * from xxx ;show profiles; #显示最近的几次查询show profile cpu,block io for query 编号 #查看程序的执行步骤

微表情识别API + c++并发服务器系统

微表情识别API c并发服务器系统 该项目只开源c并发服务器程序&#xff0c;模型API部分不开源 地址&#xff1a;https://github.com/lin-lai/-API- 更新功能 4.1版本 改用epoll实现IO多路复用并发服务器 项目介绍 本项目用于检测并识别视频中人脸的微表情 目标任务: 用户上…

黑马JVM总结(二十四)

&#xff08;1&#xff09;练习-分析a a:先执行iload1&#xff1a;把数据读入到操作数栈中 iinc&#xff1a;把局部变量表中的1号曹位做一个自增&#xff0c;他在局部变量表中发生的并没有影响到操作数栈 a&#xff1a;限制性自增在做iload 自增变成12 iload把12读取到操作数…

Linux学习之HIS部署(4)

ElasticSearch部署 ElasticSearch资源 RabbitMQ资源 ElasticSearch服务部署 #OpenJDK环境部署 [rootServices ~]# yum clean all; yum repolist -v ... Total packages: 8,265 [rootServices ~]# yum -y install java-1.8.0-openjdk-devel.x86_64 #安装OpenJDk ... Compl…

微信删除好友对方知道吗?如何加回微信好友?

微信是我们日常生活中使用最多的社交软件&#xff0c;很多小伙伴在使用微信时都曾发出过这样的疑问&#xff1a;微信删除好友对方知道吗&#xff1f;当自己在微信中删除某人后&#xff0c;对方是否会收到信息提醒&#xff1f;另外&#xff0c;如果删除好友后感到后悔&#xff0…

start()方法源码分析

当我们创建好一个线程之后&#xff0c;可以调用.start()方法进行启动&#xff0c;start()方法的内部其实是调用本地的start0()方法&#xff0c; 其实Thread.java这个类中的方法在底层的Thread.c文件中都是一一对应的&#xff0c;在Thread.c中start0方法的底层调用了jvm.cpp文件…

seata的启动与使用

1 下载seata 下载地址&#xff1a;https://github.com/seata/seata/releases/v0.9.0/ 1.1 修改配置文件 将下载得到的压缩包进行解压&#xff0c;进入conf目录&#xff0c;调整下面的配置文件&#xff1a; registry.conf registry {type "nacos"nacos {serverA…

Spring 学习(八)事务管理

1. 事务 1.1 事务的 ACID 原则 数据库事务&#xff08;transaction&#xff09;是访问并可能操作各种数据项的一个数据库操作序列。事务必须满足 ACID 原则——即原子性&#xff08;Atomicity&#xff09;、一致性&#xff08;Consistency&#xff09;、隔离性&#xff08;Iso…

uniapp:tabBar点击后设置动画效果

APP端不支持dom操作&#xff0c;也不支持active伪类&#xff0c;绞尽脑汁也没办法给uniapp原生的tabBar点击加动画效果&#xff0c;所以最终只能舍弃原生tabBar&#xff0c;改用自定义tabBar。 自定义tabBar的原理是&#xff0c;页面的上部分分别是tabBar对应的页面组件&#…

Matlab绘图函数subplot、tiledlayout、plot和scatter

一、绘图函数subplot subplot(m,n,p)将当前图窗划分为 mn 网格&#xff0c;并在 p 指定的位置创建坐标区。MATLAB按行号对子图位置进行编号。第一个子图是第一行的第一列&#xff0c;第二个子图是第一行的第二列&#xff0c;依此类推。如果指定的位置已存在坐标区&#xff0c;…

计算物理专题----随机游走实战

计算物理专题----随机游走实战 Problem 1 Implement the 3D random walk 拟合线 自旋的 拟合函数&#xff08;没有数学意义&#xff09; 参数&#xff1a;0.627,3.336,0.603&#xff0c;-3.234 自由程满足在一定范围内的均匀分布以标准自由程为单位长度&#xff0c;…

node的服务端对接科大讯飞-火星ai解决方案

序&#xff1a; 官方给的node对接火星的demo其实只适用于node开发的web应用&#xff0c;但是对于纯node 作为服务端&#xff0c;也就是作为webapi来调用&#xff0c;你会发现&#xff0c;location.host直接是获取不到location的。这个时候&#xff0c;其实要单独起个wss的服务的…

C++: stack 与 queue

目录 1.stack与queue stack queue 2.priority_queue 2.1相关介绍 2.2模拟实现priority_queue --仿函数: --push --pop --top --size --empty --迭代器区间构造 2.3仿函数 3.容器适配器 stack模拟实现 queue模拟实现 学习目标: 1.stack和queue介绍与使用 2.pri…

PHP8中伪变量“$this->”和操作符“::”的使用-PHP8知识详解

对象不仅可以调用自己的变量和方法&#xff0c;也可以调用类中的变量和方法。PHP8通过伪变量“$this->”和操作符“::”来实现这些功能。 1.伪变量“$this->” 在通过对象名->方法调用对象的方法时&#xff0c;如果不知道对象的名称&#xff0c;而又想调用类中的方法…

基于微信小程序的校园代送跑腿系统(源码+lw+部署文档+讲解等)

文章目录 前言系统主要功能&#xff1a;具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09;有保障的售后福利 代码参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计…