Kafka_02_Producer详解

Kafka_02_Producer详解

  • Producer
    • ProducerRecord
    • Send&Close
    • 实现原理
      • ProducerInterceptor
      • Serializer
      • Partitioner
    • 事务

Producer

Producer(生产者): 生产并发送消息到Broker(推送)

  1. Producer是多线程安全的(建议通过池化以提高性能)
  2. Producer实例后可发送多条消息(可对应多个ProducerRecord)

// 0.9之后的版本是基于Java实现(之前是Scala实现)


Producer客户端发送消息大致逻辑:

  1. 配置Producer客户端参数并创建该Producer实例
  2. 构建需发送的消息
  3. 发送构建的消息
  4. 关闭实例

构造Producer必填的3个参数:

参数说明
bootstrap.servers引导程序的服务地址
格式: 地址1:端口1,地址N:端口N
(建议指定两个以上的Broker地址以保证稳定性, 且使用主机名形式)
key.serializer发送时对Key调用的序列化器
Broker仅能接受字节数组形式的消息byte[]
value.serializer发送时对Value调用的序列化器
Broker仅能接受字节数组形式的消息byte[]

// 序列化器必须以全限定名方式指定, Java的ProducerConfig类中包含所有的配置参数


ProducerRecord

ProducerRecord(构建消息): Producer每次发送的消息体

  1. ProducerRecord由多个属性构成(Topic和消息是基础属性)
  2. ProducerRecord有多个构造方法(指定属性的个数)
  3. 可根据不同需求创建特定ProducerRecord

ProducerRecord定义:

public class ProducerRecord<K, V> {private final String topic;      // Topic(必填)private final Integer partition; // Partition// 消息头部(0.11版本引入)// 指定与应用相关信息(可忽略)private final Headers headers;// 键(附加信息)// 其会用于计算Partition(二次归类)private final K key;// 值(消息体, 必填)// 为空则代表: 墓碑消息private final V value;// 消息时间戳// 细分为CreateTime(消息创建时间)和LogAppendTime(追加日志时间)private final Long timestamp;......
}

Send&Close

Send(发送消息): Producer构建ProducerRecord之后发送给Broker

  1. 发送模式: 发后既忘(fire-and-forget)、同步(sync)、异步(async)
  2. 发送模式默认为异步(可通过获取返回值的方法以阻塞等待实现同步)
  3. 返回值通常为发送消息的元数据(Topic、Partition、偏移量和时间戳等)

Send()方法的定义:

public Future<RecordMetadata> send(ProducerRecord<K, V> record);public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
  1. 可通过Future的get()方法阻塞实现同步(返回RecordMetadata对象)
  2. Send()方法需配合try/catch(发送成功或发生异常)
  3. 发送导致的异常分为: 重试异常、不可重试异常

// 不可重试异常发生时会直接抛出并结束


常见的重试异常为:

可重试异常说明
NewworkException网络异常
LeaderNotAvailableException副本的leader不可用
(可能正在选举leader)
UnknownTopicOrPartitionExceptionTopic或Partition异常
NotEnoughReplicasException副本数量不足
NotCoordinatorException协调器异常

Send()方法中的Callback定义:

public interface Callback {void onCompletion(RecordMetadata var1, Exception var2);
}
  1. var1和var2参数互斥(两者必有个为null,后者代表异常)
  2. 若两个消息对相同Partition发送消息, 则按发送顺序调用Callback

Close(结束发送):回收Producer实例

  1. 发送结束后务必回收Producer实例(防止资源泄漏)
  2. Close默认会阻塞等待之前所有的发送请求完成之后再回收
  3. 可指定关闭的超时时间(超出该事件则强行回收, 不建议指定)

Close()方法的定义:

public void close();public void close(long timeout, TimeUnit timeUnit);

实现原理

Producer的发送消息由两个线程完成:

  1. 主线程: 构建并处理消息后发送至RecordAccumulator
  2. Sender线程: 从RecordAccumulator获取消息, 并发送至Broker

如: Producer发送消息链路图

image

  1. RecordAccumulator: 双端队列缓存待发送ProducerBatch以减少网络影响
  2. ProducerBatch: 包含任意多个待发送的ProducerRecord(消息批次)
  3. Request: Kafka支持的各种请求协议
  4. InFlightRequests: 缓存已发送但未响应的Request

// Interceptor和Partitioner可选择性处理, 但必须经Serializer处理


Producer发送ProducerRecord的流程:

  1. 主线程将ProducerRecord加工处理后发送至RecordAccumulator尾部
  2. RecordAccumulator根据ProducerRecord分区选择对应的ProducerBatch
  3. RecordAccumulator根据内存复用原则和ProducerBatch大小决定是否新建
  4. Sender线程从RecordAccumulator头部获取ProducerBatch
  5. <分区, <Deque<ProducerBatch>>形式变为<Node, List<ProducerBatch>>
  6. 再根据各种协议请求转换为<Node, Request>形式
  7. 发送前以Map<nodeId, Deque<Request>>缓存Request
  8. 返回发送后的响应并清理InFlightRequests和RecordAccumulator

// 形式转换是为完成应用逻辑层到网络I/O层的转换


RecordAccumulator内存复用原则:

  1. RecordAccumulator通过java.io.ByteBufferBufferPool实现内存复用
  2. 若内存申请不超过指定大小, 则申请指定大小并放置于BufferPool
  3. 若内存申请超过指定大小, 则申请该内存并再使用后直接释放

// BufferPool可避免频繁的申请和释放内存


InFlightRequest中包含leastLoadedNode

  1. leastLoadedNode: 负载最小的Broker(未确认请求最少的)
  2. leastLoadedNode常用于元数据请求和Consumer组播协议的交互
  3. leastLoadedNode由Sender线程根据指定过期时间维护(主线程也可访问)

// 元数据: Broker、Topic、Partition、leader和follower副本所在的Broker等


如: Sender线程维护leatLoadedNode信息

  1. Sender线程检查元数据是否过期(默认5m)
  2. 超出则挑出leastLoadedNode, 向该Broker发送MetadataRequest请求
  3. 获取结果后将其结果存入InFlightRequests中, 并更新元数据的过期时间

ProducerInterceptor

ProducerInterceptor(拦截器): 消息发送前/后的进行的操作

  1. 不建议通过ProducerInterceptor修改topic、key和partition
  2. 可指定多个ProducerInterceptor(拦截链按配置时顺序执行)
  3. 可通过interceptor.classes参数指定Producer所使用的ProducerInterceptor

ProducerInterceptor定义:

public interface ProducerInterceptor<K, V> extends Configurable {// 发送前进行的操作public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);// 发送后被应答之后或失败进行的操作// 优先于Send()方法中定义的Callback前执行// 由于该方法运行于Producer的IO线程中, 应简洁public void onAcknowledgement(RecordMetadata metadata, Exception exception);// 关闭拦截器public void close();
}

// 抛出的任何异常都会被记录到日志中, 并不再向上抛


Serializer

Serializer(序列化器): 将特定数据转换成字节数组(byte[])

  1. Broker仅能接受字节数组形式的数据(接收后会对其反序列化)
  2. Producer使用的Serializer需和Consumer使用的反序列化器需对应
  3. Producer指定Serializer时, 需通过全限定名方式指定(类的完整路径)

Serializer定义:

public interface Serializer<T> extends Closeable {// 配置序列化器// 常用于指定编码类型(默认UTF-8)void configure(Map<String, ?> configs, boolean isKey);// 执行序列化byte[] serialize(String topic, T data);// 关闭序列化器// 需保证幂等性void close();
}

// 不建议使用自定义Serializer或DeSerializer, 会增加耦合度


Partitioner

Partitioner(分区器): ProducerRecord分区的默认规则

  1. ProducerRecord中指定partition字段, 则略过Partitioner
  2. Partitioner的分区计算受Topic数量的影响(已分配的不受)
  3. 可通过partitioner.class参数指定Producer所使用的Partitioner

Partitioner定义:

public interface Partitioner extends Configurable, Closeable {// 计算并返回分区号public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);// 关闭分区器public void close();
}public interface Configurable {// 获取配置信息并初始化数据void configure(Map<String, ?> configs);
}

默认的Partitioner: org.apache.kafka.clients.producer.internals.DefaultPartitioner

  1. close()方法默认为空
  2. 消息为null时, 则以轮询的方式分配可用的分区号
  3. 消息不为null时, 则进行Hash计算(MurmurHash2算法)

// 消息相同的情况下会写入相同的分区(存在消息互相覆盖的情况)


事务

事务(Transaction): Producer操作的最小原子单位(可跨Partition)

  1. 开启事务时, 必须也需开启幂等性(enable.idempotence)
  2. 开启事务时必须指定事务ID(若事务ID重复, 将结束被覆盖的事务并抛出异常)
  3. 只能使事务处于以下两种状态(否则将抛出异常): COMMIT、ABORT
  4. 事务开启后需关闭自动位移提交, 也不能位移消费

Producer中常用的事务方法:

// 初始化事务
void initTransactions();// 开启事务
void beginTransaction();// 事务内的位移提交
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)// 提交事务
void commitTransaction();// 终止事务(回滚)
void abortTransaction();

事务协调器(TransactionCoordinator): 负责事务中的各类操作

  1. 每个Producer都对应个事务协调器, 由其负责Producer中各类请求
  2. 事务协调器会将事务的信息都存储至内部Toipc的__transaction_state

如: 事务的执行流程

image

  1. 查找事务协调器: 找到事务协调器所在的Broker并建立连接(同时查找Partition)
  2. 获取PID: 通过InitProducerIdRequest请求获取该事务ID
  3. 执行事务: 通过各类请求处理Record并将数据存储至内部Topic
  4. 结束事务: 发送各类请求结束事务, 同时将事务信息存储至内部Topic和日志文件

Consumer的事务受以下限制:

  1. 采用日志压缩策略的Topic, 其Record可能被覆盖
  2. Consumer在消费时可能没有分配到事务内的所有Partition
  3. Record可能分布在Partition的多个LogSegment, 存在部分被清除的可能
  4. Consumer可通过位移提交/位移消费访问Record, 可能导致遗漏事务中的Record

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/603884.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024--Django平台开发-Django知识点(四)

1.知识回顾 创建项目&#xff1a;新项目、别人项目、新版版、老版本 项目目录&#xff08;v1.0版本&#xff09; 路由系统 常见路由编写加粗样式 /index/ 函数 /index/<str:v1> 函数 re_path(ryy/(\d{4})-(\d{2})-(\d{2})/, views.yy), re_path(ryy/(?…

科研上新 | 第4期:语言-音乐对比预训练;查找表实现的神经网络推理;大模型时代重新定义搜索框架

编者按&#xff1a;欢迎阅读“科研上新”栏目&#xff01;“科研上新”汇聚了微软亚洲研究院最新的创新成果与科研动态。在这里&#xff0c;你可以快速浏览研究院的亮点资讯&#xff0c;保持对前沿领域的敏锐嗅觉&#xff0c;同时也能找到先进实用的开源工具。 本期内容速览 …

什么是ajax,为什么使用ajax!

前言&#xff1a; 要学习一门新的、技术之前&#xff0c;首先我们要了解一下他是什么&#xff0c;为什么使用&#xff0c;有什么好处&#xff0c;该怎么理解。现在就从下文开始了解吧 什么是ajax: Ajax即“Asynchronous Javascript And XML”(异步JavaScript 和XML)&#xff0…

监控API的指标

监控服务器已经是常态了&#xff0c;但是监控API的表现是啥意思呢&#xff1f;还有监控指标&#xff1f;今天就来看看如何监控API。 正如监控应用程序以确保高质量性能一样&#xff0c;也必须监控API。 API是应用程序相互通信的管道。更具体地说&#xff0c;API提供了一种方法…

spring事务默认传播机制REQUIRED的试验(手动开启事务代码+feign远程调用)

transactional注解&#xff0c;默认啥都不指定的时候&#xff0c;我们使用的就是PROPAGATION_REQUIRED这种方式。 PROPAGATION_REQUIRED:业务方法需要在一个事务中运行&#xff0c;如果方法运行时&#xff0c;已处在一个事务中&#xff0c;那么就加入该事务&#xff0c;否则自…

C++20新特性解析:深入探讨协程库的实现原理与应用

C20新特性解析&#xff1a;深入探讨协程库的实现原理与应用 一、C20的协程库简介二、C20协程基础知识2.1、协程的基本概念和使用方法2.2、C20中的协程支持2.3、协程与传统线程的对比 三、C20协程库的实现原理四、C20协程库的应用实例总结 一、C20的协程库简介 C20引入了对协程…

特种印制电路技术

1特种印制电路技术现状、分类及特点 2006年&#xff0c;信息产业部(现工信部)电子信息产品管理司将高档PCB产品类型概括为HDI板、多层FPC、刚挠结合板、IC载板、通信背板、特种板材印制板、印制板新品种等种类。但直至目前&#xff0c;在印制电路设计与制造领域还没有形成特种…

软件测试|深入理解SQL RIGHT JOIN:语法、用法及示例解析

引言 在SQL中&#xff0c;JOIN是一种重要的操作&#xff0c;用于将两个或多个表中的数据关联在一起。SQL提供了多种JOIN类型&#xff0c;其中之一是RIGHT JOIN。RIGHT JOIN用于从右表中选择所有记录&#xff0c;并将其与左表中匹配的记录组合在一起。本文将深入探讨SQL RIGHT …

Python Selenium常见的报错以及措施

Python Selenium的常见报错主要包括以下几种&#xff1a; 1. NoSuchElementException: 当Selenium无法在DOM中找到元素时&#xff0c;会抛出此异常。这通常是因为元素不存在或者页面还未完全加载。 解决方法&#xff1a; 显式等待 隐式等待 越快越慢&#xff0c;越慢越快&#…

C#,冒泡排序算法(Bubble Sort)的源代码与数据可视化

排序算法是编程的基础。 常见的四种排序算法是&#xff1a;简单选择排序、冒泡排序、插入排序和快速排序。其中的快速排序的优势明显&#xff0c;一般使用递归方式实现&#xff0c;但遇到数据量大的情况则无法适用。实际工程中一般使用“非递归”方式实现。本文搜集发布四种算法…

嵌入式培训机构四个月实训课程笔记(完整版)-Linux系统编程第二天-Linux开发板外设练习题(物联技术666)

更多配套资料CSDN地址:点赞+关注,功德无量。更多配套资料,欢迎私信。 物联技术666_嵌入式C语言开发,嵌入式硬件,嵌入式培训笔记-CSDN博客物联技术666擅长嵌入式C语言开发,嵌入式硬件,嵌入式培训笔记,等方面的知识,物联技术666关注机器学习,arm开发,物联网,嵌入式硬件,单片机…

并发(6)

目录 31.JUC框架包含几个部分&#xff1f; 32.Lock框架和Tools包含哪些核心的类&#xff1f; 33.JUC原子类有哪些核心的类&#xff1f; 34.JUC线程池有哪些核心的类&#xff1f; 35.线程安全的实现方法有哪些&#xff1f; 31.JUC框架包含几个部分&#xff1f; 主要包含&am…

SpringCloud-高级篇(十三)

前面的主从集群可以应对Redis高并发读的问题&#xff0c;Redis主从之间可以做同步&#xff0c;为了提高主从同步时的性能&#xff0c;单节点Redis的内存不要设置太高&#xff0c;如果内存占用过多&#xff0c;做RDB的持久化&#xff0c;或者做全量同步的时候&#xff0c;导致大…

LDD学习笔记 -- Linux错误码

LDD学习笔记 -- Linux错误码 EACCES(Permission Denied) 13EEXIST(File Exits) 17EINVAL(Invalid Argument) 22ENOENT(No Such File or Directory)ENOMEM(Out of Memory)EIO(Input/Output Error) 5ENOSPC(No space Left on Device)ENOTTY(Not a Typewrite)EPIPE(Broken Pipe)EI…

使用邮箱发送验证码前端完成登录

前言 在前一篇使用C#发送邮箱验证码已经完成使用.net core web api写了完成往登录邮箱发送验证码的接口。现在就用前端调用接口模拟登录功能。 接口 public class ApiResp{public bool Success { get; set; }public int Code { get; set; }public int count { get; set; }pu…

元数据管理平台对比预研 Atlas VS Datahub VS Openmetadata

大家好&#xff0c;我是独孤风。元数据管理平台层出不穷&#xff0c;但目前主流的还是Atlas、Datahub、Openmetadata三家&#xff0c;那么我们该如何选择呢&#xff1f; 本文就带大家对比一下,这三个平台优势劣势。要了解元数据管理平台&#xff0c;先要从架构说起。 正文共&am…

【北邮国院大四上】Business Technology Strategy 企业技术战略

北邮国院电商大四在读&#xff0c;本笔记仅为PPT内容的整理与翻译&#xff0c;并不代表本课程的考纲及重点&#xff0c;仅为本人复习时方便阅读与思考之作。 写在前面 大家好&#xff0c;欢迎来到大学期间的最后一门课程&#xff0c;本门课程是中方课&#xff0c;所以很庆幸的…

【Apollo】阿波罗使用占位符 #{} 的异常分析

文章目录 1. 前言2. 复现3. 分析3.1 Value 注解3.2 根因 4. 后记5. 参考资料 1. 前言 出于线上 hotfix 报文请求模板的考虑&#xff0c;新增一个阿波罗配置&#xff0c;取值形如&#xff1a; 发布配置一段时间后&#xff0c;刚好需要重启服务&#xff0c;最终造成服务宕机&a…

在Ubuntu22.04上安装WordPress

WordPress是当今最简单、最强大的博客和网站建设工具。据统计全球大约有40% 以上网站是使用WordPress&#xff0c;这是个巨大的数字也侧面证明了WordPress的强大和普遍性。因此&#xff0c;如果你正在寻找一款高效、实用、可靠的CMS工具来构建网站&#xff0c;那么WordPress无疑…

关于HAL库外部中断的开关流程

通过HAL库配置好外部中断后&#xff0c;会生成如下代码&#xff1a; static void MX_GPIO_Init(void) {GPIO_InitTypeDef GPIO_InitStruct {0}; /* USER CODE BEGIN MX_GPIO_Init_1 */ /* USER CODE END MX_GPIO_Init_1 *//* GPIO Ports Clock Enable */__HAL_RCC_GPIOD_CLK_…