zookeperkafka学习

1、why kafka

优点   缺点
kafka
  • 吞吐量高,对批处理和异步处理做了大量的设计,因此Kafka可以得到非常高的性能。
延迟也会高,不适合电商场景。
RabbitMQ
  • 如果有大量消息堆积在队列中,性能会急剧下降
  • 每秒处理几万到几十万的消息。如果应用要求高的性能,不要选择RabbitMQ。
性能RocketMQ低
RocketMQ
  • 性能比RabbitMQ高一个数量级,适合电商场景。
  • RocketMQ主要用于有序,事务,流计算,消息推送,日志流处理,binlog分发等场景。
  • 每秒处理几十万的消息,同时响应在毫秒级。如果应用很关注响应时间,可以使用RocketMQ。

2、Broker:

缓存代理(可以把Broker理解为Kafka的服务器),Kafka 集群中的一台或多台服务器统称为 broker。kafka中支持消息持久化的,生产者生产消息后,kafka不会直接把消息传递给消费者,而是先要在broker中进行存储,持久化是保存在kafka的日志文件中。 

3、分区:

一个消费者可以对应多个分区,一个分区只能对应一个消费者。

topic分区有leader和follower。 Log的分区被分布到集群中的多个服务器上。每个服务器处理它分到的分区。 根据配置每个分区还可以复制到其它服务器作为备份容错。 每个分区有一个leader,零或多个follower。Leader处理此分区的所有的读写请求,而follower被动的复制数据。如果leader宕机,其它的一个follower会被推举为新的leader。 一台服务器可能同时是一个分区的leader,另一个分区的follower。 这样可以平衡负载,避免所有的请求都只让一台或者某几台服务器处理。

4、消费者组 :

topic实现JMS模型中消费者组中只有一个消费者,这种情况下topic的消费的offset是无序的。当单个消费者无法跟上数据生成的速度,就可以增加更多的消费者分担负载,每个消费者只处理部分partition的消息,从而实现单个应用程序的横向伸缩。但是不要让消费者的数量多于partition的数量,此时多余的消费者会空闲。此外,Kafka还允许多个应用程序从同一个Topic读取所有的消息,此时只要保证每个应用程序有自己的消费者组即可。

kafka为什么读写快?

利用零拷贝和页面缓存技术,零拷贝技术读取文件数据并发送到网络的步骤如下:

  • 将磁盘文件的数据复制到页面缓存。
  • 将数据从页面缓存直接发送到网卡从而发到网络中。

rebalance

主要是对partition的个数和group当中的consumer个数重新统计,再重新对应consumer和partition的关系。一个消费者可以对应多个分区。一个分区只能对应一个消费者。

kafka producer API

生产者的分区由key决定

我们创建消息的时候,必须要提供主题和消息的内容,而消息的key是可选的,当不指定key时默认为null。消息的key有两个重要的作用:1)提供描述消息的额外信息;2)用来决定消息写入到哪个分区,所有具有相同key的消息会分配到同一个分区中。

如果key为null,那么生产者会使用默认的分配器,该分配器使用轮询(round-robin)算法来将消息均衡到所有分区。

如果key不为null而且使用的是默认的分配器,那么生产者会对key进行哈希并根据结果将消息分配到特定的分区。

案例:

Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs.

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384); //默认是16kB, 每个Batch要存放batch.size大小的数据后,才可以发送出去。props.put("linger.ms", 1); //一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了。props.put("buffer.memory", 33554432); //默认是32MB,KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker上去的。props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for(int i = 0; i < 100; i++)producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));producer.close();

kafka consumer API

案例一:手动同步提交

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "false");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));final int minBatchSize = 200;List<ConsumerRecord<String, String>> buffer = new ArrayList<>();while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {buffer.add(record);}if (buffer.size() >= minBatchSize) {insertIntoDb(buffer);consumer.commitSync();buffer.clear();}}

案例二:每个partition手动同步提交

try {while(running) {ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);for (TopicPartition partition : records.partitions()) {//拿到这个partition下面的所有数据List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println(record.offset() + ": " + record.value());}//通过这个partition的list获取最后一个数据的offsetlong lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));}}} finally {consumer.close();}

Kafka文件存储:

知道通过分片和索引机制找到offset的就行了。index和log文件以当前的第一条消息的offset命名。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/146828.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

记录一些涉及到界的题

文章目录 coppersmith的一些相关知识题1 [N1CTF 2023] e2Wrmup题2 [ACTF 2023] midRSA题3 [qsnctf 2023]浅记一下 coppersmith的一些相关知识 上界 X c e i l ( 1 2 ∗ N β 2 d − ϵ ) X ceil(\frac{1}{2} * N^{\frac{\beta^2}{d} - \epsilon}) Xceil(21​∗Ndβ2​−ϵ) …

【Proteus仿真】【STM32单片机】防火防盗GSM智能家居设计

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用Proteus8仿真STM32单片机控制器&#xff0c;使用声光报警模块、LCD1602显示模块、DS18B20温度、烟雾传感器模块、按键模块、PCF8591 ADC模块、红外检测模块等。 主要功能&#xff1a; 系统运行…

Mac电脑好用的窗口管理软件 Magnet 中文for mac

Magnet是一款用于Mac操作系统的窗口管理工具&#xff0c;它可以帮助您快速和方便地组织和管理应用程序窗口&#xff0c;以提高您的工作效率和多任务处理能力。 以下是Magnet的一些主要功能和特点&#xff1a; 窗口自动调整&#xff1a;Magnet允许您通过简单的拖放操作或使用快…

微信小程序 限制字数文本域框组件封装

微信小程序 限制字数文本域框 介绍&#xff1a;展示类组件 导入 在app.json或index.json中引入组件 "usingComponents": {"text-field":"/pages/components/text-field/index"}代码使用 <text-field maxlength"500" bindtabsIt…

IDEA创建文件添加作者及时间信息

前言 当使用IDEA进行软件开发时&#xff0c;经常需要在代码文件中添加作者和时间信息&#xff0c;以便更好地维护和管理代码。 但是如果每次都手动编辑 以及修改那就有点浪费时间了。 实践 其实我们可以将注释日期 作者 配置到 模板中 同时配置上动态获取内容 例如时间 这样…

PaddleClas学习2——使用PPLCNet模型对车辆朝向进行识别(python)

使用PPLCNet模型对车辆朝向进行识别 1. 配置PaddlePaddle,PaddleClas环境2. 准备数据2.1 标注数据格式2.2 标注数据3. 模型训练3.1 修改配置文件3.2 训练、评估4 模型预测1. 配置PaddlePaddle,PaddleClas环境 安装:请先参考文档 环境准备 配置 PaddleClas 运行环境。 2. 准…

美国服务器:全面剖析其主要优点与潜在缺点

​  服务器是网站搭建的灵魂。信息化的今天&#xff0c;我们仍需要它来为网站和应用程序提供稳定的运行环境。而美国作为全球信息技术靠前的国家之一&#xff0c;其服务器市场备受关注。那么&#xff0c;美国服务器究竟有哪些主要优点和潜在缺点呢? 优点 数据中心基础设施&a…

Vue 路由缓存 防止路由切换数据丢失

在切换路由的时候&#xff0c;如果写好了一丢数据在去切换路由在回到写好的数据的路由去将会丢失&#xff0c;这时可以使用路由缓存技术进行保存&#xff0c;这样两个界面来回换数据也不会丢失 在 < router-view >展示的内容都不会被销毁&#xff0c;路由来回切换数据也…

Linux上编译和安装SOFA23.06

前言 你可以直接使用编译安装好的SOFA版本Installing from all-included binaries (v23.06.00)&#xff1a; 如果你想自己编译&#xff0c;可以看我下面写的内容&#xff0c;不过绝大多数是从官网来的&#xff0c;如果和官网有出入&#xff0c;建议还是以官网为准。 在Linux下…

PVE Win平台虚拟机下如何安装恢复自定义备份Win系统镜像ISO文件(已成功实现)

环境: Virtual Environment 7.3-3 Win s2019 UltraISO9.7 USM6.0 NTLite_v2.1.1.7917 问题描述: PVE Win平台虚拟机下如何安装恢复自定义备份Win系统镜像ISO文件 本次目标 主要是对虚拟机里面Win系统备份做成可安装ISO文件恢复至别的虚拟机或者实体机上 解决方案: …

【数据库】数据库连接池导致系统吞吐量上不去-复盘

在实际的开发中&#xff0c;我们会使用数据库连接池&#xff0c;但是如果不能很好的理解其中的含义&#xff0c;那么就可以出现生产事故。 HikariPool-1 - Connection is not available, request timed out after 30001ms.当系统的调用量上去&#xff0c;就出现大量这样的连接…

NET8 ORM 使用AOT SqlSugar

.NET AOT8 基本上能够免强使用了, SqlSugar ORM也支持了CRUD 能在AOT下运行了 Nuget安装 SqlSugarCore 具体代码 StaticConfig.EnableAot true;//启用AOT 程序启动执行一次就好了//用SqlSugarClient每次都new,不要用单例模式 var db new SqlSugarClient(new ConnectionC…

Os-ByteSec

Os-ByteSec 一、主机发现和端口扫描 主机发现&#xff0c;靶机地址192.168.80.144 端口扫描&#xff0c;开放了80、139、445、2525端口 二、信息收集 访问80端口 路径扫描 dirsearch -u "http://192.168.80.144/" -e *访问扫描出来的路径&#xff0c;没有发现…

Idea安装完成配置

目录&#xff1a; 环境配置Java配置Maven配置Git配置 基础设置编码级设置File Header自动生成序列化编号配置 插件安装MyBtisPlusRestfulTooklkit-fix 环境配置 Java配置 Idea右上方&#xff0c;找到Project Settings. 有些版本直接有&#xff0c;有些是在设置下的二级菜单下…

LLM实现RPA

“PROAGENT: 从机器人流程自动化到代理流程自动化”这篇论文有几个创新点是比较有意思的&#xff1a;1.通过描述方式生成执行链&#xff0c;执行链通过代码方式生成保证执行链的稳健、可约束2.对执行过程抽取出数据结构&#xff0c;数据结构也通过代码生成方式来约束3.整个过程…

Web之CSS笔记

Web之HTML、CSS、JS 二、CSS&#xff08;Cascading Style Sheets层叠样式表&#xff09;CSS与HTML的结合方式CSS选择器CSS基本属性CSS伪类DIVCSS轮廓CSS边框盒子模型CSS定位 Web之HTML笔记 二、CSS&#xff08;Cascading Style Sheets层叠样式表&#xff09; Css是种格式化网…

【论文阅读】基于隐蔽带宽的汽车控制网络鲁棒认证(一)

文章目录 Abstract第一章 引言1.1 问题陈述1.2 研究假设1.3 贡献1.4 大纲 第二章 背景和相关工作2.1 CAN安全威胁2.1.1 CAN协议设计2.1.2 CAN网络攻击2.1.3 CAN应用攻击 2.2 可信执行2.2.1 软件认证2.2.2 消息身份认证2.2.3 可信执行环境2.2.4 Sancus2.2.5 VulCAN 2.3 侧信道攻…

竞赛 题目:基于深度学习卷积神经网络的花卉识别 - 深度学习 机器视觉

文章目录 0 前言1 项目背景2 花卉识别的基本原理3 算法实现3.1 预处理3.2 特征提取和选择3.3 分类器设计和决策3.4 卷积神经网络基本原理 4 算法实现4.1 花卉图像数据4.2 模块组成 5 项目执行结果6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基…

Windows 11 设置 wsl-ubuntu 使用桥接网络

Windows 11 设置 wsl-ubuntu 使用桥接网络 0. 背景1. Windows 11 下启用 Hyper-V2. 使用 Hyper-V 虚拟交换机管理器创建虚拟网络3. 创建 .wslconfig 文件4. 配置 wsl.conf 文件5. 配置 wsl-network.conf 文件和 resolv.conf6. 创建 00-wsl2.yaml7. 安装 net-tools 和 openssh-s…

c# webapi 处理跨源问题

利用cors中间件处理跨源问题。 首先&#xff0c;什么是跨域&#xff08;跨源&#xff09;问题&#xff1a; 是指不同站点之间&#xff0c;使用ajax无法相互调用的问题。跨域问题本质是浏览器的一种保护机制&#xff0c;它的初衷是为了保证用户的安全&#xff0c;防止恶意网站窃…