深入浅出分析kafka客户端程序设计 ----- 消费者篇----万字总结

1. Kafka 消费者的逻辑

  • 配置消费者客户端参数。
  • 创建相应的消费者实例。
  • 订阅主题。
  • 拉取消息并消费;
  • 提交消息位移;
  • 关闭消费者实例;

2 Kafka 的C++ API

2.1 RdKafka::Conf

见生成者实现文章。

2.2 RdKafka::Event

见生成者实现文章。

2.3 RdKafka::EventCb

见生成者实现文章。

2.4 RdKafka::TopicPartition

static TopicPartition * create(const std::string &topic, int partition);
//创建一个TopicPartition对象。static TopicPartition *create (const std::string &topic, int partition,int64_t offset);
//创建TopicPartition对象。static void destroy (std::vector<TopicPartition*> &partitions);
//销毁所有TopicPartition对象。const std::string & topic () const;
//返回Topic名称。int partition ();
//返回分区号。int64_t offset();
//返回位移。void set_offset(int64_t offset);
//设置位移。ErrorCode err();
//返回错误码。

2.5 RdKafka::RebalanceCb

virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector< TopicPartition * >&partitions)=0;

用于RdKafka::KafkaConsunmer的组再平衡回调函数;注册rebalance_cb回调函数会关闭rdkafka的自动分区赋值和再分配并替换应用程序的rebalance_cb回调函数

 再平衡回调函数负责对基于RdKafka::ERR_ASSIGN_PARTITIONS和RdKafka::ERR_REVOKE_PARTITIONS事件更新rdkafka的分区分配,也能处理任意前两者错误除外其它再平衡失败错误。对于RdKafka::ERR_ASSIGN_PARTITIONS和RdKafka::ERR_REVOKE_PARTITIONS事件之外的其它再平衡失败错误,必须调用unassign()同步状态。
没有再平衡回调函数,rdkafka也能自动完成再平衡过程,但注册一个再平衡回调函数可以使应用程序在执行其它操作时拥有更大的灵活性,例如从指定位置获取位移或手动提交位移。


C++封装API:

class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
private:static void printTopicPartition (const std::vector<RdKafka::TopicPartition*>&partitions)        // 打印当前获取的分区{for (unsigned int i = 0 ; i < partitions.size() ; i++)std::cerr << partitions[i]->topic() <<"[" << partitions[i]->partition() << "], ";std::cerr << "\n";}public:void rebalance_cb (RdKafka::KafkaConsumer *consumer,RdKafka::ErrorCode err,std::vector<RdKafka::TopicPartition*> &partitions){std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";printTopicPartition(partitions);if (err == RdKafka::ERR__ASSIGN_PARTITIONS){consumer->assign(partitions);partition_count = (int)partitions.size();}else{consumer->unassign();partition_count = 0;}}
private:int partition_count;
};

2.6 RdKafka::Message

见生成者实现文章。

2.7 RdKafka::KafkaConsumer(核心)

KafkaConsumer是高级API,要求Kafka 0.9.0以上版本,当前支持range和roundrobin分区分配策略。

static KafkaConsumer * create(Conf *conf, std::string &errstr);
创建KafkaConsumer对象,conf对象必须配置Consumer要加入的消费者组。使用KafkaConsumer::close()进行关闭。ErrorCode assignment(std::vector< RdKafka::TopicPartition * > &partitions);
返回由RdKafka::KafkaConsumer::assign() 设置的当前分区。ErrorCode subscription(std::vector< std::string > &topics);
返回由RdKafka::KafkaConsumer::subscribe() 设置的当前订阅Topic。ErrorCode subscribe(const std::vector< std::string > &topics);
更新订阅Topic分区。ErrorCode unsubscribe();
将当前订阅Topic取消订阅分区。ErrorCode assign(const std::vector< TopicPartition * > &partitions);
将分配分区更新为partitions。ErrorCode unassign();
停止消费并删除当前分配的分区。Message * consume(int timeout_ms);
消费消息或获取错误事件,触发回调函数,会自动调用注册的回调函数,包括RebalanceCb、EventCb、OffsetCommitCb等。需要使用delete释放消息。应用程序必须确保consume在指定时间间隔内调用,为了执行等待调用的回调函数,即使没有消息。当RebalanceCb被注册时,在需要调用和适当处理内部Consumer同步状态时,确保consume在指定时间间隔内调用极为重要。应用程序必须禁止对KafkaConsumer对象调用poll函数。
如果RdKafka::Message::err()是ERR_NO_ERROR,则返回正常的消息;如果RdKafka::Message::err()是ERR_NO_ERRO,返回错误事件;如果RdKafka::Message::err()是ERR_TIMED_OUT,则超时。ErrorCode commitSync();
提交当前分配分区的位移,同步操作,会阻塞直到位移被提交或提交失败。如果注册了RdKafka::OffsetCommitCb回调函数,其会在KafkaConsumer::consume()函数内调用并提交位移。ErrorCode commitAsync();
异步提交位移。ErrorCode commitSync(Message *message);
基于消息对单个topic+partition对象同步提交位移。virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets) = 0;
对指定多个TopicPartition同步提交位移。ErrorCode commitAsync(Message *message);
基于消息对单个TopicPartition异步提交位移。virtual ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) = 0;
对多个TopicPartition异步提交位移。ErrorCode close();
正常关闭,会阻塞直到四个操作完成(触发避免当前分区分配的局部再平衡,停止当前赋值消费,提交位移,离开分组)virtual ConsumerGroupMetadata *groupMetadata () = 0;
返回本Consumer实例的Consumer Group的元数据。ErrorCode position (std::vector<TopicPartition*> &partitions)
获取TopicPartition对象中当前位移,会别填充TopicPartition对象的offset字段。ErrorCode seek (const TopicPartition &partition, int timeout_ms)
定位TopicPartition的Consumer到位移。timeout_ms为0,会开始Seek并立即返回;timeout_ms非0,Seek会等待timeout_ms时间。ErrorCode offsets_store (std::vector<TopicPartition*> &offsets)
为TopicPartition存储位移,位移会在auto.commit.interval.ms时提交或是被手动提交。enable.auto.offset.store属性必须设置为fasle。

3 Kafka 消费者客户端开发


3.1 必要的参数配置(bootstrap.servers)


在创建消费者的时候以下以下三个选项是必选的:

bootstrap.servers:指定 broker (kafka服务器)的地址清单,清单里不需要包含所有的 broker(kafka) 地址,生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错。
group.id:consumer group 是 kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的 ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。
auto.offset.reset:这个参数是针对新的 groupid 中的消费者而言的,当有新 groupid 的消费者来

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/205519.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++ day49 买卖股票的最佳时机

题目1&#xff1a;121 买卖股票的最佳时机 题目链接&#xff1a;买卖股票的最佳时机 对题目的理解 prices[i]表示一支股票在第i天的价格&#xff0c;只能在某一天买入这支股票&#xff0c;并在之后的某一天卖出该股票&#xff0c;从而获得最大利润&#xff0c;返回该最大值&…

IPTABLES(一)

文章目录 1. iptables基本介绍1.1 什么是防火墙1.2 防火墙种类1.3 iptables介绍1.4 包过滤防火墙1.5 包过滤防火墙如何实现 2. iptables链的概念2.1 什么是链2.2 iptables有哪些链 3. iptables表的概念3.1 什么是表3.2 表的功能3.3 表与链的关系 4. iptables规则管理4.1 什么是…

Autosar通信实战系列05-CanNM模块进阶常见问题思考

本文框架 前言1. UDS 0x28服务控制Nm报文收发后对状态机有影响?2. 节点网络启动后第一帧是否必须是网络管理报文?3. 主动唤醒后发送的第一帧报文为NM报文如何配置?4. CanNmMsgCycleOffset的使用场景?5. 什么情况下CBV中RepeatMessageRequest Bit置位?6. 主动(本地)唤醒与…

【华为数据之道学习笔记】3-4主数据治理

主数据是参与业务事件的主体或资源&#xff0c;是具有高业务价值的、跨流程和跨系统重复使用的数据。主数据与基础数据有一定的相似性&#xff0c;都是在业务事件发生之前预先定义&#xff1b;但又与基础数据不同&#xff0c;主数据的取值不受限于预先定义的数据范围&#xff0…

MySql概述及其性能说明

MySQL是一种开源的关系型数据库管理系统&#xff0c;由瑞典MySQL AB公司开发&#xff0c;现属于Oracle公司。MySQL是最流行的开源数据库之一&#xff0c;被广泛地应用于Web开发中。MySQL提供了一个高度稳定可靠的数据存储解决方案&#xff0c;同时也可以很容易地跨平台运行。My…

LeetCode860. Lemonade Change

文章目录 一、题目二、题解 一、题目 At a lemonade stand, each lemonade costs $5. Customers are standing in a queue to buy from you and order one at a time (in the order specified by bills). Each customer will only buy one lemonade and pay with either a $5,…

2023年广东工业大学腾讯杯新生程序设计竞赛

E.不知道叫什么名字 题意&#xff1a;找一段连续的区间&#xff0c;使得区间和为0且区间长度最大&#xff0c;输出区间长度。 思路&#xff1a;考虑前缀和&#xff0c;然后使用map去记录每个前缀和第一次出现的位置&#xff0c;然后对数组进行扫描即可。原理&#xff1a;若 s …

【Android】Retrofit创建实例源理

文章目录 Retrofit类Builder内部类baseUrl()addConverterFactory()client() 对retrofit的创造实例过程进行源码剖析。 在说之前&#xff0c;介绍一个api&#xff0c;用于判断对象是否为空&#xff0c;然后执行&#xff0c;否则抛出异常&#xff0c;该api在下边很多地方都会出现…

Linux篇之在Centos环境下搭建Nvidia显卡驱动

一、前提条件 1、首先确认内核版本和发行版本&#xff0c;再确认显卡型号 uname -a // Linux localhost.localdomain 4.18.0-408.el8.x86_64 #1 SMP Mon Jul 18 17:42:52 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux1.2 cat /etc/redhat-release // CentOS Stream release 81.3…

【S2ST】Direct Speech-to-Speech Translation With Discrete Units

【S2ST】Direct Speech-to-Speech Translation With Discrete Units AbstractIntroductionRelated workModelSpeech-to-unit translation (S2UT) modelMultitask learningUnit-based vocoder ExperimentsDataSystem setupBaselineASRMTTTSS2TTransformer Translatotron Evaluat…

Python Jinja2 库的无限可能性

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com Jinja2&#xff0c;作为Python中最流行的模板引擎之一&#xff0c;为开发者提供了强大的工具&#xff0c;用于在Web应用和其他项目中生成动态内容。本文将深入研究 Jinja2 库的各个方面&#xff0c;提供更丰富的…

SQL CREATE INDEX 语句- 提高数据库检索效率的关键步骤

SQL CREATE INDEX 语句 SQL CREATE INDEX 语句用于在表中创建索引。 索引用于比其他方式更快地从数据库中检索数据。用户无法看到索引&#xff0c;它们只是用于加速搜索/查询。 注意&#xff1a; 使用索引更新表比不使用索引更新表需要更多的时间&#xff08;因为索引也需要…

数据科学:Matplotlib、Seaborn笔记

数据科学&#xff1a;Numpy、Pandas 数据科学&#xff1a;Matplotlib、Seaborn笔记 数据科学&#xff1a;Numpy、Pandas、Matplotlib、Seaborn、Scipy、Scikit-Learn 三、Matplotlib1.Matplotlib subplots函数2.tight_layout()函数3.Matplotlib grid()设置网格格式4.fill_bet…

<蓝桥杯软件赛>零基础备赛20周--第9周--前缀和与差分

报名明年4月蓝桥杯软件赛的同学们&#xff0c;如果你是大一零基础&#xff0c;目前懵懂中&#xff0c;不知该怎么办&#xff0c;可以看看本博客系列&#xff1a;备赛20周合集 20周的完整安排请点击&#xff1a;20周计划 每周发1个博客&#xff0c;共20周&#xff08;读者可以按…

大屏可视化基础学习-通用可套用模板【大屏可视化项目案例-15】

🎉🎊🎉 你的技术旅程将在这里启航! 🚀🚀 本专栏包括但不限于大屏可视化、图表可视化等等。订阅专栏用户在文章底部可下载对应案例源码以供大家深入的学习研究。 🎓 每一个案例都会提供完整代码和详细的讲解,不论你是初学者还是资深开发者,这里都有适合你的内容。…

从零开始的c语言日记day41——自定义类型结构体

一、结构体的声明 1.1结构的基础知识 结构是一些值的集合&#xff0c;这些值称为成员变量。结构的每个成员可以是不同类型的变量。 Tag结构体标签 Member-list成员列表-里面可以有很多成员 Variable-list变量列表 结构体类型的定义方式 S1&#xff0c;s2是struct stu类型的…

黑苹果之显卡篇

一、什么是显卡 显卡GPU&#xff08;Video card、Display card、Graphics card、Video adapter&#xff09;是个人计算机基础的组成部分之一&#xff0c;将计算机系统需要的显示信息进行转换驱动显示器&#xff0c;并向显示器提供逐行或隔行扫描信号&#xff0c;控制显示器的正…

python数据分析

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 pandas统计分析基础实训 实训1 读取并查看某地区房屋销售数据的基本信息1. 训练要点2. 需求说明3.实现思路及步骤 实训2 提取房屋售出时间信息并描述房屋价格信息1. 训练要点…

数据结构 图的广度优先搜索和深度优先搜索

一、广度优先搜索 广度优先搜索等价于树的层次遍历&#xff0c;将起点的每一层进行遍历 当这一层结点全部被遍历完时&#xff0c;再遍历下一层次&#xff0c;从图中可以根据距离遍历起点的长度进行层次选择 例&#xff1a; 以a结点作为开始结点 a的下一层次有b c e三个结点 所以…

nginx 一键切换停机维护页面 —— 筑梦之路

背景说明 进行停机维护或者系统升级等操作&#xff0c;会影响到用户使用&#xff0c;如果停机维护期间用户未看到停机维护的通知&#xff0c;仍去访问系统&#xff0c;会提示默认不太友好的访问错误界面 &#xff0c;这时如果在维护的时候直接展示停机公告的具体信息&#xff0…