2024.1.11 Kafka 消息队列,shell命令,核心原理

目录

 一 . 消息队列

二. Kafka

三 . 启动命令

 四 . Kafka的Shell 命令

五 . Kafka的核心原理

1. Topic的分区和副本机制

2 . 消息存储机制 和 查询机制     

3. Kafka中生产者数据分发策略

 六 . Kafka 之所以具有高速的读写性能,主要有以下几个原因

七. 笔记 


 一 . 消息队列

应用场景:

        应用解耦合:类似单点故障

        异步处理: 减少处理时间

        限流削峰 : 不管流量多大,放到消息队列中,都是按照一定的节奏进行处理

        消息驱动的系统: 消息队列,消息生产者,消费者(负责对消息进行处理)

        消息(message): 指的是数据,只不过这个数据存在一定流动状态
        队列(queue): 指的容器,可以存储数据,只不过这个容器具备FIFO(先进先出)特性

消息队列中两个角色:

        生产者producer:生产/发送消息到消息队列中

        消费者consumer: 从消息队列中获取消息

二. Kafka

1. 基本介绍

        kafka是一款消息队列的中间件产品;

        kafka特点:

                可靠性

                可扩展性

                耐用性

                性能

 2. kafka架构

        1. Kafka中集群节点叫broker ;

        2. 集群的节点与节点之间,没有主从之分 ; 

        3. 同一个分区的不同副本间中, 有主从关系 ,主是leader , 从是Follower

        4. 同一个Partitions分区可以设置多个副本 , 但是副本数量不能超过集群broker节点的个数

        5. zookeeper用来管理集群,以及管理元数据信息

        6. Topic 主题 ,是业务层面对消息进行分类的

三 . 启动命令

三台虚拟机启动Zookeeper

        cd /export/server/zookeeper/bin

        ./zkServer.sh start

node1脚本启动kafka

         cd /export/onekey

        ./start-kafka.sh

        ./stop-kafka.sh

 四 . Kafka的Shell 命令

1.  创建Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --create --topic test02 --partitions 4 --replication-factor 2  

参数: 

        -- bootstrap-server: Kafka集群中broker连接信息

        -- create : 指定操作类型 .这里是新建Topic

        -- topic: 指定要新建的Topic名称

        -- partitions :设置Topic的分区数

        -- relication-factor :设置Topic分区的副本数

 2.  查看Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --list

参数说明:
    --bootstrap-server: Kafka集群中broker连接信息
    --list: 指定操作类型。这里是查看Kafka集群上所有可用的Topic列表

 3. 查看具体Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --describe --topic test04
参数说明:
    --bootstrap-server: Kafka集群中broker连接信息
    --describe: 指定操作类型。这里是查看具体Topic信息

 4. 模拟生产者Producer

./kafka-console-producer.sh --broker-list node1.itcast.cn:9092,node2.itcast.cn:9092 --topic test04
参数说明:
    --broker-list: Kafka集群中broker连接信息
    --topic: 指定要将消息发送到哪个具体的Topic

5. 模拟消费者 Consumer

 

./kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --topic test04

参数说明:
    --bootstrap-server: Kafka集群中broker连接信息
    --topic: 指定要从哪个Topic中消费消息
    --from-beginning: 指定该参数以后,会从最旧的地方开始消费
    latest: 消费者(默认)从最新的地方开始消费
    --max-messages: 最多消费的条数。满足条数后,就会自动结束
    --group: 指定消费组名称。一个消费者只能属于一个消费组;一个消费组里面可以有多个消费者。同一个Topic中的同一条数据,只能被同一个消费组中的一个消费者所消费
    

6. 修改Topic

 

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --alter --topic test01 --partitions 10

分区: 只能增大,不能减小。而且没有数量限制
副本: 既不能增大,也不能减小

减小分区:
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --alter --topic test01 --partitions 1

 修改副本数:
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --alter --topic test01 --replication-factor 2 --partitions 11

7. 删除Topic

 

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --delete --topic test01

参数说明:
    --bootstrap-server: Kafka集群中broker连接信息
    --delete: 指定操作类型。这里是删除Topic
    --topic: 指定要删除哪个Topic

 8. 查看消费组中有多少个消费者

./kafka-consumer-groups.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --group g_01 --members --describe

 

五 . Kafka的核心原理

1. Topic的分区和副本机制

        分区的作用:

                1- 避免单台服务器容量的限制

                2- 提升Topic的吞吐量

                3 - 分区数量不要超过Kafka集群中的broker节点个数的3倍

        副本的作用:

                1 - 提升数据安全性,但也会导致冗余过多

                2- 副本个数不能超过集群的broker节点个数,推荐副本1-3个

2 . 消息存储机制 和 查询机制     

        消息存储机制

1-xx.log和xx.index它们的作用是什么?
        答:
        xx.log: 称之为segment片段文件,也就是一个Partition分区的数据,会被分成多个segment(log)片段文件进行存储。
        xx.index: 称之为索引文件,该文件的作用是用来加快对xx.log文件内容检索的速度


2-xx.log和xx.index文件名称的意义?
        答: 这个数字是xx.log文件中第一条消息的offset(偏移量)


3-为什么一个Partition分区的数据要分成多个xx.log(segment片段文件)文件进行存储?
        答:
      1- 如果一个文件的数据量过大,打开和关闭文件都非常消耗资源
      2- 在一个大的文件中,检索内容也会非常消耗资源
      3- Kafka只是用来临时存储消息数据。会定时将过期数据删除。如果数据放在一个文件中,删除的效率低;

        如果数据分成了多个segment片段文件进行存储,删除的时候只需要判断segment文件最后修改时间,如果超过了保留时间,就直接将整个segment文件删除。该保留时间是通过server.properties文件中的log.retention.hours=168进行设置,默认保留168小时(7天)

        

        查询机制

查询步骤:
1- 首先先确定要读取哪个xx.log(segment片段)文件。368776该offset的消息在368769.log文件中
2- 查询xx.log对应的xx.index,查询该条消息的物理偏移量范围
3- 根据消息的物理偏移量范围去读取xx.log文件(底层是基于磁盘的顺序读取)
4- 最终就获取到了具体的消息内容

3. Kafka中生产者数据分发策略

分发策略如下这些:

  • 1- 随机分发策略:将消息发到到随机的某个分区上。Python支持,Java不支持

  • 2- 指定分区策略:将消息发到指定的分区上面。Python支持,Java支持

  • 3- Hash取模策略:对消息的key先取Hash值,再和分区数取模。Python支持,Java支持

  • 4- 轮询策略:在Kafka的2.4及以上版本,已经更名成粘性分发策略。Python不支持,Java支持

  • 5- 自定义分发策略:Python支持,Java支持

 六 . Kafka 之所以具有高速的读写性能,主要有以下几个原因

Kafka之所以具有高速的读写性能,主要有以下几个原因:

  1. 分布式架构:Kafka采用分布式架构,可以通过水平扩展来处理大规模的数据流。它将数据分成多个分区,并将这些分区分布在不同的节点上,实现了数据的并行处理和负载均衡,从而提高了读写性能。

  2. 零拷贝技术:Kafka使用零拷贝技术来减少数据在内存和磁盘之间的拷贝次数。它通过直接内存访问(DMA)技术,将数据从磁盘读取到内存或者从内存写入到磁盘,避免了数据的多次复制,减少了IO操作的开销,提高了读写性能。

  3. 批量写入和压缩:Kafka支持批量写入消息和消息的压缩。它可以将多个消息一次性写入到磁盘,减少了磁盘IO的次数,提高了写入性能。同时,Kafka还支持对消息进行压缩,减小了消息的存储空间,降低了网络传输的开销,进一步提高了读写性能。

  4. 高效的消息索引和存储结构:Kafka使用高效的消息索引和存储结构,例如日志结构和位移索引,可以快速地定位和检索消息。它采用追加写入的方式,顺序写入磁盘,减少了随机写入的开销,提高了读写性能。

综上所述,Kafka通过分布式架构、零拷贝技术、批量写入和压缩、高效的消息索引和存储结构等手段,实现了高速的读写性能,使其成为处理大规模数据流的理想选择。

七. 笔记 

count(1)会记null,

count(0)会记null,

count(*)会记null

 count(字段)不会记null

count (null)得到null

import os
from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window as win
from pyspark.sql.types import StructType, IntegerType, StringType, StructField, FloatType# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder \.config('spark.sql.shuffle.partitions', 1) \.appName('new_sale') \.master('local[*]') \.getOrCreate()# 使用框架spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/614091.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何让 QTableView控件五颜六色?

要让 QTableView 控件五颜六色,您可以使用 QStandardItemModel 的 setData() 方法为每个单元格设置不同的背景色。以下是一个示例: // 创建数据模型和设置表头略...// 设置单元格背景色 model->setData(model->index(0, 0), QColor(Qt::red), Qt:…

布隆过滤器 应用场景 优势 不足

布隆过滤器是一种空间效率很高的概率型数据结构,主要用于快速判断一个元素是否存在于一个集合中。它的主要应用场景、优势和不足如下: 应用场景 缓存:在缓存系统中,可以利用布隆过滤器快速判断某个数据是否存在于缓存中&#xf…

212. 单词搜索 II(字典树的另一种类型)

大致思路是: 根据words列表建立字典树,其中注意在单词末尾,将原来的isEnd变量换成存储这个单词的变量,方便存储到ans中,另外,字典树的字节点由原来的Trie数组变为hashmap,方便检索字母。 建立…

【AIGC】一组精美动物AI智能画法秘诀

如何使用AI绘画,从以下角度,依据表格内容梳理,表格如下: 外貌特征物种姿势特征描述场景风格技术描述小巧可爱幼小浣熊倚在桌子上具有人形特征中世纪酒馆电影风格照明8k分辨率细节精致毛茸茸手持咖啡杯Jean-Baptiste Monge的风格蓝…

一日难再晨及时当勉励 date

文章目录 Linux shell 获取更改系统时间默认输入显示时区世界协调时格式化日期更多信息 Linux shell 获取更改系统时间 … note:: 时光只解催人老,不信多情,长恨离亭,泪滴春衫酒易醒。 - 晏殊《采桑子时光只解催人老》date命令可以用来打印…

RT-Thread入门笔记4-跑马灯线程实例

RT-Thread操作系统是基于线程调度的多任务系统。 线程状态切换 调度过程是一种完全抢占式的基于优先级的调度算法。 支持8/32/256优先级,其中0表示最高,7/31/255表示最低。最低优先级7/31/255优先级用于空闲线程。 支持以相同优先级运行的线程。 共享时…

232.【2023年华为OD机试真题(C卷)】计算三叉搜索树的高度(JavaPythonC++JS实现)

🚀点击这里可直接跳转到本专栏,可查阅顶置最新的华为OD机试宝典~ 本专栏所有题目均包含优质解题思路,高质量解题代码(Java&Python&C++&JS分别实现),详细代码讲解,助你深入学习,深度掌握! 文章目录 一. 题目-计算三叉搜索树的高度二.解题思路三.题解代码P…

JS 监听浏览器各个标签间的切换-visibilitychange事件介绍

JS 监听浏览器各个标签间的切换 以前看到过一些网页,在标签切换到其它地址时,网页上的标题上会发生变化,一直不知道这个是怎么做的,最近查了一些资料才发现有一个 visibilitychange 事件就可以搞定,这里将介绍一下页面…

Linux使用信号量sem_timedwait当作定时器

主要是通过判断信号量等待超时,然后达到计时的目的。 创建信号量 sem_t *p_sem sem_open("mysem2", O_CREAT, 0666, 0); 获取当前系统时间 struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); 此函数用来获得当前时间,结果存…

铭文 LaunchPad 平台 Solmash 推出早鸟激励计划

为感谢用户对Solmash的支持,Solmash 特别推出“Solmash早鸟激励计划”,以回馈社区的早期参与者,这是专为已经参与Staking Pool或Honest Pool的用户推出的激励。 Solmash NFT激励 被列入早鸟计划的用户,可通过点击:sol…

文件重命名:一键操作,轻松把扩展字母改成大写,提升文件管理效率

在文件管理的过程中,经常要对文件进行重命名,以更好地组织和管理文件。例如要把文件扩展名从小写改为大写,手动把每个文件的扩展名改为大写即耗时,还容易出错。下面来看云炫文件管理器怎么批量把文件扩展名字母改成大写。 文件扩展…

Flutter Scrollbar滑动条与SingleChildScrollView的结合使用的小细节

我在业务开发中,ListView是竖向滑动的,然后 ListView中的每一个小条目比较长,我需要横向滑动,所以 就有了 ListView中多个SingleChildScrollView(横向滑动),但是在视觉上,我期望告知用户可以横向滑动&#…

算法第十四天-删除有序数组中的重复项

删除有序数组中的重复项 题目要求 解题思路 双指针 左指针确定不重复值,右指针遍历数组 代码 class Solution:def removeDuplicates(self, nums: List[int]) -> int:left0for right in range(1,len(nums)):if nums[left] ! nums[right]:left 1nums[left] nu…

上市路上,如何打好合规与增长的双赢之战? |CFO x CIO 专刊

经济发展的新旧动能转化之下,企业需要找到可持续的高质量发展之路。以数字化智能化为推动力,做好内控与合规,不仅能保证企业的发展不偏离航道,还能有效激发数字资产价值,帮企业获得新发展动能。不管是拟上市企业还是已…

QObject_other

QObject 属性定义 自定义属性我用到的较少,只在自定义键盘时用到了。 属性的行为类似于类数据成员,但它们具有可通过元对象系统访问的附加特性 Q_PROPERTY关键字 定义语法: Q_PROPERTY(type name (READ getFunction [WRITE setFunction] |…

【AI视野·今日NLP 自然语言处理论文速览 第七十三期】Tue, 9 Jan 2024

AI视野今日CS.NLP 自然语言处理论文速览 Tue, 9 Jan 2024 Totally 80 papers 👉上期速览✈更多精彩请移步主页 Daily Computation and Language Papers FFSplit: Split Feed-Forward Network For Optimizing Accuracy-Efficiency Trade-off in Language Model Infe…

LeetCode-2645. 构造有效字符串的最少插入数

给你一个字符串 word ,你可以向其中任何位置插入 “a”、“b” 或 “c” 任意次,返回使 word 有效需要插入的最少字母数。 如果字符串可以由 “abc” 串联多次得到,则认为该字符串有效 。 示例 1: 输入:word “b” …

centos7上升级mysql8.0.21到mysql8.0.35版本

1、下载安装包 cd /home/soft/mysql8.0.35 wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.35-1.el7.x86_64.rpm-bundle.tar 2、解压压缩包 3、停止掉mysqld服务 systemctl stop mysqld 4、强制安装新的版本包 rpm -ivh mysql-community-common-8.0.35-1.el…

Fluids —— Whitewater (SOP)

目录 Whitewater Lifecycle Workflow Whitewater source Deformation sources Visualizing whitewater Whitewater solver Wind Foam erosion Repellants Whitewater postprocess 基于SOP的白水是对SOP FLIP工作流的增强;该系统与规模无关,无需…

重温大学时奋斗的20条

重温大学时奋斗的20条 一.即哈佛训言20条。二.行稳致远体会对未来说的话 一.即哈佛训言20条。 1.This moment will nap, you will have a dream; But this moment study,you will interpret a dream. 此刻打盹,你将做梦;而此刻学习,你将圆梦。…