kafka(五)——消费者流程分析(c++)

概念

​ 消费者组(Consumer Group):由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者

整体流程

在这里插入图片描述

流程说明:

  • 消费者组包括多个消费者,每个消费者只能消费分区中的一部分数据;
  • 当一个消费者组中的消费者读取一个分区中的数据时,其他消费者就不能再读取该分区中的数据;
  • 一个消费者组可以有多个消费者,每个消费者只能消费分配给该消费者组的某些主题的某些分区;
  • 同一个分区只会被一个消费者组中的一个消费者消费,不同消费者组之间可以重复消费
  • 当消费者组中的某个消费者宕机后,Kafka会将该消费者所消费的分区重新分配给其他消费者,从而实现消费者的高可用性;
  • 消费者组中的消费者可以动态加入和退出,Kafka会自动重新分配分区;
  • 在同一个消费者组内,消费者之间可以进行负载均衡,以此来提高消息的吞吐量和消费的效率;
  • 消费者组可以通过消费者组ID(groupid)来标识,一个消费者组ID可以同时消费多个主题;

配置参数说明

参数名称描述
bootstrap.servers向Kafka集群建立初始连接用到的host/port列表。
key.deserializer和value.deserializer指定接收消息的key和value的反序列化类型。一定要写全类名。
group.id标记消费者所属的消费者组。
enable.auto.commit默认值为true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。
auto.offset.reset当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
offsets.topic.num.partitions__consumer_offsets的分区数,默认是50个分区。
heartbeat.interval.msKafka消费者和coordinator之间的心跳时间,默认3s。 该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。
session.timeout.msKafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。
fetch.min.bytes默认1个字节。消费者获取服务器端一批消息最小的字节数。
fetch.max.wait.ms默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
fetch.max.bytes默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。
max.poll.records一次poll拉取数据返回消息的最大条数,默认是500条。

分区策略

  • Range
# 特点
确保每个消费者消费的分区数量是均衡的。
注意:Rangle范围分配策略是针对每个Topic的。
# 配置
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RangeAssignor。
# 算法公式
n = 分区数量 / 消费者数量
m = 分区数量 % 消费者数量
前m个消费者消费n+1个,剩余消费者消费n个。

在这里插入图片描述

  • RoundRobin
# 特点
将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者。
# 配置
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RoundRobinAssignor。

在这里插入图片描述

  • Sticky
# 特点
在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是Kafka从0.11.x版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
  • CooperativeSticky

可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range+ CooperativeSticky。

回调函数说明

事件回调

  • 设置回调
// 设置事件回调
m_event_cb = new ConsumerEventCb;
errorCode = m_config->set("event_cb", m_event_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode) 
{printf("Conf set(event_cb) failed, errorStr:%s\n", errorStr.c_str());return;
}
  • 回调处理
// 设置事件回调
class ConsumerEventCb : public RdKafka::EventCb 
{
public:void event_cb(RdKafka::Event &event) {switch (event.type()) {case RdKafka::Event::EVENT_ERROR:break;case RdKafka::Event::EVENT_STATS:break;case RdKafka::Event::EVENT_LOG:break;case RdKafka::Event::EVENT_THROTTLE:break;default:break;}}
};

消费者组再平衡回调

  • 设置回调
// 设置消费者组再平衡回调
m_rebalance_cb = new ConsumerRebalanceCb;
errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode) 
{ELogError(("%s|Conf set(rebalance_cb) failed, errorStr:%s", GET_CODE_INFO(), errorStr.c_str()));break;
}
  • 回调处理
// 设置消费者组再平衡回调
// 注册该函数会关闭 rdkafka 的自动分区赋值和再分配
class ConsumerRebalanceCb : public RdKafka::RebalanceCb 
{
private:// 打印当前获取的分区static void printTopicPartition(const std::vector<RdKafka::TopicPartition *>&partitions) {for (unsigned int i = 0; i < partitions.size(); i++) {printf("count:%d, topic:%s,partition:%d\n",i, partitions[i]->topic().c_str(),partitions[i]->partition());}}public:// 消费者组再平衡回调void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,std::vector<RdKafka::TopicPartition *> &partitions) {printf("RebalanceCb: %s", RdKafka::err2str(err).c_str());printTopicPartition(partitions);// 分区分配成功if (RdKafka::ERR__ASSIGN_PARTITIONS == err) {// 消费者订阅这些分区consumer->assign(partitions);// 获取消费者组本次订阅的分区数量,可以属于不同的topicm_partitionCount = (int)partitions.size();} else   // 分区分配失败{// 消费者取消订阅所有的分区consumer->unassign();// 消费者订阅分区的数量为0m_partitionCount = 0;}}private:int m_partitionCount;    // 消费者组本次订阅的分区数量
};

流程(c++)

  • 配置消费者客户端;
  • 订阅主题和分区;
  • 拉取消息;
  • 处理消息;
  • 提交消费位移;

配置消费者客户端

int CKafkaConsumer::Create()
{std::string errorStr;RdKafka::Conf::ConfResult errorCode;do {// 1、创建配置对象// 1.1、构造 consumer conf 对象m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);if(nullptr == m_config){printf("Create RdKafka Conf failed.\n");break;}// 必要参数1:指定 broker 地址列表errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(bootstrap.servers) failed, errorStr:%s.\n",errorStr.c_str());break;}// 必要参数2:设置消费者组 iderrorCode = m_config->set("group.id", m_groupID, errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(group.id) failed, errorStr:%s.\n",errorStr.c_str());break;}// 设置事件回调m_event_cb = new ConsumerEventCb;errorCode = m_config->set("event_cb", m_event_cb, errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(event_cb) failed, errorStr:%s.\n",errorStr.c_str());break;}// 设置消费者组再平衡回调m_rebalance_cb = new ConsumerRebalanceCb;errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(rebalance_cb) failed, errorStr:%s.\n",errorStr.c_str());break;}// 当消费者到达分区结尾,发送 RD_KAFKA_RESP_ERR__PARTITION_EOF 事件errorCode = m_config->set("enable.partition.eof", "false", errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(enable.partition.eof) failed, errorStr:%s.\n",errorStr.c_str());break;}// 每次最大拉取的数据大小errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(max.partition.fetch.bytes) failed, errorStr:%s.\n",errorStr.c_str());break;}// 设置分区分配策略:range、roundrobin、cooperative-stickyerrorCode = m_config->set("partition.assignment.strategy", "range", errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(partition.assignment.strategy) failed, errorStr:%s.\n",errorStr.c_str());break;}// 心跳探活超时时间---1serrorCode = m_config->set("session.timeout.ms", "6000", errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(session.timeout.ms) failed, errorStr:%s.\n",errorStr.c_str());break;}// 心跳保活间隔errorCode = m_config->set("heartbeat.interval.ms", "2000", errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(heartbeat.interval.ms) failed, errorStr:%s.\n",errorStr.c_str());break;}// 1.2、创建 topic conf 对象m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);if (nullptr == m_topicConfig) {printf("Create RdKafka Topic Conf failed.\n");break;}// 必要参数3:设置新到来消费者的消费起始位置,latest 消费最新的数据,earliest 从头开始消费errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Topic Conf set(auto.offset.reset) failed, errorStr:%s.\n",errorStr.c_str());break;}// 默认 topic 配置,用于自动订阅 topicserrorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(default_topic_conf) failed, errorStr:%s.\n",errorStr.c_str());break;}// 2、创建 Consumer 对象m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);if (nullptr == m_consumer) {printf("Create KafkaConsumer failed, errorStr:%s.\n",errorStr.c_str());break;}printf("Created consumer success, consumerName:%s.\n",m_consumer->name().c_str());return 0;} while (0);Destroy();return -1;
}

订阅主题和分区

std::vector<std::string> topicsVec;
topicsVec.push_back("zd_test_topic_one");
topicsVec.push_back("zd_test_topic_two");RdKafka::ErrorCode errorCode = m_consumer->subscribe(topicsVec);
if (RdKafka::ERR_NO_ERROR != errorCode) 
{printf("Subscribe failed, errorStr:%s\n", RdKafka::err2str(errorCode).c_str());return;
}

拉取消息

// 可放到线程中处理while (m_running) 
{RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时if(NULL != msg){// 消费消息ConsumeMsg_(msg, NULL);m_consumer->commitAsync(); delete msg;}
}

处理消息

void KafkaConsumer::ConsumeMsg_(RdKafka::Message *msg, void *opaque)
{switch (msg->err()) {case RdKafka::ERR__TIMED_OUT: // 超时break;case RdKafka::ERR_NO_ERROR:   // 有消息进来printf("Message in, topic:%s, partition:[%d], key:%s, payload:%s\n",msg->topic_name().c_str(), msg->partition(), msg->key()->c_str(), (char *)msg->payload());// 消息处理break;default:break;}
}

提交消费位移

m_consumer->commitAsync(); 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/805830.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

今日arXiv最热大模型论文:Dataverse,针对大模型的开源ETL工具,数据清洗不再难!

引言&#xff1a;大数据时代下的ETL挑战 随着大数据时代的到来&#xff0c;数据处理的规模和复杂性不断增加&#xff0c;尤其是在大语言模型&#xff08;LLMs&#xff09;的开发中&#xff0c;对海量数据的需求呈指数级增长。这种所谓的“规模化法则”表明&#xff0c;LLM的性…

HTML - 请你说一下如何阻止a标签跳转

难度级别:初级及以上 提问概率:55% a标签的默认语义化功能就是超链接,HTML给它的定位就是与外部页面进行交流,不过也可以通过锚点功能,定位到本页面的固定id区域去。但在开发场景中,又避免不了禁用a标签的需求,那么都有哪些方式可以禁用…

【就近接入,智能DNS-Geo DNS ,大揭秘!】

做过后端服务或者网络加速的小伙伴&#xff0c;可能或多或少都听说过&#xff0c;智能DNS或者Geo DNS&#xff0c;就是根据用户的位置&#xff0c;返回离用户最近的服务节点&#xff0c;就近接入&#xff0c;以达到服务提速的效果。 那么大家有没想过&#xff0c;这个背后的原理…

C++:日期类的实现 const修饰 取地址及const取地址操作符重载(类的6个默认成员函数完结篇)

一、日期类的实现 根据之前赋值运算符重载逻辑&#xff0c;我们现在来实现完整的日期类。 1.1 判断小于 上篇博客已经实现: bool operator<(const Date& d) {if (_year < d._year){return true;}else if (_year d._year){if (_month < d._month){return true…

总结C/C++中程序内存区域划分

C/C程序内存分配的几个区域&#xff1a; 1. 栈区&#xff08;stack&#xff09;&#xff1a;在执行函数时&#xff0c;函数内局部变量的存储单元都可以在栈上创建&#xff0c;函数执行结束时这些存储单元自动被释放。栈内存分配运算内置于处理器的指令集中&#xff0c;效率很⾼…

OpenHarmony开发技术:【国际化】实例

国际化 如今越来的越多的应用都走向了海外&#xff0c;应用走向海外需要支持不同国家的语言&#xff0c;这就意味着应用资源文件需要支持不同语言环境下的显示。本节就介绍一下设备语言环境变更后&#xff0c;如何让应用支持多语言。 应用支持多语言 ArkUI开发框架对多语言的…

TypeScript—详解、小案例(配合源代码)

简介&#xff1a;TypeScript是微软开发的 JavaScript 的超集&#xff0c;TypeScript兼容JavaScript&#xff0c;可以载入JavaScript代码然后运行。TypeScript与JavaScript相比进步的地方 包括&#xff1a;加入注释&#xff0c;让编译器理解所支持的对象和函数&#xff0c;编译器…

Web路径专题

文章目录 1.资源定位1.前置条件上下文路径设置 2.上下文路径介绍重点说明 3.资源定位方式资源路径 上下文路径 资源位置a.html定位C.java定位 4.浏览器和服务器解析的区别1.浏览器解析/&#xff08;地址变化&#xff09;2.服务器解析/&#xff08;地址不变&#xff09; 5.带/…

git学习 1

打开自己想要存放git仓库的文件夹&#xff0c;右键打开git bush&#xff0c;用git init命令建立仓库 用 ls -a(表示全都要看&#xff0c;包括隐藏的)可以看到git仓库 也可以用 git clone 接github链接&#xff08;点code选项里面会给链接&#xff0c;结尾是git的那个&#xf…

OpenHarmony南向开发实例:【智能可燃气体检测系统】

样例简介 本项目是基于BearPi套件开发的智能可燃气体检测Demo&#xff0c;该系统内主要由小熊派单板套件和和MQ5可燃气体检测传感器组成。 智能可燃气体检测系统可以通过云和手机建立连接&#xff0c;可以在手机上控制感应的阈值&#xff0c;传感器感知到的可燃气体浓度超过阈…

C++ | Leetcode C++题解之第12题整数转罗马数字

题目&#xff1a; 题解&#xff1a; const string thousands[] {"", "M", "MM", "MMM"}; const string hundreds[] {"", "C", "CC", "CCC", "CD", "D", "DC&qu…

azkaban的写法

先创建一个.job文件和一个.sql文件 sql语法写到一个test名字的文件里&#xff0c;之后job写法如下&#xff1a; typecommand commandhive -f test6.sql 一定要严格写&#xff0c;不管是字母还是空格&#xff0c;单引号中就是sql文件的名字 然后将它们一块打包&#xff0c;启动…

ubuntu系统逻辑卷Logical Volume扩容根分区

Linux LVM详解 https://blog.csdn.net/qq_35745940/article/details/119054949 https://blog.csdn.net/weixin_41891696/article/details/118805670 https://blog.51cto.com/woyaoxuelinux/1870299 LVM&#xff08;Logical Volume Manager&#xff09;逻辑卷管理&#xff0c…

贪心算法|452.用最少数量的箭引爆气球

力扣题目链接 class Solution { private:static bool cmp(const vector<int>& a, const vector<int>& b) {return a[0] < b[0];} public:int findMinArrowShots(vector<vector<int>>& points) {if (points.size() 0) return 0;sort(p…

rk3588开发板上安装ssh服务

目的&#xff1a;实现远程访问和控制&#xff0c;其他主机远程控制rk3588 方法及操作步骤&#xff1a; 1&#xff09;安装&#xff1a;sudo apt install openssh-server 2&#xff09; 查看运行状态 sudo systemctl status ssh 其它主机远程连接该开发板的ip和端口22即可

urwid,一个好用的 Python 库!

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 大家好&#xff0c;今天为大家分享一个好用的 Python 库 - urwid。 Github地址&#xff1a;https://github.com/urwid/urwid Urwid 是一个功能强大的 Python 库&#xff0c;用于创建基于文本的用户界面&#xf…

[23年蓝桥杯H题] 合并石子

问题描述 在桌面从左至右横向摆放着 N 堆石子。每一堆石子都有着相同的颜色&#xff0c;颜 色可能是颜色 0 &#xff0c;颜色 1 或者颜色 2 中的其中一种。 现在要对石子进行合并&#xff0c;规定每次只能选择位置相邻并且颜色相同的两堆 石子进行合并。合并后新堆的相对位置保…

unipush+个推实现消息推送

1.注册个推平台的帐号个推&#xff0c;专业的数据智能服务商-为垂直领域提供数据智能解决方案 2.应用列表中选择新增应用/服务 3.填写下应用信息4.创建好应用后在manifest.json中的sdkConfigs配置上写入appid、appkey、appsecret "sdkConfigs" : {"ad" :…

【Keil5-Boot和APP配置】

Keil5-Boot和App配置 ■ Keil5-Boot和APP配置■ 一&#xff1a;sct文件 sct文件配置■ 二&#xff1a;发布版本不需要在 C/C&#xff0c;Asm&#xff0c;Linker&#xff0c;中添加 CMDDEBUG 宏定义。■ 三&#xff1a;Debug版本需要在Linker添加 --pd"-DCMDDEBUG" 才…

windows版本-idea中下载的java版本在哪

1、点击idea的file-projectStructure 进入&#xff1a; 通过电脑目录进入该目录 找到bin目录&#xff0c;copy该目录地址 copy下来之后设置到系统环境变量中