kafka: 基础概念回顾(生产者客户端和机架感知相关内容)

一、kafka生产者客户端

在kafka体系结构中有如下几个重要的概念:

  • Producer:生产者,负责生产消息并投递到kafka broker的某个的分区中
  • Consumer:消费者,负责消费kafka若干个分区中的消息
  • Broker:kafka服务节点
1、整体架构:数据发送流程

在这里插入图片描述
(1)生产者

  • 拦截器
    生产者的拦截器可以在消息发送前做一些拦截工作对数据进行相应的处理,比如:消息过滤、消息内容修改等。
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
public interface ProducerInterceptor<K, V> extends Configurable {//在将消息序列化和计算分区之前会调⽤该⽅法,⽤来对消息进⾏相应的定制化操作,如修改消息内容public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);//在消息被应答之前或者消息发送失败时调⽤该⽅法,优先于⽤⼾设定的Callback之前执⾏,如统计消息发送成功或失败的次数public void onAcknowledgement(RecordMetadata metadata, Exception exception);public void close();
}
  • 序列化器
  • 分区器

二、kafka数据可靠性保证

1、LEO和HW
2、工作流程
3、Leader Epoch

三、粘性分区策略

四、机架感知

1、概念
2、机架感知分区分配策略
3、验证

(1)验证目标

  • 机架感知特性将同⼀分区的副本分散到不同的机架上
  • rack机制消费者可以消费到follower副本中的数据

(2)参数配置
broker端配置:

  • 配置名:broker.rack=my-rack-id
    • 解释:broker属于的rack
  • 配置名:replica.selector.class
    • 解释:ReplicaSelector实现类的全名,包括路径 (⽐如 RackAwareReplicaSelector 即按 rack id 指定消费)

Client端配置:
client.rack

  • consumer端配置
  • 配置名:client.rack
  • 解释:这个参数需要和broker端指定的 broker.rack 相同,表⽰去哪个rack中获取数据。
  • 默认:null

(3)环境准备:kafka集群

  • kafka实例数: 4
  • 两个kafka实例broker.rack配置为0,另外两个kafka实例broker.rack配置为了2,broker端配置如下:
server1:
broker.id=0broker.rack=0
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelectorserver2:
broker.id=1
broker.rack=0
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelectorserver3
broker.id=2
broker.rack=2
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelectorserver4
broker.id=3
broker.rack=2
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

启动kafka集群,服务端⽇志信息:
在这里插入图片描述
在这里插入图片描述
验证一:机架感知特性将同一分区的副本分散到不同的机架上
在这里插入图片描述
创建topic rack02,副本被分配到了broker1和2
在这里插入图片描述
创建topic rack03 副本被分配到了0和3
在这里插入图片描述
在这里插入图片描述

验证二:客⼾端(消费者)验证:rack机制消费者可以消费到follower副本中的数据

验证代码如下:

package person.xsc.train.producer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import person.xsc.train.client.KafkaConsumerClient;
import person.xsc.train.constant.KafkaConstant;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Demo {public static KafkaConsumer<String, String> kafkaConsumer;public static void main(String[] args) {Properties properties = new Properties();properties.put(KafkaConstant.BOOTSTRAP_SERVERS, "localhost:9093,localhosproperties.put(KafkaConstant.GROUP_ID, "test01");properties.put(KafkaConstant.ENABLE_AUTO_COMMIT, "true");properties.put(KafkaConstant.AUTO_COMMIT_INTERVAL_MS, "1000");properties.put(KafkaConstant.KEY_DESERIALIZER, StringDeserializer.class.properties.put(KafkaConstant.VALUE_DESERIALIZER, StringDeserializer.clasproperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");properties.put(ConsumerConfig.CLIENT_RACK_CONFIG, "0");properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");kafkaConsumer = KafkaConsumerClient.createKafkaClient(properties);receiveMessage("rack02");}public static void receiveMessage(String topic) {TopicPartition topicPartition0 = new TopicPartition(topic, 0);kafkaConsumer.assign(Arrays.asList(topicPartition0));while(true) {// Kafka的消费者⼀次拉取⼀批的数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll//System.out.println("开始打印消息!");// 5.将将记录(record)的offset、key、value都打印出来for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {// 主题String topicName = consumerRecord.topic();int partition = consumerRecord.partition();// offset:这条消息处于Kafka分区中的哪个位置long offset = consumerRecord.offset();// key\valueString key = consumerRecord.key();String value = consumerRecord.value();System.out.println(String.format("topic: %s, partition: %s, offs}}}
}

前置背景:
Topic rack02的partition 0分区的副本为broker2(对应的rack为2)和broker1(对应的rack为0),其中broker2为leader(在⾮rack机制下仅能消费到leader中的数据)。

在上述代码中,消费者配置中限制了rack为0,消费的分区为0,因此映射到broker1。通过测试可验证在rack机制下消费者可以消费到folloer副本中的数据,测试如下:
在这里插入图片描述

五、机架感知存在的问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/610655.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

民安智库(第三方满意度调研公司):物业满意度调查,选择适合的调查方法至关重要

物业满意度调查是企业了解业主对物业管理服务需求和期望的重要工具。然而&#xff0c;选择适合的调查方法也是至关重要的。不同的调查方法可能会影响调查结果的真实性和准确性&#xff0c;进而影响企业改进物业管理服务的决策。本文将分享民安智库在物业满意度调查方面的实践经…

@DependsOn:解析 Spring 中的依赖关系之艺术

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 DependsOn&#xff1a;解析 Spring 中的依赖关系之艺术 前言简介基础用法高级用法在 XML 配置中使用 DependsOn通过 Java Config 配置实现依赖管理 生命周期与初始化顺序Bean 生命周期的关键阶段&…

尚硅谷大数据技术-数据湖Hudi视频教程-笔记02【核心概念(基本概念、数据写、数据读)】

大数据新风口&#xff1a;Hudi数据湖&#xff08;尚硅谷&Apache Hudi联合出品&#xff09; B站直达&#xff1a;https://www.bilibili.com/video/BV1ue4y1i7na 尚硅谷数据湖Hudi视频教程百度网盘&#xff1a;https://pan.baidu.com/s/1NkPku5Pp-l0gfgoo63hR-Q?pwdyyds阿里…

如何创业,创业的历程

1、网址导航流行的时候&#xff0c;hao123胜出 2、视频网站流行的时候&#xff0c;优酷 土豆 等胜出 3、团购网站流行的时候&#xff0c;美团胜出 4、导购网站流行的时候&#xff0c;美丽说、蘑菇街胜出 5、p2p流行的时候&#xff0c;不知道谁胜出 6、短视频流行的时候&#xf…

红帽宣布CentOS 7和RHEL 7将在2024年6月30日结束支持,企业面临紧迫的迁移压力!

2020 年红帽 (RedHat&#xff0c;已在 2019 年被 IBM 收购) 单方面宣布终止 CentOS Linux 的开发&#xff0c;此后 CentOS Linux 8 系列的更新已经在 2021 年 12 月结束&#xff0c;而 CentOS Linux 7 系列的更新将在 2024 年 6 月 30 日结束。 与 CentOS Linux 7 一起发布的 R…

CentOS Stream 9配置yum源

文章目录 Red Hat 9 && CentOS Stream 9 配置阿里云yum 源CentOS Stream 9 配置阿里云 yum 源Red Hat 9 配置阿里云 yum 源 Red Hat 9 && CentOS Stream 9 配置阿里云yum 源 CentOS Stream 9 配置阿里云 yum 源 备份原有的 yum文件 [rootlocalhost ~]# cd /…

网络的设置

一、网络设置 1.1查看linux基础的网络设置 网关 route -n ip地址ifconfigDNS服务器cat /etc/resolv.conf主机名hostname路由 route -n 网络连接状态ss 或者 netstat域名解析nslookup host 例题&#xff1a;除了ping&#xff0c;什么命令可以测试DNS服务器来解…

LeetCode 94. 二叉树的中序遍历

94. 二叉树的中序遍历 给定一个二叉树的根节点 root &#xff0c;返回 它的 中序 遍历 。 示例 1&#xff1a; 输入&#xff1a;root [1,null,2,3] 输出&#xff1a;[1,3,2] 示例 2&#xff1a; 输入&#xff1a;root [] 输出&#xff1a;[] 示例 3&#xff1a; 输入&…

【MySQL】数据库的设计

数据库设计 1、多表关系 一对多(多对一)&#xff1a;在多的一方建立外键&#xff0c;指向一的一方的主键。多对多&#xff1a;多对多关系实现需要借助第三张中间表。中间表至少包含两个字段&#xff0c;这两个字段作为第三张表的外键&#xff0c;分别指向两张表的主键一对一&…

企业级进销存管理系统

框架&#xff1a; 进销存管理系统&#xff0c;采用SpringBootShiroMyBatisEasyUI 项目采用Maven构建&#xff0c;数据库文件存放在 sql/jxc.sql 截图 运行项目部分截图&#xff0c; 登录界面&#xff0c;用户名admin&#xff0c;密码admin123 当前库存查询&#xff0c; 进…

搭建Eureka服务注册中心

一、前言 我们在别的章节中已经详细讲解过eureka注册中心的作用&#xff0c;本节会简单讲解eureka作用&#xff0c;侧重注册中心的搭建。 Eureka作为服务注册中心可以进行服务注册和服务发现&#xff0c;注册在上面的服务可以到Eureka上进行服务实例的拉取&#xff0c;主要作用…

用判断对齐大语言模型

1、写作动机&#xff1a; 目前的从反馈中学习方法仅仅使用判断来促使LLMs产生更好的响应&#xff0c;然后将其作为新的示范用于监督训练。这种对判断的间接利用受到无法从错误中学习的限制&#xff0c;这是从反馈中学习的核心精神&#xff0c;并受到LLMs的改进能力的制约。 2…

了解Spring中的依赖注入:@Autowired vs. @Resource

在Spring框架中&#xff0c;依赖注入是一项关键的特性&#xff0c;通过它&#xff0c;我们能够更灵活、更方便地管理和使用各种组件。在依赖注入的实现中&#xff0c;Resource 和 Autowired 是两个常用的注解&#xff0c;它们分别具有不同的特点和用途。在本篇博客中&#xff0…

来自一个系统的自白

天空一声巨响&#xff0c;小炫我闪亮登场&#xff01;初次见面&#xff0c;给大家简单介绍下自己&#xff1a;我是炫我渲染私有云系统&#xff0c;是最新一代的智能渲染集群系统。可以进行私有化部署&#xff0c;在3dsmax、maya等软件中一键完成提交、上传、渲染、下载的任务&a…

ESP32-WIFI(Arduino)

ESP32-WIFI Wi-Fi是一种基于IEEE 802.11标准的无线局域网技术&#xff0c;是Wi-Fi联盟制造商的商标作为产品的品牌认证。它可以让电脑、手机、平板电脑等设备通过无线信号连接到互联网 。 在无线网络中&#xff0c;AP&#xff08;Access Point&#xff09;和 STA&#xff08;St…

八股文 c++ 多态

静态多态 静态多态&#xff08;编译时多态&#xff09;&#xff1a;主要体现在函数重载&#xff08;Overloading&#xff09;和运算符重载上&#xff0c;编译器根据函数签名在编译阶段就能确定调用哪个函数。 动态多态 动态多态&#xff08;运行时多态&#xff09;&#xff…

leetcode 659. 分割数组为连续子序列

题目链接&#xff1a;leetcode 659 1.题目 给你一个按 非递减顺序 排列的整数数组 nums 。 请你判断是否能在将 nums 分割成 一个或多个子序列 的同时满足下述 两个 条件&#xff1a; 每个子序列都是一个 连续递增序列&#xff08;即&#xff0c;每个整数 恰好 比前一个整数…

1881_S32K344开发工具以及MCAL软件安装

全部学习汇总&#xff1a; GreyZhang/g_s32k344: A new MCU learning notes. I would try to use MCAL instead of SDK. (github.com) 编译有专门的编译器安装包&#xff0c;也有IDE的安装形式。这里我选择了IDE&#xff0c;因为我还需要一个开发调试环境。这个IDE可以让我方便…

使用cURL命令在Linux中测试HTTP服务器的性能

cURL是一个强大的命令行工具&#xff0c;用于从或向服务器传输数据。它支持多种协议&#xff0c;包括HTTP、HTTPS、FTP等。在Linux系统中&#xff0c;cURL可以用于测试和评估HTTP服务器的性能。下面是一些使用cURL命令测试HTTP服务器性能的示例和说明。 1. 基本请求 要向指定…

MySQL8下载安装教程

一、MySQL下载 我的版本是8.2.0&#xff0c;当前的最新版本&#xff0c;网址如下&#xff1a;MySQL :: Download MySQL Community Server 点击No thanks&#xff0c;just start my download&#xff0c;就是只是开始下载的意思&#xff0c;点击下载&#xff0c;等待下载完成 二…