Kafka 实现之接口设计 —— 生产者 API、消费者 API

目录

一. 前言

二. 生产者 API

三. 消费者 API

3.1. 低级别 API

3.2. 高级别 API


一. 前言

    Kafka 强大的应用程序层的基础是用于访问存储的两个基本 API,即用于写入事件的生产者 API和用于读取事件的消费者 API。在这两个 API 之上构建了用于集成和处理的 API。

二. 生产者 API

The Producer API that wraps the 2 low-level producers - kafka.producer.SyncProducer and kafka.producer.async.AsyncProducer.

    生产者 API,它封装了2个低级别的生产者 - kafka.producer.SyncProducer  和  kafka.producer.async.AsyncProducer。

class Producer {/* Sends the data, partitioned by key to the topic using either the *//* synchronous or the asynchronous producer */public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);/* Sends a list of data, partitioned by key to the topic using either *//* the synchronous or the asynchronous producer */public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);/* Closes the producer and cleans up */public void close();}

The goal is to expose all the producer functionality through a single API to the client. The new producer -

    通过 API 提供给客户端,来暴露生产者所有的功能。新的生产者有如下功能:

1. can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data -

  • kafka.producer.Producer provides the ability to batch multiple produce requests (producer.type=async), before serializing and dispatching them to the appropriate kafka broker partition. The size of the batch can be controlled by a few config parameters. As events enter a queue, they are buffered in a queue, until either queue.time or batch.sizeis reached. A background thread (kafka.producer.async.ProducerSendThread) dequeues the batch of data and lets the kafka.producer.EventHandler serialize and send the data to the appropriate kafka broker partition. A custom event handler can be plugged in through the event.handler config parameter. At various stages of this producer queue pipeline, it is helpful to be able to inject callbacks, either for plugging in custom logging/tracing code or custom monitoring logic. This is possible by implementing the kafka.producer.async.CallbackHandler interface and setting callback.handler config parameter to that class.

1. 可以处理多个生产者请求和异步批量数据派发的队列/缓冲 -

  • kafka.producer.Producer 提供批处理多个生产请求的能力(producer.type=async),序列化和派发到 Broker 分区之前,可以配置参数控制批量的大小。随着事件进入队列,缓存在队列,直到满足 queue.time 或 batch.size。后台线程(kafka.producer.async.ProducerSendThread)发送一批数据用kafka.producer.EventHandler 序列化并发送到 Broker 分区。还可以通过 event.handler 配置参数可以插入自定义的事件处理。生产者队列管道在不同的阶段,无论是插入自动的日志记录/跟踪代码或自定义的监控逻辑,能够注入回调,通过实现kafka.producer.async.CallbackHandler 接口和设置 callback.handler 配置参数的类。

2. handles the serialization of data through a user-specified Encoder-

interface Encoder<T> {public Message toMessage(T data);
}

The default is the no-op kafka.serializer.DefaultEncoder 

2. 处理数据序列化,通过用户指定的 Encoder-

interface Encoder<T> {public Message toMessage(T data);
}

默认是空操作 kafka.serializer.DefaultEncoder。

3. provides software load balancing through an optionally user-specified Partitioner-

The routing decision is influenced by the kafka.producer.Partitioner.

interface Partitioner<T> {int partition(T key, int numPartitions);
}

The partition API uses the key and the number of available broker partitions to return a partition id. This id is used as an index into a sorted list of broker_ids and partitions to pick a broker partition for the producer request. The default partitioning strategy ishash(key)%numPartitions. If the key is null, then a random broker partition is picked. A custom partitioning strategy can also be plugged in using thepartitioner.classconfig parameter.

3. 提供平衡负载,通过用户指定的 Partitioner 

路由决定由kafka.producer.Partitioner影响。

interface Partitioner<T> {int partition(T key, int numPartitions);
}

该分区 API,使用 Key 和可用 Broker 分区数,返回一个分区 ID。这个 id 用作索引 broker_ids 和分区排序列表来为生产者请求挑选一个 Broker 分区。默认的分区策略是 hash(key)% numPartitions。如果 key 是空,则随机 Broker 分区,还可以插入自定义分区策略使用partitioner.class 配置参数。

三. 消费者 API

We have 2 levels of consumer APIs. The low-level "simple" API maintains a connection to a single broker and has a close correspondence to the network requests sent to the server. This API is completely stateless, with the offset being passed in on every request, allowing the user to maintain this metadata however they choose.

    我们有两个级别的消费者 API,低级别的“简单”API 保持一个 Broker 连接,并与发送到服务器的网络请求紧密对应。这个 API 是完全无状态的,每个请求都会传递偏移量,允许用户按照自己的选择来维护这个元数据。

The high-level API hides the details of brokers from the consumer and allows consuming off the cluster of machines without concern for the underlying topology. It also maintains the state of what has been consumed. The high-level API also provides the ability to subscribe to topics that match a filter expression (i.e., either a whitelist or a blacklist regular expression).

    高级别 API 向消费者隐藏 Broker 的具体细节,并允许在不考虑底层拓扑的情况下消费机器集群。它维持已消费的状态。高级别 API 还提供了还提供订阅与过滤器表达式(即白名单或黑名单正则表达式)匹配 Topic 的能力。

3.1. 低级别 API

class SimpleConsumer {/* Send fetch request to a broker and get back a set of messages. */ public ByteBufferMessageSet fetch(FetchRequest request);/* Send a list of fetch requests to a broker and get back a response set. */ public MultiFetchResponse multifetch(List<FetchRequest> fetches);/*** Get a list of valid offsets (up to maxSize) before the given time.* The result is a list of offsets, in descending order.* @param time: time in millisecs,*              if set to OffsetRequest$.MODULE$.LATIEST_TIME(), get from the latest offset available.*              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.*/public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}

The low-level API is used to implement the high-level API as well as being used directly for some of our offline consumers (such as the hadoop consumer) which have particular requirements around maintaining state.

    低级别 API 用于实现高级别 API,并直接用于一些离线消费者(如 hadoop 消费者),这些消费者对维护状态有特殊要求。

3.2. 高级别 API

/* create a connection to the cluster */ 
ConsumerConnector connector = Consumer.create(consumerConfig);interface ConsumerConnector {/*** This method is used to get a list of KafkaStreams, which are iterators over* MessageAndMetadata objects from which you can obtain messages and their* associated metadata (currently only topic).*  Input: a map of <topic, #streams>*  Output: a map of <topic, list of message streams>*/public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap); /*** You can also obtain a list of KafkaStreams, that iterate over messages* from topics that match a TopicFilter. (A TopicFilter encapsulates a* whitelist or a blacklist which is a standard Java regex.)*/public List<KafkaStream> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);/* Commit the offsets of all messages consumed so far. */public commitOffsets()/* Shut down the connector */public shutdown()
}

This API is centered around iterators, implemented by the KafkaStream class. Each KafkaStream represents the stream of messages from one or more partitions on one or more servers. Each stream is used for single threaded processing, so the client can provide the number of desired streams in the create call. Thus a stream may represent the merging of multiple server partitions (to correspond to the number of processing threads), but each partition only goes to one stream.

    这个 API 以迭代器为中心,由 KafkaStream 类实现,每个 KafkaStream 表示来自一个或多个服务器上的一个或多个分区的消息流。每个流都是单线程处理,因此客户端可以在创建调用中提供所需流的数量。因此,一个流可能代表多个服务器分区的合并(对应处理线程的数量),但每个分区只能进入一个流。

The createMessageStreams call registers the consumer for the topic, which results in rebalancing the consumer/broker assignment. The API encourages creating many topic streams in a single call in order to minimize this rebalancing. The createMessageStreamsByFilter call (additionally) registers watchers to discover new topics that match its filter. Note that each stream that createMessageStreamsByFilter returns may iterate over messages from multiple topics (i.e., if multiple topics are allowed by the filter).

    createMessageStreams 调用注册消费者的 Topic,这将导致重新平衡消费者/ Broker 分配。API鼓励在单个调用中创建多个 Topic 流,以尽量减少这种重新平衡。createMessageStreamsByFilter调用(另外)登记观察者发现符合过滤器的新 Topic。需要注意的是,createMessageStreamsByFilter 返回的每个流都可能遍历了来自多个 Topic 的消息(即,过滤器允许多个 Topic)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/740733.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在域控批量导出用户及其所在路径的信息

在Windows Server的Active Directory环境中&#xff0c;要批量导出用户及其所在OU&#xff08;组织单位&#xff09;的信息&#xff0c;可以使用PowerShell命令来实现。以下是一个简单的示例&#xff1a; Get-ADUser -Filter * -Properties CanonicalName | Select-Object Nam…

力扣-160. 相交链表(双指针)

给你两个单链表的头节点 headA 和 headB &#xff0c;请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点&#xff0c;返回 null 。 图示两个链表在节点 c1 开始相交&#xff1a; 题目数据 保证 整个链式结构中不存在环。 注意&#xff0c;函数返回结果后&a…

ARTS Week 20

Algorithm 本周的算法题为 1222. 可以攻击国王的皇后 在一个 下标从 0 开始 的 8 x 8 棋盘上&#xff0c;可能有多个黑皇后和一个白国王。 给你一个二维整数数组 queens&#xff0c;其中 queens[i] [xQueeni, yQueeni] 表示第 i 个黑皇后在棋盘上的位置。还给你一个长度为 2 的…

【JVM】字节码指令 invokevirtual

在Java虚拟机&#xff08;JVM&#xff09;中&#xff0c;invokevirtual 是一个字节码指令&#xff0c;用于调用对象实例的方法。它的主要作用是执行对象的虚方法调用&#xff0c;遵循Java语言中的动态分派机制。 例如&#xff0c;假设我们有以下Java代码&#xff1a; Java 1…

AI与广告创意:大模型在广告生成中的应用与挑战

AI与广告创意&#xff1a;大模型在广告生成中的应用与挑战 摘要&#xff1a; 本文将探讨AI大模型在广告创意生成中的应用&#xff0c;并分析其面临的挑战。 引言&#xff1a; 在当今信息爆炸的时代&#xff0c;广告创意生成的重要性不言而喻。广告创意需要吸引目标受众的注…

多维时序 | Matlab实现VMD-CNN-BiLSTM变分模态分解结合卷积神经网络结合双向长短期记忆神经网络多变量时间序列预测

多维时序 | Matlab实现VMD-CNN-BiLSTM变分模态分解结合卷积神经网络结合双向长短期记忆神经网络多变量时间序列预测 目录 多维时序 | Matlab实现VMD-CNN-BiLSTM变分模态分解结合卷积神经网络结合双向长短期记忆神经网络多变量时间序列预测预测效果基本介绍程序设计参考资料 预测…

stm32是用标准库还是hal库?

我大学几年一直都是使用标准库&#xff0c;然后17年毕业第一份工作转为HAL库&#xff0c;主要也是公司需求没办法。然后之后的数年我都是建议转HAL库&#xff0c;并不是这个库有多好&#xff0c;相反小问题很多&#xff0c;但是有一个&#xff0c;这是官方一直在开发维护的库&a…

软件开发秘籍:打造高效的分支管理策略

版本控制策略&#xff1a;如何制定合理的分支管理策略 引言 版本控制是软件开发中至关重要的一环&#xff0c;而合理的分支管理策略则是版本控制成功的关键。本文将介绍如何制定合理的分支管理策略&#xff0c;并通过实际案例和技巧&#xff0c;帮助读者更好地理解和应用。 …

SpringBoot3快速入门

目录 一、快速创建项目 二、手动创建一个工程 一、快速创建项目 1、使用官网提供的spring组件创建一个springboot3工程&#xff0c;springboot3要使用JDK17以上的版本 选择配置点击finish&#xff0c;刷新maven 创建一个controller层&#xff0c;写一个demo&#xff0c;点击运…

学习vue3第四节(ref以及ref相关api)

主要记录以下api&#xff1a;ref()、isRef()、unref()、 shallowRef()、triggerRef()、customRef() 1、ref() 定义 接受一个内部值&#xff0c;返回一个响应式的、可更改的 ref 对象&#xff0c;此对象只有一个指向其内部值的属性 .value&#xff0c;.value属性用于追踪并且存…

二十、HTML

一、什么是HTML 超文本标记语言&#xff0c;不是一种编程语言&#xff0c;而是一种标记语言&#xff0c;描述网页的语言&#xff0c;HTML使用标签描述网页中图片、文本、音乐、视频、超链接等。 二、常用标签 1、标题标签 <h1>一级标题</h1> 1-6 2、段落标签<p&…

Github 2024-03-13 C开源项目日报 Top10

根据Github Trendings的统计,今日(2024-03-13统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量C项目10C++项目1Curl:用于传输数据的命令行工具和库 创建周期:5067 天开发语言:C协议类型:OtherStar数量:32994 个Fork数量:6208 次关注人…

vscode使用npm命令无反应,而终端可以的解决办法

如若你遇到这种情况 使用命令 get-command npm 去下面这个路径把它删掉就可以了

在linux中如何后台运行java项目(详细)

目录 1.查看是否安装有jdk环境 2.将打包好的jar上传到linux服务器上 3.运行java程序 直接运行&#xff1a; 使用 & 使用 nohup & 扩展知识 1.查看是否安装有jdk环境 java -version 如果可以查看到jdk版本 &#xff0c;那就代表环境配置好了 2.将打包好的jar上…

Java JUC 笔记(2)

Java JUC 笔记&#xff08;2&#xff09; 锁框架 JDK5以后增加了Lock接口用来实现锁功能&#xff0c;其提供了与synchronized类似的同步功能&#xff0c;但是在使用时手动的获取和释放锁 Lock和Condition锁 这里的锁与synchronized锁不太一样&#xff0c;我们可以认为是Loc…

【硬件工程师面经整理30_工艺现状】

请描述一下国内的工艺现状 工艺水平&#xff1a;中国的器件工艺水平在不断提高&#xff0c;已经可以制造出高性能、高可靠性的器件产品。在集成电路领域&#xff0c;中国已经具备了一定的制造能力和产业规模&#xff0c;能够生产一些中低端的芯片产品。在光电器件、传感器、功…

【YOLOv9】训练模型权重 YOLOv9.pt 重新参数化轻量转为 YOLOv9-converted.pt

【YOLOv9】训练模型权重 YOLOv9.pt 重新参数化轻量转为 YOLOv9-converted.pt 1. 模型权重准备2. 模型重新参数化2.1 文件准备2.2 参数修改2.3 重新参数化过程 3. 重新参数化后模型推理3.1 推理超参数配置3.2 模型推理及对比 4. onnx 模型导出&#xff08;补充内容&#xff09;4…

Java开发从入门到精通(一):Java 数据库编程

三、Java 数据库编程 JDBC 基础&#xff1a;连接数据库、执行 SQL 查询 使用 JDBC 操作数据库 数据库连接池和事务处理 数据库基础知识&#xff1a; 数据库概念和模型 SQL 语言 关系型数据库架构 JDBC 编程&#xff1a; JDBC 概述和工作原理 连接数据库 执行 SQL 查询和更新 处…

蓝桥杯第1595题——和与乘积

题目描述 给定一个数列 A(a1​,a2​,⋯,an​)&#xff0c;问有多少个区间 [L,R] 满足区间内元素的乘积等于他们的和。 输入描述 输入第一行包含一个整数 n&#xff0c;表示数列的长度。 第二行包含 n 个整数&#xff0c;依次表示数列中的数 a1​,a2​,⋯,an​。 输出描述 …

java面试题(持续更新.. ...)

JDK和JRE和JVM区别 JVM是运行字节码的虚拟机&#xff0c;JRE在JVM的基础上添加了基本的类库&#xff0c;JDK在JRE的基础上添加了一些编译的工具(例如&#xff1a;javac等)… … java和c的区别 java和c都是面向对象都支持继承&#xff0c;但是c是多继承&#xff0c;java是单继承…