【大数据学习 | kafka】简述kafka的消费者consumer

1. 消费者的结构

能够在kafka中拉取数据进行消费的组件或者程序都叫做消费者。

这里面要涉及到一个动作叫做拉取。

首先我们要知道kafka这个消息队列主要的功能就是起到缓冲的作用,比如flume采集数据然后交给spark或者flink进行计算分析,但是flume采用的就是消息的push方式,这个方式不能够保证推送的数据消费者端一定会消费完毕,会出现数据的反压问题,这个问题很难解决,所以才出现了消息队列kafka,它可以起到一个缓冲的作用,生产者部分将数据直接全部推送到kafka,然后消费者从其中拉取数据,这边如果也采用推送的方式,那么也就在计算端会出现反压问题,所以kafka的消费者一般都是采用拉的方式pull,并不是push

1.1 消费者组

在一个topic中存在多个分区,可以分摊压力实现负载均衡,那么整体topic中的数据会很多,如果消费者只有一个的话很难全部消费其中的数据压力也会集中在一个消费者中,并且在大数据行业中几乎所有的计算架构都是分布式的集群模式,那么这个集群模式中,计算的节点也会存在多个,这些节点都是可以从kafka中拉取数据的,所有消费者不可能只有一个,一般情况下都会有多个消费者。

正因为topic存在多个分区,每个分区中的数据是独立的,那么消费者最好也是一个一个和分区进行一一对应的,所以有几个分区应该对应存在几个消费者是最好的。

这个和分蛋糕是一样的,一个蛋糕分成几块,那么有几个人吃,应该是对应关系的

消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。

消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

2. 消费者实现

在实现消费者的时候我们需要知道几个消费者的配置重要参数

参数解释
bootstrap.servers集群地址
key.deserializerkey反序列化器
value.deserializervalue反序列化器
group.id消费者组id

首先创建消费者对象

消费者对象订阅相应的topic然后拉取其中的数据进行消费

整体代码如下

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;public class Consumer1 {public static void main(String[] args) {Properties pro = new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");//设定组idpro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//设定key的反序列化器pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//设定value的反序列化器KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);List<String> topics = Arrays.asList("topic_a","topic_b");//一个消费者可以消费多个分区的数据consumer.subscribe(topics);//订阅这个topicwhile (true){//死循环要一直消费数据ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));//间隔一秒钟消费一次数据,拉取一批数据过来Iterator<ConsumerRecord<String, String>> it = records.iterator();while(it.hasNext()){ConsumerRecord<String, String> record = it.next();System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());}}}
}
[hexuan@hadoop106 datas]$ kafka-console-producer.sh --bootstrap-server hadoop106:9092 --topic topic_b>>1
>2
>3
>4
>5
>

3. 消费者与分区之间的对应关系

一个消费者组中的消费者和分区是一一对应的关系,一个分区应该对应一个消费者,但是如果消费者多了,那么有的消费者就没有分区消费,如果消费者少了那么会出现一个消费者消费多个分区的情况。

# 首先创建topic_c 用于测试分区和消费者的对应关系
kafka-topics.sh --bootstrap-server hadoop106:9092 --create --topic topic_c --partitions 3 --replication-factor 2
# 启动两个消费者 刚才我们写的消费者main方法运行两次
# 然后分别在不同的分区使用生产者发送数据,看数据在消费者中的打印情况

首先选择任务可以并行执行

选择任务修改配置

我们可以看到允许多实例并行执行

启动两次,这个时候我们就有了两个消费者实例

生产者线程:分别向三个分区中发送1 2 3元素

package com.hainiu.kafka.consumer;/*** ClassName : test3_producer* Package : com.hainiu.kafka.consumer* Description** @Author HeXua* @Create 2024/11/3 23:40* Version 1.0*/import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class test3_producer {public static void main(String[] args) {Properties pro = new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);ProducerRecord<String, String> record1 = new ProducerRecord<>("topic_d", 0,null,"1");ProducerRecord<String, String> record2 = new ProducerRecord<>("topic_d", 1,null,"2");ProducerRecord<String, String> record3 = new ProducerRecord<>("topic_d", 2,null,"3");producer.send(record1);producer.send(record2);
//        producer.send(record3);producer.close();}
}

可以看到有的消费者消费了两个分区的数据

如果启动三个消费者会发现每个人消费一个分区的数据

如果启动四个消费者

我们发现有一个消费者没有数据

3. 1 消费多topic的数据

不同组消费不同的topic或者一个组可以消费多个topic都是可以的

3.2 多个组消费一个topic

同一个topic可以由多个消费者组进行消费数据,并且相互之间是没有任何影响的

修改同一份代码的组标识不同。启动两个实例查看里面的消费信息

   pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group1");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group2");//分别修改消费者组的id不同
package com.hainiu.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;public class Consumer1 {public static void main(String[] args) {Properties pro = new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);List<String> topics = Arrays.asList("topic_c");//订阅多个topic的数据变化consumer.subscribe(topics);while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> it = records.iterator();while(it.hasNext()){ConsumerRecord<String, String> record = it.next();System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/59720.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CCF ChinaOSC |「开源科学计算与系统建模openSCS专题分论坛」11月9日与您相约深圳

2024年11月9日至10日&#xff0c;以“湾区聚力 开源启智”为主题的2024年中国计算机学会中国开源大会&#xff08;CCF ChinaOSC&#xff09;将在深圳召开。大会将汇聚国内外学术界、顶尖科技企业、科研机构及开源社区的精英力量&#xff0c;共同探索人工智能技术和人类智慧的无…

人工智能——小白学习指南

知孤云出岫 目录 1. **智能评测系统**2. **个性化学习路径推荐**3. **虚拟学习助手**4. **学习行为分析**5. **数据驱动的教学决策**6. **自动化课程推荐**7. **数据隐私与安全保护** 人工智能知识点的总结和学习路线&#xff0c;以数据表格形式呈现&#xff0c;并附带在教育行…

现代Web开发:React Hooks深入解析

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 现代Web开发&#xff1a;React Hooks深入解析 现代Web开发&#xff1a;React Hooks深入解析 现代Web开发&#xff1a;React Hook…

RTC精度及校准

RTC精度偏差&#xff1a; RTC的基准时间和精度与石英晶体的频率相关&#xff0c;晶体的谐振频率取决于温度&#xff0c;因此RTC性能与温度相关&#xff0c;晶体的频率偏差是晶体正常频率的温度反转函数。 一、硬件方面&#xff1a; 1.使用高精度振荡器的RTC模块&#xff1b; …

了解bootstrap改造asp.net core MVC的样式模板

我们都知道&#xff0c;在使用默认的asp.net core MVC模板建立项目的时候&#xff0c;里面的样式是已经事先被写好了的。一般来说都在css目录下的site.css和bootstrap.css及下面的bootstrap.min.css中。我们打开bootstrap这些样式文件&#xff0c;里面有大量的样式类的定义&…

流类库与输入输出

来源&#xff1a;《C语言程序设计》 像C语言一样&#xff0c;C语言也没有输入输出语句。 但C标准库中有一个面向对象的输入输出软件包&#xff0c;即I/O流类库。 流是I/O流类的中心概念。 ------ I/O流类库是C语言中I/O函数在面向对象的程序设计方法中的一个替换产品。 -…

cocos creator 3.8.3物理组件分组的坑

坑&#xff0c;坑的不行的大坑 group用的二进制的左移获取十进制的数值 目前是这样判断的&#xff0c;也不知道对不对&#xff0c;什么get、set Group没找到

基于MFC实现的赛车游戏

一、问题描述 游戏背景为一环形车道图&#xff0c;选择菜单选项“开始游戏”则可开始游戏。游戏的任务是使用键盘上的方向键操纵赛道上的蓝色赛车追赶红色赛车&#xff0c;红色赛车沿车道顺时针行驶&#xff0c;出发点和终点均位于车道左上方。任一赛车先达到终点则比赛结束。…

RHCE的学习(12)

第九章 Ubuntu 什么是Ubuntu 概述 Ubuntu&#xff08;乌班图&#xff09;属于Debian系列&#xff0c;Debian是社区类Linux的典范&#xff0c;是迄今为止最遵循GNU规范的Linux系统。 Debian最早由Ian Murdock于1993年创建&#xff0c;分为三个版本分支&#xff08;branch&…

【案例】故障雪花屏

开发平台&#xff1a;Unity 6.0 开发工具&#xff1a;Shader Graph 参考视频&#xff1a;【U2D Shader Graph】❄️雪❄️花❄️屏❄️   一、效果图 二、Shader Graph 路线图 三、案例分析 核心思路&#xff1a;雪花屏幕效果 &#xff08;混合&#xff09; 原图像 最终图像…

有什么办法换网络ip动态

在数字化时代&#xff0c;网络已成为我们生活、工作不可或缺的一部分。然而&#xff0c;随着网络应用的日益广泛&#xff0c;IP地址作为设备在网络中的唯一标识&#xff0c;其重要性不言而喻。动态换IP&#xff0c;作为一种灵活且高效的网络技术&#xff0c;正逐渐受到越来越多…

Spring Boot中集成MyBatis操作数据库详细教程

目录 前言1. 项目依赖配置1.1 引入MyBatis和数据库驱动依赖1.2 数据源配置 2. 创建数据库映射实体类3. 创建Mapper层接口4. 创建Service层4.1 定义Service接口4.2 实现Service接口 5. 创建Controller层6. 运行和测试项目6.1 启动项目6.2 测试接口 7. 总结 前言 在Java开发中&a…

【大语言模型】ACL2024论文-07 BitDistiller: 释放亚4比特大型语言模型的潜力通过自蒸馏

【大语言模型】ACL2024论文-07 BitDistiller: 释放亚4比特大型语言模型的潜力通过自蒸馏 目录 文章目录 【大语言模型】ACL2024论文-07 BitDistiller: 释放亚4比特大型语言模型的潜力通过自蒸馏目录摘要研究背景问题与挑战如何解决创新点算法模型实验效果代码推荐阅读指数&…

鸿蒙next打包流程

目录 下载团结引擎 添加开源鸿蒙打包支持 打包报错 路径问题 安装DevEcoStudio 可以在DevEcoStudio进行打包hap和app 包结构 没法直接用previewer运行 真机运行和测试需要配置签名,DevEcoStudio可以自动配置, 模拟器安装hap提示报错 安装成功,但无法打开 团结1.3版本新增工具…

基于Jeecgboot3.6.3vue3的flowable流程online表单的审批使用介绍

更多技术支持与服务请加入我的知识星球或加我微信&#xff0c;名称:亿事达nbcio技术交流社区https://t.zsxq.com/iPi8F 今天介绍一下基于jeecgboot3.6.3的flowable流程使用online表单进行审批的情况 1、首先建立一个online应用类型的流程&#xff0c;如下&#xff1a; 2、进行…

【LeetCode】【算法】238. 除自身以外数组的乘积

LeetCode 238. 除自身以外数组的乘积 题目描述 给你一个整数数组 nums&#xff0c;返回 数组 answer &#xff0c;其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据保证数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位整数范围内。 请不…

如何构建一个可扩展的测试自动化框架?

以下为作者观点&#xff1a; 假设你是测试自动化方面的新手&#xff0c;想参与构建一个框架。在这种情况下&#xff0c;重要的是要了解框架所需的组件&#xff0c;以及它们是如何组合的。思考项目的具体需求和目标&#xff0c;以及可能遇到的困难和挑战。 假如你是一个测试架…

实战:索引的命中机制

在 SQL Server 中,查询是否能命中索引(即是否能使用 Index Seek)取决于多个因素,包括索引的结构、查询条件的排列、和数据库优化器的策略。以下是一些常见的命中索引和不能命中索引的情况,及其详细解释: 一、命中索引的情况 1. 前导列匹配(典型的命中索引场景) 索引结…

使用Docker快速部署FastAPI Web应用

Docker是基于 Linux 内核的cgroup、namespace以及 AUFS 类的Union FS 等技术&#xff0c;对进程进行封装隔离&#xff0c;一种操作系统层面的虚拟化技术。Docker中每个容器都基于镜像Image运行&#xff0c;镜像是容器的只读模板&#xff0c;容器是模板的一个实例。镜像是分层结…

C++【string类,模拟实现string类】

&#x1f31f;个人主页&#xff1a;落叶 &#x1f31f;当前专栏: C专栏 目录 为什么学习string类 C语言中的字符串 标准库中的string类 auto和范围for auto关键字 迭代器 范围for string类的常用接口说明和使用 1. string类对象的常见构造 2.string类对象的容量操作 3…