Kafka基本介绍

消息队列

产生背景

消息队列:指的数据在一个容器中,从容器中一端传递到另一端的过程

消息(message): 指的是数据,只不过这个数据存在一定流动状态
队列(queue): 指的容器,可以存储数据,只不过这个容器具备FIFO(先进先出)特性
公共容器需要具备什么特点?
1- 公共性: 各个程序都可以与之对接
2- FIFO特性: 先进先出
3- 具备高效的并发能力: 能够承载海量数据
4- 具备一定的容错能力: 比如支持重新读取消息方案

消息队列介绍

常见的消息队列产品

MQ:message queue消息队列

activeMQ: 出现时期比较早的一款消息队列的中间件产品,在早期使用人群是非常多,目前整个社区活跃度严重下降,使用人群基本很少
rabbitMQ: 此款是目前使用人群比较多的一款消息队列的中间件的产品,社区活跃度比较高,主要是应用传统业务领域中
rocketMQ: 是阿里推出的一款消息队列的中间件的产品,目前主要是在阿里系环境中使用,目前支持的客户端比较少,主要是Java中应用较多
Kafka: Apache旗下的顶级开源消息,是一款消息队列的中间件产品项目来源于领英,是大数据体系中目前为止最为常用的一款消息队列的产品

应用场景

  • 应用解耦合
  • 异步处理
  • 限流削峰
  • 消息驱动系统

消息队列中两种消息模型

在Java中, 为了能够集成消息队列的产品, 专门提供了一个消息队列的协议: JMS(Java Message Server)  java消息服务消息队列中两个角色: 生产者(producer) 和 消费者(consumer)
生产者: 生产/发送消息到消息队列中
消费者: 从消息队列中获取消息在JMS规范中, 专门规定了两种消息消费模型: 
1- 点对点消费模型: 指的一条消息最终只能被一个消费者所消费。微信聊天的私聊
2- 发布订阅消费模型: 指的一条消息最终被多个消费者所消费。微信聊天的群聊

Kafka基本介绍

基本介绍

Kafka是一款消息队列的中间件产品, 来源于领英公司, 后期贡献给了Apache, 目前是Aapche旗下的顶级开源项目, 采用语言是Scala
官方地址: http://kafka.apache.org

kafka特点:

- 可靠性:Kafka集群是分布式的,并且有多副本的机制。数据可以自动复制
- 可扩展性:Kafka集群可以灵活的调整,在线扩容
- 耐用性:Kafka数据保存在磁盘上面,数据并且有多副本的机制。数据持久化,而且可以一定程度上防止数据丢失
- 高性能:Kafka可以存储海量的数据,虽然是使用磁盘进行数据存储,但是Kafka有各种优化手段(例如:磁盘的顺序读写、零拷贝等)提高数据的读写速度(吞吐量)

Kafka的架构

在这里插入图片描述

1- Kafka中集群节点叫broker,节点和节点之间没有主从之分,地位是完全一样
2- Topic:主题/话题,是业务层面对消息进行分类的。
3- 一个Topic可以设置多个Partition分区。
4- 同一个Partition分区可以设置多个副本,但是副本数不能超过(>)集群broker节点的个数
5- 虽然broker节点间没有主从之分,但是同一个Partition分区的不同副本间有主从之分,分为了Leader主副本和Follower从副本
6- 生产者将数据首先发送给到Leader主副本,接着是Leader主副本主动的往Follower从副本上同步消息
7- Zookeeper用来管理集群,以及管理元数据信息
8- ISR同步列表。该列表中存放的是与Leader主副本消息同步程度最接近的Follower从副本,也就是消息最小的一个列表。
该列表作用,当Leader主副本无法对外提供服务的时候,会从该ISR列表中选择一个Follower从副本变成Leader主副本,对外提供服务相关名词:
Kafka Cluster: Kafka集群
Topic: 主题/话题
Broker: Kafka中的节点
Producer: 生产者,负责生产/发送消息到Kafka中
Consumer: 消费者,负责从Kafka中获取消息
Partition: 分区。一个Topic可以设置多个分区,没有数量限制

Kafka的相关使用

Kafka的shell命令使用

Kafka本质上就是一个消息队列的中间件的产品,主要负责消息数据的传递。

1- 创建Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --create --topic test02 --partitions 4 --replication-factor 2  参数说明:--bootstrap-server: Kafka集群中broker连接信息--create: 指定操作类型。这里是新建Topic--topic: 指定要新建的Topic名称--partitions: 设置Topic的分区数--replication-factor: 设置Topic分区的副本数

设置副本数超过了集群broker节点个数

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --create --topic test04 --partitions 4 --replication-factor 4

在这里插入图片描述
2- 查看Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --list参数说明:--bootstrap-server: Kafka集群中broker连接信息--list: 指定操作类型。这里是查看Kafka集群上所有可用的Topic列表

在这里插入图片描述
3- 查看具体Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --describe --topic test04
参数说明:--bootstrap-server: Kafka集群中broker连接信息--describe: 指定操作类型。这里是查看具体Topic信息

在这里插入图片描述
4- 模拟生产者Producer

./kafka-console-producer.sh --broker-list node1.itcast.cn:9092,node2.itcast.cn:9092 --topic test04
参数说明:--broker-list: Kafka集群中broker连接信息--topic: 指定要将消息发送到哪个具体的Topic

在这里插入图片描述
5- 模拟消费者Consumer

./kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --topic test04参数说明:--bootstrap-server: Kafka集群中broker连接信息--topic: 指定要从哪个Topic中消费消息--from-beginning: 指定该参数以后,会从最旧的地方开始消费latest: 消费者(默认)从最新的地方开始消费--max-messages: 最多消费的条数。满足条数后,就会自动结束--group: 指定消费组名称。一个消费者只能属于一个消费组;一个消费组里面可以有多个消费者。同一个Topic中的同一条数据,只能被同一个消费组中的一个消费者所消费参数一般如何使用?
答: 推荐latest、--max-messages、--group一同使用。因为实际中Topic的数据量是特别大的,消费、打印都需要消耗服务器的资源,如果不限定消费的最大条数,可能造成服务器宕机。

在这里插入图片描述
6- 修改Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --alter --topic test01 --partitions 10分区: 只能增大,不能减小。而且没有数量限制
副本: 既不能增大,也不能减小
减小分区:
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --alter --topic test01 --partitions 1

在这里插入图片描述

修改副本数:
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --alter --topic test01 --replication-factor 2 --partitions 11

在这里插入图片描述
7- 删除Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --delete --topic test01参数说明:--bootstrap-server: Kafka集群中broker连接信息--delete: 指定操作类型。这里是删除Topic--topic: 指定要删除哪个Topic

8- 查看消费组中有多少个消费者

./kafka-consumer-groups.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --group g_01 --members --describe

在这里插入图片描述

Kafka基准测试

Kafka的基准测试, 主要是用于测试Kafka集群的吞吐量, 每秒钟最大可以生产多少条数据, 以及每秒钟最大可以消费多少条数据

测试生产的效率

1- 创建Topic

./kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 --partitions 3 --replication-factor 2 --topic test01

2- 执行生产测试命令: 测试后,会增加4GB磁盘占用

./kafka-producer-perf-test.sh --topic test01 --num-records 4000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 acks=1

在这里插入图片描述
3- 测试结果
在这里插入图片描述

测试消费的效率

1- 执行消费测试命令

./kafka-consumer-perf-test.sh --broker-list node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 --topic test01 --fetch-size 1048576 --messages 500000

在这里插入图片描述
2- 测试结果:
在这里插入图片描述

Kafka的Python API的操作

纯Python的方式操作Kafka。
准备工作:在node1的节点上安装一个python用于操作Kafka的库

安装命令:
python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simpleAPI使用的参考文档:
https://kafka-python.readthedocs.io/en/master/usage.html#kafkaproducer

在这里插入图片描述

完成生产者代码
import timefrom kafka import KafkaProducer# 同步发送
def sync_send():global topic, partition, offset# 2.1- 同步发送数据/消息metadata = producer.send("test01", value=f"hello_java_{i}".encode("UTF-8")).get()# metadata = producer.send("test03",value=f"hello_spark_{i}".encode("UTF-8")).get()# 2.2- 获取元信息中的内容topic = metadata.topicpartition = metadata.partition"""offset消息偏移量,从0开始编号。也就是一条消息在分区中的序号/索引在不同分区间,消息偏移量是无序在同一个分区里面,消息偏移量是有序"""offset = metadata.offsetprint(f"{topic},{partition},{offset},{metadata}")if __name__ == '__main__':# 1- 创建生产者producer = KafkaProducer(bootstrap_servers=["node1.itcast.cn:9092","node2.itcast.cn:9092"])# 2- 发送消息for i in range(10):# 同步发送# sync_send()# 2.3- 异步发送"""异步发送,需要等待一下,或者明确关闭Producer生产者"""producer.send("test01", value=f"hello_hive_{i}".encode("UTF-8"))time.sleep(1)# 3- 释放资源/关闭生产者# producer.close()

在这里插入图片描述
可能遇到的错误:
在这里插入图片描述

原因: 服务器环境有问题。是因为服务器上既安装了kafka-python的第三方依赖,同时还安装kafka的第三方依赖。可以通过pip list | grep kafka进行确定
解决办法: 先将这两个第三方依赖全部卸载,然后再重新执行如下命令
python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple
完成消费者代码
from kafka import KafkaConsumerif __name__ == '__main__':# 1- 创建消费者consumer = KafkaConsumer("test01",bootstrap_servers=["node1.itcast.cn:9092", "node2.itcast.cn:9092"])# 2- 消费消息for msg in consumer:topic = msg.topicpartition = msg.partitionoffset = msg.offset# key和value消费出来都是bytes数据类型,需要进行解码key = msg.keyvalue = msg.valueprint(f"{topic}{partition}{offset}{key}{value.decode('UTF-8')}{msg}")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/619396.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

kali_linux换源教程

vim /etc/apt/sources.list #阿里云deb http://mirrors.aliyun.com/kali kali-rolling main non-free contribdeb-src http://mirrors.aliyun.com/kali kali-rolling main non-free contrib#清华大学源deb http://mirrors.tuna.tsinghua.edu.cn/kali kali-rolling main contrib…

Android14实战:打破音频默认重采样的限制(五十二)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒体系统工程师系列【原创干货持续更新中……】🚀 人生格言: 人生从来没有捷径,只…

5文件操作

包含头文件<fstream> 操作文件三大类&#xff1a; ofstream : 写文件ifstream &#xff1a;读文件fstream : 读写文件 5.1文本文件 -文件以ascii的形式存储在计算机中 5.1.1写文件 步骤&#xff1a; 包含头文件 #include "fstream"创建流对象 ofs…

【STM32】STM32学习笔记-FlyMCU串口下载和STLINK Utility(30)

00. 目录 文章目录 00. 目录01. 串口简介02. 串口连接电路图03. FlyMCU软件下载程序04. 串口下载原理05. FlyMCU软件其它操作06. STLINK Utility软件07. 软件下载08. 附录 01. 串口简介 串口通讯(Serial Communication)是一种设备间非常常用的串行通讯方式&#xff0c;因为它简…

WebRTC入门:基础的核心协议与概念(二十三)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒体系统工程师系列【原创干货持续更新中……】🚀 人生格言: 人生从来没有捷径,只…

【C#】C#实现PDF合并

文章目录 一、下载iTextSharp.dll下载iTextSharp.dll命名空间引入 二、界面设计三、代码全局变量选择文件夹的按钮确认合并的按钮 四、导出结果五、完整源码 一、下载iTextSharp.dll 下载iTextSharp.dll 可使用联机方式或者文件下载方式。 命名空间引入 代码开始时引入了一…

Android Studio导入项目 下载gradle很慢或连接超时

AS最常见的问题之一就是下载gradle非常慢&#xff0c;还经常出现下载失败的情况&#xff0c;没有gradle就无法build项目&#xff0c;所以一定要先解决gradle的下载问题&#xff0c;下面教大家两种常用方法。 因为我的项目绝大多数使用的是gradle-5.6.4-all&#xff0c;下面就以…

【搜索引擎设计:信息搜索怎么避免大海捞针?

在前面我们提到了网页爬虫设计&#xff1a;如何下载千亿级网页&#xff1f;中&#xff0c;我们讨论了大型分布式网络爬虫的架构设计&#xff0c;但是网络爬虫只是从互联网获取信息&#xff0c;海量的互联网信息如何呈现给用户&#xff0c;还需要使用搜索引擎完成。因此&#xf…

MYSQL分表分库 详解

目录 一、垂直拆分于水平拆分的区别&#xff1f; 垂直拆分 水平拆分 二、分表分库有哪些策略&#xff1f; Hash分片策略 枚举分片策略 日期分片策略 范围分片策略&#xff08;用的较多&#xff09; 三、分表分库之后&#xff0c;如何查询的呢&#xff1f; 四、分表分…

【RHEL】Vivado调用VCS+Verdi联合仿真报错解决

问题描述 在使用VCS Verdi仿真Vivado工程时&#xff0c;点击行为仿真按钮进度条窗口消失后&#xff0c;Verdi窗口并未出现&#xff0c;查看消息报错如下&#xff1a; vcs: line 34205: 119837 Segmentation fault (core dumped) ${TOOL_HOME}/bin/cfs_ident_exec -f ${X…

网络安全已死,趁早转行?

近年来&#xff0c;曾经被寄予厚望的网络安全行业似乎正逐渐失去昔日的辉煌。曾经一度备受瞩目的网络安全专业&#xff0c;如今却面临着降薪、裁员的困境。许多公司对网络安全的重视程度不高&#xff0c;网络安全岗位成了背锅的代名词。在这样的环境下&#xff0c;有人开始质疑…

智能小车项目(七)通过PID实现给定和实际速度值计算PWM输出

我们先看大脑&#xff08;上位机nano&#xff09; keybord_ctrl节点发布’cmd_vel’消息消息类型为Twist队列大小为1 pub rospy.Publisher(cmd_vel, Twist, queue_size1)if not stop: pub.publish(twist)driver_node订阅这个消息 当有消息时cmd_vel_callback回掉函数处理消息…

感染嗜肺军团菌是什么感觉?

记录一下最近生病的一次经历吧&#xff0c;可能加我好友的朋友注意到了&#xff0c;前几天我发了个圈&#xff0c;有热心的朋友还专门私信了我说明了他自己的情况和治疗经验&#xff0c;感谢他们。 ​ 那么关于这次生病的经历&#xff0c;给大家分享一下。 首先&#xff0c;这次…

redis夯实之路-持久化之RDB与AOF详解

数据库 初始化服务器时会根据redisServer的dbnum属性来决定创建多少个数据库&#xff0c;默认为16 使用select切换数据库 客服端状态redisClient结构的db属性记录了当前的目标数据库 RedisDb结构的dict字典保存了数据库的所有键值对&#xff0c;这个字典被称为键空间。 cru…

C++I/O流——(2)预定义格式的输入/输出(第二节)

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 含泪播种的人一定能含笑收获&#xff…

【MySQL】MySQL表的约束-空属性/默认值/列属性/zerofill/主键/自增长/唯一键/外键

文章目录 表的约束1.空属性 --null && not null2.默认值 -- default3.列描述4.zerofill5.主键6.自增长7.唯一键8.外键 表的约束 表的约束&#xff1a;表中一定要有各种约束&#xff0c;通过约束&#xff0c;让我们未来插入数据库表中的数据是符合预期的。约束的本质是…

【QT】多层QTreeWidget与QStackedWidget的关联操作

通过点击多层QTreeWidget来控制QStackedWidget中的page页面切换 treeWidget设计 treeWidget设计&#xff1a; // treeWidget设计ui->treeWidget->clear();ui->treeWidget->setColumnCount(1);//第一层QStringList l;l<<"管理系统";QTreeWid…

iPhone“查找”最多可添加32个物品!

对于那些丢三落四的果粉来说&#xff0c;苹果的“查找”功能是一大福音。不管是丢失了iPhone、iPad、Mac、AirPods还是AirTag&#xff0c;都可以通过“查找”功能在地图上追踪设备的位置&#xff0c;甚至是远程锁定或抹掉设备的数据。 那么&#xff0c;iPhone的查找一次能支持添…

LeetCode 38 外观数列

题目描述 外观数列 给定一个正整数 n &#xff0c;输出外观数列的第 n 项。 「外观数列」是一个整数序列&#xff0c;从数字 1 开始&#xff0c;序列中的每一项都是对前一项的描述。 你可以将其视作是由递归公式定义的数字字符串序列&#xff1a; countAndSay(1) "1…

软件测试|解决Github port 443 : Timed out连接超时的问题

前言 GitHub是全球最大的开源代码托管平台之一&#xff0c;许多开发者和团队使用它来管理和协作开源项目。但在当下&#xff0c;我们在clone或者提交代码时会经常遇到"GitHub Port 443: Timed Out"错误&#xff0c;这意味着我们的电脑无法建立与GitHub服务器的安全连…