kafka(四)——生产者流程分析(c++)

前言

  • kafka生产者负责将数据发布到kafka集群的主题;
  • kafka生产者消息发送方式有两种:
    • 同步发送
    • 异步+回调发送

流程

在这里插入图片描述

流程说明:

  • Kafka Producer整体可看作是一个异步处理操作;
  • 消息发送过程中涉及两个线程:main线程和sender线程;
  • main线程负责将消息发送至一个双端队列,sender线程负责从双端队列取消息并发送至kafka broker;

消息可靠性

producer的acks参数表示生产者生产消息时,写入到副本的严格程度。决定了生产者的性能与可靠性

  • 0:生产者发送过来的数据,不等待broker确认,直接发送下一条数据,性能最高,但可能存在丢数据;

在这里插入图片描述

  • 1:生产者发送过来的数据,等待Leader副本确认后发送下一条数据,性能中等;
    在这里插入图片描述

  • -1(all):生产者发送过来的数据,等待所有副本将数据同步后发送下一条数据,性能最慢,安全性最高;

在这里插入图片描述

消息有序性

消息保序策略:按key分区,可以实现局部有序,但这又可能会导致数据倾斜,可根据实际情况选择。

示例:

// 指定消息key,即倒数第二个参数,当有相同的两条消息先后存储同一个key,消费者可按顺序消费到RdKafka::ErrorCode errorCode = m_producer->produce(m_topic,                      // 指定发送到的主题RdKafka::Topic::PARTITION_UA, // 指定分区,如果为PARTITION_UA则通过// partitioner_cb的回调选择合适的分区RdKafka::Producer::RK_MSG_COPY, // 消息拷贝payload,                        // 消息本身len,                            // 消息长度&key,                           // 消息keyNULL);

Main线程与Sender线程

Main线程

流程

  • 创建消息
// librdkafka源码 rdkafka_msg.c/* Create message */
rkm = rd_kafka_msg_new0(rkt, force_partition, msgflags, payload, len,key, keylen, msg_opaque, &err, &errnox, NULL, 0,rd_clock());
if (unlikely(!rkm)) {/* errno is already set by msg_new() */rd_kafka_set_last_error(err, errnox);return -1;
}
  • 选择分区
/* Partition the message */
err = rd_kafka_msg_partitioner(rkt, rkm, 1);
if (likely(!err)) {rd_kafka_set_last_error(0, 0);return 0;
}
  • 调用拦截器
/* Interceptor: unroll failing messages by triggering on_ack.. */
rkm->rkm_err = err;
rd_kafka_interceptors_on_acknowledgement(rkt->rkt_rk,&rkm->rkm_rkmessage);

Sender线程

参数说明

batch.size缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。
acks见“消息可靠性”章节
max.in.flight.requests.per.connection允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。
retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms两次重试之间的时间间隔,默认是100ms。
enable.idempotence是否开启幂等性,默认true,开启幂等性。

流程

  • 达到batch.size大小或满足linger.ms时间发送消息;
  • 消息发送至的kafka服务器后,如果kafka没有应答,默认每个broker节点队列最多缓存 5 个请求,与“max.in.flight.requests.per.connection”参数有关;
  • 如配置了“retries”、“ retry.backoff.ms”参数,消息发送失败由kafka内部自动重试,无需手动在回调函数中重试;

同步和异步流程

同步流程

流程说明

  • 通过produce方法将消息推送至双端队列;
  • 通过flush方法等待发送结果,如outq_len()大于0,说明存在未发送成功的消息;

代码示例

int KafkaProducer::PushMessage(const std::string &str, const std::string &key)
{int32_t len = (int32_t)str.length();void *payload = const_cast<void *>(static_cast<const void *>(str.data()));// produce 方法,生产和发送单条消息到 Broker// 如果不加时间戳,内部会自动加上当前的时间戳RdKafka::ErrorCode errorCode = m_producer->produce(m_topic,                      // 指定发送到的主题RdKafka::Topic::PARTITION_UA, // 指定分区,如果为PARTITION_UA则通过// partitioner_cb的回调选择合适的分区RdKafka::Producer::RK_MSG_COPY, // 消息拷贝payload,                        // 消息本身len,                            // 消息长度&key,                           // 消息keyNULL);if (RdKafka::ERR_NO_ERROR != errorCode) {// kafka 队列满,等待 100 msif (RdKafka::ERR__QUEUE_FULL == errorCode) {m_producer->poll(100);}return -1;}// 同步等待200msm_producer->flush(200);if(m_producer->outq_len() > 0)  // 用于调试{printf("Existed not send message.size:%d\n", m_producer->outq_len());return -1;}return 0;
}

异步流程

流程说明

  • 设置生产者投递报告回调
  • 设置生产者自定义分区策略回调
  • 消息发送

代码示例

  • 设置生产者投递回调
// 生产者投递报告回调
class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb 
{
public:void dr_cb(RdKafka::Message& message){	if (message.err())   // 出错回调{// TODO} else                 // 正常回调{  // TODO}}
};// 设置生产者投递报告回调
m_dr_cb = new ProducerDeliveryReportCb; // 创建投递报告回调
errCode = m_config->set("dr_cb", m_dr_cb, errorStr);    // 异步方式发送数据
if (RdKafka::Conf::CONF_OK != errCode) 
{printf("Conf set(dr_cb) failed, errorStr:%s", errorStr.c_str());break;
}
  • 设置生产者自定义分区策略回调
// 生产者自定义分区策略回调:partitioner_cb
class HashPartitionerCb : public RdKafka::PartitionerCb 
{
public:// @brief 返回 topic 中使用 key 的分区,msg_opaque 置 NULL// @return 返回分区,(0, partition_cnt)int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key,int32_t partition_cnt, void *msg_opaque) {// 用于自定义分区策略:这里用 hash。例:轮询方式:p_id++ % partition_cntint32_t partition_id = generate_hash(key->c_str(), key->size()) % partition_cnt;return partition_id;}private:// 自定义哈希函数 static inline unsigned int generate_hash(const char *str, size_t len) {unsigned int hash = 5381;for (size_t i = 0; i < len; i++)hash = ((hash << 5) + hash) + str[i];return hash;}
};// 设置生产者自定义分区策略回调
m_partitioner_cb = new HashPartitionerCb; // 创建自定义分区投递回调
errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errCode) 
{printf("Conf set(partitioner_cb) failed, errorStr:%s", errorStr.c_str());break;
}
  • 消息发送

注意:此处produce执行成功不代表消息发送成功,需根据dr_cb消息回调结果判断消息是否发送成功。

int KafkaProducer::PushMessage(const std::string &str, const std::string &key)
{int32_t len = (int32_t)str.length();void *payload = const_cast<void *>(static_cast<const void *>(str.data()));// produce 方法,生产和发送单条消息到 Broker// 如果不加时间戳,内部会自动加上当前的时间戳RdKafka::ErrorCode errorCode = m_producer->produce(m_topic,                      // 指定发送到的主题RdKafka::Topic::PARTITION_UA, // 指定分区,如果为PARTITION_UA则通过// partitioner_cb的回调选择合适的分区RdKafka::Producer::RK_MSG_COPY, // 消息拷贝payload,                        // 消息本身len,                            // 消息长度&key,                           // 消息keyNULL);// 轮询处理m_producer->poll(0);if (RdKafka::ERR_NO_ERROR != errorCode) {// kafka 队列满,等待 100 msif (RdKafka::ERR__QUEUE_FULL == errorCode) {m_producer->poll(100);}return -1;}return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/801075.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java变量详解

​ 这里写目录标题 第一章、Java中的变量分类1.1&#xff09;变量分类1.2&#xff09;成员变量分类1.3&#xff09;成员变量和局部变量的区别 第二章、成员变量详解2.1&#xff09;成员变量作用域/权限修饰符2.2&#xff09;成员变量和成员属性的区别2.3&#xff09;成员变量初…

为什么 GraphQL 是构建微服务的更好选择

关于使用REST还是GraphQL来构建微服务哪个更好&#xff0c;一直存在争论。这两种技术都有其支持者和批评者&#xff0c;但当涉及微服务架构的特定需求时&#xff0c;GraphQL 成为明显的领先者。原因如下。 了解 RESTful 的关注点 虽然 REST 多年来一直是首选 API 风格&#x…

蓝桥杯 历届真题 时间显示【第十二届】【省赛】【C组】

资源限制 内存限制&#xff1a;256.0MB C/C时间限制&#xff1a;1.0s Java时间限制&#xff1a;3.0s Python时间限制&#xff1a;5.0s #include<bits/stdc.h> #define int long long using namespace std; const int N 1e510; int n,m,t,d; int a[2][N],b[N]; //…

数据库关系模式三元及以上分解无损连接判断(表格法)

例题 1.首先构造初始表&#xff0c;如下表所示。 A B C D E ABC a1 a2 a3 b14 b15 CD b21 b22 a3 a4 b15 DE b31 b32 b33 a4 a5 2.遍历函数依赖&#xff0c;对AB→C&#xff0c;因各元组的第一、二列没有相同的分量&#xff0c;所以表不改变。 3.由C→D…

chabot项目介绍

项目介绍 整体的目录如下所示&#xff1a; 上述的项目结构中出了model是必须的外&#xff0c;其他的都可以根据训练的代码参数传入进行调整&#xff0c;有些不需要一定存在data train.pkl:对原始训练语料进行tokenize之后的文件,存储一个list对象&#xff0c;list的每条数据表…

javaWeb物流信息网的设计与实现

摘要 本文讲述了基于JSP物流信息网的设计与实现。该系统使用java语言开发&#xff0c;使系统具有更好的平台性和可扩展性。 该系统实现了用户登录、注册、查询快递信息、快递公司注册成为合作伙伴以及系统管理员对信息进行管理等功能。系统的主要界面会将所有的服务排列好&…

【java基础-高级篇十】、注解

自定义目录 一、什么是注解二、常见的注解示例三、自定义 annotation四、JDK 中的元注解五、利用反射获取注解信息六、jdk8之后注解的新特性1、可重复注解2、类型注解 一、什么是注解 加在包,类, 构造器, 方法, 成员变量, 参数, 局部变量声明上面的特殊标记就称为注解未来的开…

力扣2- 两数相加

给你两个 非空 的链表&#xff0c;表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的&#xff0c;并且每个节点只能存储 一位 数字。 请你将两个数相加&#xff0c;并以相同形式返回一个表示和的链表。 你可以假设除了数字 0 之外&#xff0c;这两个数都不会以 0 …

vmware和ubuntu的问题与解决

1.问题与对策 最近使用vmware安装ubuntu16和ubuntu20&#xff0c;遇到了挺多的问题&#xff0c;如下 ubuntu在用过多次后&#xff0c;重启后登录用户名后会出现花屏的现象。 解决方案如下 在键盘上同时按键&#xff1a;Ctrl Alt F4&#xff0c;进入命令行模式&#xff0c;…

基于深度学习的电动自行车头盔佩戴检测系统

文章目录 1. 文档说明2. 运行环境说明2.1 硬件配置2.2 软件配置2.3 程序依赖库 3. 基本环境配置3.1 软件安装3.1.1 集成开发环境安装与配置3.1.2 数据库安装与配置3.1.3 编程语言安装3.1.4 CUDA和cuDNN安装与配置3.1.5 机器学习库安装 3.2 依赖库安装 4. 运行程序资源下载地 1.…

Binder通信模型

Binder是Android最主要的进程间通信方式&#xff0c;下面简单认识一下它的通信模型&#xff0c;如下图所示 服务管理进程启动时会变成上下文管理者&#xff0c;在驱动层创建一个全局的binder_node对象binder_context_mgr_node记录进程信息&#xff0c;BpServiceManager中BpBind…

HarmonyOS实战开发-如何实现跨应用数据共享实例。

介绍 本示例实现了一个跨应用数据共享实例&#xff0c;分为联系人&#xff08;数据提供方&#xff09;和联系人助手&#xff08;数据使用方&#xff09;两部分&#xff1a;联系人支持联系人数据的增、删、改、查等功能&#xff1b;联系人助手支持同步联系人数据&#xff0c;当…

血细胞检测数据集 | 用于血细胞计数+检测的小规模数据集_已经整理成VOC格式_总共410张图

项目应用场景 面向血细胞检测计数数据集&#xff0c;已经整理成 VOC 格式&#xff0c;可以直接用于目标检测算法的训练&#xff0c;如 YOLO 等目标检测算法的训练。血细胞检测数据集图片质量好&#xff0c;可直接训练出一个血细胞检测模型&#xff0c;或者作为血细胞检测数据集…

AI智能分析盒子在工地的应用,提高工地管理效率和安全性

工地ai智能分析盒子是一种基于人工智能视觉分析技术的人工智能盒子&#xff0c;旨在提升工地作业区域的管理效率和保障作业人员的安全。通过最前沿的AI视觉算法、大数据&#xff0c;能够实时监控工地现场视频流画面&#xff0c;对施工工地人员的工作着装及日常作业行为进行规范…

1.C++编译过程

1.Linux 如何安装 C 编译环境 &#xff08;1&#xff09;安装 gcc 下载&#xff1a;sudo apt install gcc g 安装&#xff1a;gcc/g -v/--version其中 gcc 用于编译 C &#xff0c;g 用于编译 C 版本要 >4.8.5 这样才能使用 C11 的特性 &#xff08;2&#xff09; 使用…

【QT学习】4.对话框学习,浮动窗口,模态对话框,非模态对话框,消息框,文件对话框

1.浮动窗口 代码&#xff1a; //制作核心控件&#xff1a;文本编辑框QTextEdit* pTextEditnew QTextEdit;//制作浮动控件connect(pMenu1,&QMenu::triggered,[](QAction* pAction){qDebug()<<pAction->text()<<endl;if(pAction->text()"浮动控件&quo…

Vulnhub:DEVCONTAINER: 1

目录 信息收集 arp nmap nikto whatweb WEB 信息收集 dirmap 文件上传 提权 系统信息收集 横向提权 信息泄露 get root 信息收集 arp ┌──(root㉿ru)-[~/kali/vulnhub] └─# arp-scan -l Interface: eth0, type: EN10MB, MAC: 00:50:56:2f:dd…

YUM仓库和编译安装

目录 一.YUM仓库搭建 1.简介&#xff1a; 2.搭建思路&#xff1a; 3.实验&#xff1a;单机yum的创建 二.编译安装 1.简介 2.安装过程 3.实验&#xff1a;编译安装nginx 一.YUM仓库搭建 1.简介&#xff1a; yum是一个基于RPM包&#xff08;是Red-Hat Package Manager红…

超级agent的端语言模型Octopus v2: On-device language model for super agent

大型语言模型&#xff08;LLMs&#xff09;在函数调用方面展现出卓越的应用潜力&#xff0c;特别是针对Android API的定制应用。与那些需要详尽描述潜在函数参数、有时甚至涉及数万个输入标记的检索增强生成&#xff08;RAG&#xff09;方法相比&#xff0c;Octopus-V2-2B在训练…

fastlio2 保存每帧的点云和每帧的里程计为单独的文件做后端回环优化和手动回环优化

为了 提供数据做后端回环优化和手动回环优化,需要保存每帧的点云和每帧的里程计为单独的文件,并且需要保存的名字为ros时间戳。 效果很好,比我自己写的手动回环模块好用 // This is an advanced implementation of the algorithm described in the // following paper: /…