kafka: 基础概念回顾(生产者客户端和机架感知相关内容)

一、kafka生产者客户端

在kafka体系结构中有如下几个重要的概念:

  • Producer:生产者,负责生产消息并投递到kafka broker的某个的分区中
  • Consumer:消费者,负责消费kafka若干个分区中的消息
  • Broker:kafka服务节点
1、整体架构:数据发送流程

在这里插入图片描述
(1)生产者

  • 拦截器
    生产者的拦截器可以在消息发送前做一些拦截工作对数据进行相应的处理,比如:消息过滤、消息内容修改等。
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
public interface ProducerInterceptor<K, V> extends Configurable {//在将消息序列化和计算分区之前会调⽤该⽅法,⽤来对消息进⾏相应的定制化操作,如修改消息内容public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);//在消息被应答之前或者消息发送失败时调⽤该⽅法,优先于⽤⼾设定的Callback之前执⾏,如统计消息发送成功或失败的次数public void onAcknowledgement(RecordMetadata metadata, Exception exception);public void close();
}
  • 序列化器
  • 分区器

二、kafka数据可靠性保证

1、LEO和HW
2、工作流程
3、Leader Epoch

三、粘性分区策略

四、机架感知

1、概念
2、机架感知分区分配策略
3、验证

(1)验证目标

  • 机架感知特性将同⼀分区的副本分散到不同的机架上
  • rack机制消费者可以消费到follower副本中的数据

(2)参数配置
broker端配置:

  • 配置名:broker.rack=my-rack-id
    • 解释:broker属于的rack
  • 配置名:replica.selector.class
    • 解释:ReplicaSelector实现类的全名,包括路径 (⽐如 RackAwareReplicaSelector 即按 rack id 指定消费)

Client端配置:
client.rack

  • consumer端配置
  • 配置名:client.rack
  • 解释:这个参数需要和broker端指定的 broker.rack 相同,表⽰去哪个rack中获取数据。
  • 默认:null

(3)环境准备:kafka集群

  • kafka实例数: 4
  • 两个kafka实例broker.rack配置为0,另外两个kafka实例broker.rack配置为了2,broker端配置如下:
server1:
broker.id=0broker.rack=0
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelectorserver2:
broker.id=1
broker.rack=0
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelectorserver3
broker.id=2
broker.rack=2
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelectorserver4
broker.id=3
broker.rack=2
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

启动kafka集群,服务端⽇志信息:
在这里插入图片描述
在这里插入图片描述
验证一:机架感知特性将同一分区的副本分散到不同的机架上
在这里插入图片描述
创建topic rack02,副本被分配到了broker1和2
在这里插入图片描述
创建topic rack03 副本被分配到了0和3
在这里插入图片描述
在这里插入图片描述

验证二:客⼾端(消费者)验证:rack机制消费者可以消费到follower副本中的数据

验证代码如下:

package person.xsc.train.producer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import person.xsc.train.client.KafkaConsumerClient;
import person.xsc.train.constant.KafkaConstant;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Demo {public static KafkaConsumer<String, String> kafkaConsumer;public static void main(String[] args) {Properties properties = new Properties();properties.put(KafkaConstant.BOOTSTRAP_SERVERS, "localhost:9093,localhosproperties.put(KafkaConstant.GROUP_ID, "test01");properties.put(KafkaConstant.ENABLE_AUTO_COMMIT, "true");properties.put(KafkaConstant.AUTO_COMMIT_INTERVAL_MS, "1000");properties.put(KafkaConstant.KEY_DESERIALIZER, StringDeserializer.class.properties.put(KafkaConstant.VALUE_DESERIALIZER, StringDeserializer.clasproperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");properties.put(ConsumerConfig.CLIENT_RACK_CONFIG, "0");properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");kafkaConsumer = KafkaConsumerClient.createKafkaClient(properties);receiveMessage("rack02");}public static void receiveMessage(String topic) {TopicPartition topicPartition0 = new TopicPartition(topic, 0);kafkaConsumer.assign(Arrays.asList(topicPartition0));while(true) {// Kafka的消费者⼀次拉取⼀批的数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll//System.out.println("开始打印消息!");// 5.将将记录(record)的offset、key、value都打印出来for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {// 主题String topicName = consumerRecord.topic();int partition = consumerRecord.partition();// offset:这条消息处于Kafka分区中的哪个位置long offset = consumerRecord.offset();// key\valueString key = consumerRecord.key();String value = consumerRecord.value();System.out.println(String.format("topic: %s, partition: %s, offs}}}
}

前置背景:
Topic rack02的partition 0分区的副本为broker2(对应的rack为2)和broker1(对应的rack为0),其中broker2为leader(在⾮rack机制下仅能消费到leader中的数据)。

在上述代码中,消费者配置中限制了rack为0,消费的分区为0,因此映射到broker1。通过测试可验证在rack机制下消费者可以消费到folloer副本中的数据,测试如下:
在这里插入图片描述

五、机架感知存在的问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/610655.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

@DependsOn:解析 Spring 中的依赖关系之艺术

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 DependsOn&#xff1a;解析 Spring 中的依赖关系之艺术 前言简介基础用法高级用法在 XML 配置中使用 DependsOn通过 Java Config 配置实现依赖管理 生命周期与初始化顺序Bean 生命周期的关键阶段&…

红帽宣布CentOS 7和RHEL 7将在2024年6月30日结束支持,企业面临紧迫的迁移压力!

2020 年红帽 (RedHat&#xff0c;已在 2019 年被 IBM 收购) 单方面宣布终止 CentOS Linux 的开发&#xff0c;此后 CentOS Linux 8 系列的更新已经在 2021 年 12 月结束&#xff0c;而 CentOS Linux 7 系列的更新将在 2024 年 6 月 30 日结束。 与 CentOS Linux 7 一起发布的 R…

网络的设置

一、网络设置 1.1查看linux基础的网络设置 网关 route -n ip地址ifconfigDNS服务器cat /etc/resolv.conf主机名hostname路由 route -n 网络连接状态ss 或者 netstat域名解析nslookup host 例题&#xff1a;除了ping&#xff0c;什么命令可以测试DNS服务器来解…

LeetCode 94. 二叉树的中序遍历

94. 二叉树的中序遍历 给定一个二叉树的根节点 root &#xff0c;返回 它的 中序 遍历 。 示例 1&#xff1a; 输入&#xff1a;root [1,null,2,3] 输出&#xff1a;[1,3,2] 示例 2&#xff1a; 输入&#xff1a;root [] 输出&#xff1a;[] 示例 3&#xff1a; 输入&…

企业级进销存管理系统

框架&#xff1a; 进销存管理系统&#xff0c;采用SpringBootShiroMyBatisEasyUI 项目采用Maven构建&#xff0c;数据库文件存放在 sql/jxc.sql 截图 运行项目部分截图&#xff0c; 登录界面&#xff0c;用户名admin&#xff0c;密码admin123 当前库存查询&#xff0c; 进…

搭建Eureka服务注册中心

一、前言 我们在别的章节中已经详细讲解过eureka注册中心的作用&#xff0c;本节会简单讲解eureka作用&#xff0c;侧重注册中心的搭建。 Eureka作为服务注册中心可以进行服务注册和服务发现&#xff0c;注册在上面的服务可以到Eureka上进行服务实例的拉取&#xff0c;主要作用…

用判断对齐大语言模型

1、写作动机&#xff1a; 目前的从反馈中学习方法仅仅使用判断来促使LLMs产生更好的响应&#xff0c;然后将其作为新的示范用于监督训练。这种对判断的间接利用受到无法从错误中学习的限制&#xff0c;这是从反馈中学习的核心精神&#xff0c;并受到LLMs的改进能力的制约。 2…

来自一个系统的自白

天空一声巨响&#xff0c;小炫我闪亮登场&#xff01;初次见面&#xff0c;给大家简单介绍下自己&#xff1a;我是炫我渲染私有云系统&#xff0c;是最新一代的智能渲染集群系统。可以进行私有化部署&#xff0c;在3dsmax、maya等软件中一键完成提交、上传、渲染、下载的任务&a…

1881_S32K344开发工具以及MCAL软件安装

全部学习汇总&#xff1a; GreyZhang/g_s32k344: A new MCU learning notes. I would try to use MCAL instead of SDK. (github.com) 编译有专门的编译器安装包&#xff0c;也有IDE的安装形式。这里我选择了IDE&#xff0c;因为我还需要一个开发调试环境。这个IDE可以让我方便…

使用cURL命令在Linux中测试HTTP服务器的性能

cURL是一个强大的命令行工具&#xff0c;用于从或向服务器传输数据。它支持多种协议&#xff0c;包括HTTP、HTTPS、FTP等。在Linux系统中&#xff0c;cURL可以用于测试和评估HTTP服务器的性能。下面是一些使用cURL命令测试HTTP服务器性能的示例和说明。 1. 基本请求 要向指定…

MySQL8下载安装教程

一、MySQL下载 我的版本是8.2.0&#xff0c;当前的最新版本&#xff0c;网址如下&#xff1a;MySQL :: Download MySQL Community Server 点击No thanks&#xff0c;just start my download&#xff0c;就是只是开始下载的意思&#xff0c;点击下载&#xff0c;等待下载完成 二…

ylov8的训练和预测使用(目标检测)

首先要配置文文件 1-配置数据集的yaml文件&#xff1a; 目录在ultralytics/cfg/datasets/下面&#xff1a; 例如我的&#xff1a; (这里面的yaml文件在/ultralytics/cfg/datasets下面有很多&#xff0c;可以找几个参考一下) path: /path/to/eye_datasets # dataset root di…

java基础之Java8新特性-方法引入

目录 1.简介 2.方法引入 方法引入遵循规范 方法引入种类 1.静态方法引入 2.对象方法引入 3.实例方法引入 4.构造函数引入 1.简介 方法引用是 Java 8 中引入的另一个重要特性&#xff0c;它提供了一种简洁的语法来直接引用现有方法或构造函数。方法引用可以看作是 Lambd…

【Python机器学习】决策树集成——梯度提升回归树

理论知识&#xff1a; 梯度提升回归树通过合并多个决策树来构建一个更为强大的模型。虽然名字里有“回归”&#xff0c;但这个模型既能用于回归&#xff0c;也能用于分类。与随机森林方法不同&#xff0c;梯度提升采用连续的方式构造树&#xff0c;每棵树都试图纠正前一…

功能分享【电商API接口】:商品采集正确使用方法!

相信很多做过电商的人&#xff0c;曾经有在淘宝、京东、天猫、拼多多、1688等平台上卖过自己的产品&#xff0c;但是每换一个平台&#xff0c;商品要重新上传&#xff0c;这浪费了很多没有必要的时间。 为此我们开发了商品采集API功能&#xff0c;一键完成上架商品&#xff0c…

CHS_01.1.5+操作系统引导

CHS_01.1.5操作系统引导 操作系统的引导一个新的磁盘安装操作系统后操作系统引导&#xff08;开机过程&#xff09; 操作系统的引导 我们会学习操作系统的引导 那你可能看见这个词的时候会觉得莫名其妙不明 绝地 什么是操作系统的引导呢 简单来说就是当你在开机的时候 如何让…

频率的高低与辐射强度有关系吗?

摘要: 频率的高低和辐射强度之间存在一定的关系。 一般而言&#xff0c;频率越高&#xff0c;辐射强度越大&#xff0c;即电磁辐射的能量越大。这是因为电磁波的能量与其频率成正比。在电磁波谱中&#xff0c;如X光和伽玛射线具有高频率和强辐射强度&#xff0c;可以破坏构成 .…

番外篇 中国古代的操 作系统

番外篇中国古代的操作系统 在古代中国&#xff0c;仿佛已经存在一套古老而神秘的操作系统机制。 这个东方国度中&#xff0c;有一位名叫小李子的忙碌人物&#xff0c;他的工作就如同是执行各种指令的“人肉CPU”。 这个国家还有一个特殊的人物&#xff0c;即皇帝&#xff0c;他…

1.4.1机器学习——梯度下降+α学习率大小判定

1.4.1梯度下降 4.1、梯度下降的概念 ※【总结一句话】&#xff1a;系统通过自动的调节参数w和b的值&#xff0c;得到最小的损失函数值J。 如下&#xff1a;是梯度下降的概念图。 我们有一个损失函数 J(w,b)&#xff0c;包含两个参数w和b&#xff08;你可以想象成J(w,b) w*x…

服务器配置SSL证书到nginx基于Fdfs存储服务器或者直接阿里云绑定SSL

1.如果用FDFS存储服务器内置nginx设置SSL证书 1.验证当前nginx是否存在 http_ssl_modulehttp_ssl_module模块 如果存在直接配置就行 server {listen 80 default backlog2048;listen 443 ssl; server_name 域名; ssl_certificate /usr/local/nginx_fdfs/ssl/xxxx.top.crt; ssl…