kafka之消费者(Consumer)

1、kafka消费者消费方式

        kafka 的消费者(Consumer)采用 pull 的方式主动从 broker 中拉取数据,这种不足之处会有:当 broker 中没有消息时,消费者会不断循环取数据,一直返回空数据。

2、消费者组

Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的 group_id 相同。

1)、 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。 2)、消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

3)、在命令行中,使用消费者消费消息没有指定消费者组,会自动分配一个消费者组,而不是没没有消费者组。

4)、如果向消费组中添加更多的消费者大于主题分区数量,则有一部分消费者就会闲置,不会接收任何消息。

5)、如果消费组中的消费者小于主题分区数量,则部分消费者消费的分区数量不止一个

2.1、消费者重要参数
参数名称描述
bootstrap.servers向Kafka 集群建立初始连接用到的host/port 列表。示例:node-1:9092,node-2:9092...
key.deserializer 和value.deserializer指定接收消息的key 和value 的反序列化类型。一定要写全限定类名。
group.id标记消费者所属的消费者组。(必须参数)
enable.auto.commit默认值为true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka 提交的频率,默认5s。
auto.offset.reset当Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
offsets.topic.num.partitions__consumer_offsets 的分区数,默认是50 个分区
heartbeat.interval.msKafka 消费者和coordinator 之间的心跳时间,默认3s。该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms 的1/3。
session.timeout.msKafka 消费者和coordinator 之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms消费者处理消息的最大时长,默认是 5 分钟 。超过该值,该消费者被移除,消费者组执行再平衡。
fetch.min.bytes默认1个字节。消费者获取服务器端一批消息最小的字节数。
fetch.max.wait.ms默认500ms 。如果没有从服务器端获取到一批数据的最小字节数 。该时间到,仍然会返回数据。
fetch.max.bytes默认Default: 52428800 (50m)。消费者 获取 服务器端 一 批消息最大的字节数 。如果服务器端一批次的数据大于该值50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config) or max.message.bytes (topic config)影响。
max.poll.records一次poll拉取数据返回消息的最大条数, 默认是 500 条 。
2.2、消费者代码实现
2.2.1、引入依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.1</version>
</dependency>
2.2.2、消费者代码实现(自定义消费者组实现)
public class KafkaConsumerTest {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node-1:9092,node-2:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 指定分区策略properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");// 指定消费者组,必须参数properties.put(ConsumerConfig.GROUP_ID_CONFIG, " test1");KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);// 订阅主题,可以定义多个主题List<String> topics = new ArrayList<>();topics.add("topic1");// 订阅consumer.subscribe(topics);while (true){// 拉取消息ConsumerRecords<String, String> msg = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : msg) {System.out.println(consumerRecord);System.out.println(consumerRecord.value());}}}
}
2.2.3、消费者代码实现(指定主题分区实现)
public class KafkaConsumerTest {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node-1:9092,node-2:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 指定分区策略properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");// 指定消费者组,必须参数properties.put(ConsumerConfig.GROUP_ID_CONFIG, " test1");KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);// 订阅主题分区List<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition("topic1", 1));consumer.assign(topicPartitions);while (true){// 拉取消息ConsumerRecords<String, String> msg = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : msg) {System.out.println(consumerRecord);System.out.println(consumerRecord.value());}}}
}
3、总结

        本文介绍kafka的消费者是如何消费消息,简单介绍消费者的使用,关于消费者更高级的部分,关注我,在博客和微信公众号中都会发布。

        本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/605554.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【数据结构】平衡二叉树

导语 对于二叉搜索树 而言&#xff0c;它的 增、 删 、 改 、 查 的时间复杂度为 O() ~ O(n) 。原因是最坏情况下&#xff0c;二叉搜索树会退化成 线性表 。换言之&#xff0c;树的高度决定了它插入、删除和查找的时间复杂度。 为了对上述缺点进一步优化&#xff0c;设计了一…

APM32F035有感矢量控制方案

一、先来几句废话 首先这两年公司越来越多的开始使用国产的MCU&#xff0c;用过GD32、AT32、APM32等等&#xff0c;目前稳定使用的是APM32,包括身边朋友工作室&#xff0c;也开始从TI、STM、NXP换成APM32。上个月有幸拿到APM32F035电路控制板&#xff0c;非常感谢面包板社区提供…

实战环境搭建-linux下安装tomcat

安装tomcat Index of /dist/tomcat/tomcat-9/v9.0.8/bin 下载apache-tomcat-9.0.8.tar.gz,可以使用wget; 2、将压缩包tar -zxvf apache-tomcat-9.0.8.tar.gz解压到/home/tomcat 3、修改环境变量 vi /etc/profile export JAVA_HOME=/home/java/jdk1.8.0_221 export JRE_HO…

sublime如何取消运行代码状态

sublime如何取消运行代码状态 解决方案待续、更新中 解决方案 1 顶部取消: 工具-----取消编译 这个看自己编译器sublime取消编译是否可用,可用则用 ,否则使用下面方法 2 底部栏取消–如图所示: 取消成功: 待续、更新中 ————————————————————— 以上就…

基于php应用的文件管理器eXtplorer部署网站并内网穿透远程访问

文章目录 1. 前言2. eXtplorer网站搭建2.1 eXtplorer下载和安装2.2 eXtplorer网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1.Cpolar云端设置3.2.Cpolar本地设置 4.公网访问测试5.结语 1. 前言 通过互联网传输文件&#xff0c;是互联网最重要的应用之一&#xff0c;无论是…

7.数据转换、格式化、校验

日期字符串格式的表单参数,提交后转换为 Date 类型 <!-- 解决问题: 1.数据类型转换 2.数据格式 3.数据校验 --> BirthDay :<form:input path="birthDay"/>Employee 类中增加日期类型属性: //关于类型转换 private Date birthDay ;数据绑定流程原理 …

Qt QWidget窗口基类

文章目录 1 QWidget介绍2 如何显示 QWidget窗口2.1 新建基于QWidget的窗口类2.2 再添加一个QWidget窗口类2.3 显示新添加的 QWidget窗口 3 常用的属性和方法3.1 窗口位置3.2 窗口大小3.3 窗口标题3.4 窗口图标3.5 资源文件 4 实例 1 QWidget介绍 Qt 中的常用控件&#xff0c;比…

什么是CDN,优势在哪里

随着互联网的普及和用户需求的多样化&#xff0c;网站的速度和稳定性已经成为影响用户体验的关键因素。CDN加速作为解决这一问题的有效手段&#xff0c;正逐渐受到业界的广泛关注。 为什么说对网站这一块起到这么关键性的作用呢&#xff1f;它的优势在哪&#xff1f; 1.提升网…

【机器学习】卷积神经网络(五)-计算机视觉应用

七、应用-计算机视觉 7.1 人脸检测 DenseBox\Femaleness-Net\MT-CNN\Cascade CNN 介绍 VJ框架的分类器级联用于卷积网络 用于人脸检测的紧凑卷积神经网络级联 问题&#xff1a;作者希望实时检测高分辨率视频流中的正面&#xff0c;由于人脸图像和背景的多样性和复杂性&#xff…

【MIdjourney】图像角度关键词

本篇仅是我个人在使用过程中的一些经验之谈&#xff0c;不代表一定是对的&#xff0c;如有任何问题欢迎在评论区指正&#xff0c;如有补充也欢迎在评论区留言。 1.侧面视角(from side) 侧面视角观察或拍摄的主体通常以其侧面的特征为主要焦点&#xff0c;以便更好地展示其轮廓…

02. Eureka、Nacos注册中心及负载均衡原理

01小节中订单服务远程调用用户服务案例实现了跨服务请求&#xff0c;在微服务中一个服务可能是集群部署的&#xff0c;也就是一个服务有多个实例&#xff0c;但是我们在调用服务时需要指定具体的服务实例才能调用该服务&#xff0c;在集群模式下&#xff0c;服务地址应该写哪个…

1.3号io网络

文件IO 1.文件IO是基于系统调用 2.程序每进行一次系统调用&#xff0c;就会从用户空间向内核空间进行一次切换&#xff0c;执行效率较慢 3.目的&#xff1a;由于后期进程间通信&#xff0c;如管道、套接字通信&#xff0c;都使用的是文件IO&#xff0c;所以引入文件IO操作的…

MATLAB根据数据拟合曲线

MATLAB根据数据拟合曲线 MATLAB根据数据拟合曲线视频观看 MATLAB根据数据拟合曲线 x1[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,6…

C语言之详解数组【附三子棋和扫雷游戏实战】

文章目录 一、一维数组的创建和初始化1、数组的创建2、数组的初始化3、一维数组的使用4、 一维数组在内存中的存储 二、二维数组的创建和初始化1、二维数组的创建2、二维数组的初始化3、二维数组的使用4、二维数组在内存中的存储 三、数组越界边界值考虑不当导致越界访问数组大…

Mysql数据库的基础操作

1、数据库的数据类型和结构设置&#xff0c;修改等 DML&#xff1a;针对数据的增删改 where条件更像是这一条命令中的限制条件&#xff0c;如果不带where条件的时候&#xff0c;相当于针对全表所有字段进行操作 DQL&#xff1b; 数据查询语言 1、查询关键词使用 select 这个里…

MYSQL学习之buffer pool的理论学习

MYSQL学习之buffer pool的理论学习 by 小乌龟 文章目录 MYSQL学习之buffer pool的理论学习前言一、buffer pool是什么&#xff1f;二、buffer pool 的内存结构三、buffer pool 的初始化和配置初始化配置 四、buffer pool 空间管理LRU淘汰法冷热数据分离的LRU算法1.引入库2.读入…

MacBook Pro M1搭建Kafka2.7版本源码运行环境

原创/朱季谦 最近在阅读Kafka的源码&#xff0c;想可以在阅读过程当中&#xff0c;在代码写一些注释&#xff0c;便决定将源码部署到本地运行。 日常开发过程中&#xff0c;用得比较多一个版本是Kafka2.7版本&#xff0c;故而在MacBook Pro笔记本上用这个版本的源码进行搭建&…

计算机网络实验(二):Wireshark网络协议分析

一、实验名称&#xff1a;Wireshark网络协议分析 二、实验原理 HTTP协议分析 1.超文本传输协议&#xff08;Hypertext Transfer Protocol, HTTP&#xff09;是万维网&#xff08;World Wide Web&#xff09;的传输机制&#xff0c;允许浏览器通过连接Web服务器浏览网页。目…

高性能、可扩展、支持二次开发的企业电子招标采购系统源码

在数字化时代&#xff0c;企业需要借助先进的数字化技术来提高工程管理效率和质量。招投标管理系统作为企业内部业务项目管理的重要应用平台&#xff0c;涵盖了门户管理、立项管理、采购项目管理、采购公告管理、考核管理、报表管理、评审管理、企业管理、采购管理和系统管理等…

2023我的编程之旅、2024新的启程

目录 一、2023年结束、2024年开始 1、回顾2023年 1.1、发表文章概述 1.2、开发中遇到的问题与解决方案 2、展望2024年 2.1、新年Flag 2.2、收获与成长 一、2023年结束、2024年开始 光阴荏苒&#xff0c;从我开始在CSDN写作已经2年零5个月了&#xff0c;我也在不断的思考…