07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

目录

  • Kafka --- 消息生产者
    • ★ 消息
    • ★ 消息的分发机制
    • ★ 分发到哪个分区
    • ★ 轮询策略(round-robin)
    • ★ 使用命令行工具发送消息
      • 演示添加消息
  • Kafka --- 消息消费者
    • ★ 消息消费者命令
    • ▲ 监听 【指定主题】 的所有消息:
    • ▲ 监听 【指定主题、指定分区】的所有消息:
    • ▲ 监听【指定主题、指定分区】的【新】消息:
    • ▲ 监听【指定主题、指定分区的、指定下标及之后】的所有消息:
    • kafka灵活之处:

Kafka — 消息生产者


★ 消息

简单来说,就是一个数据项。

▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。

从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。

▲ 下面是一个示例事件:
key: “fkjava”
value: “publish a new Book”
timestamp: “Feb. 15, 2021 at 2:06 p.m.”


★ 消息的分发机制

【注意:】当程序向主题发送消息时,该消息会被立即分给该主题下某一个领导者分区。

消息生产者向消息主题发送消息,这些消息将会立即分发给该主题下某一个分区(此处说的都是领导者分区)来保存

主题下的每条消息只会保存在一个领导者分区中,而不会在多个领导者分区中保存多份

消息实际上是存在分区中的,往主题发送消息只是一种逻辑说法。
当生产者发送一条消息到一个主题的时候,实际上这个消息马上就会被直接分发到对应的某一个领导者分区当中。

非领导者分区只是领导者分区的后备,也就是备份而已,当领导者分区挂掉的时候,非领导者分区就有可能成为领导者分区。

但是真正能够直接与客户进行交互的,就是直接接收用户的数据,或者让用户来消费数据的只能是领导者分区。

在这里插入图片描述


★ 分发到哪个分区

当生产者发送一条消息时,它会按如下规则来决定该消息被分发到哪个分区:

优先级:

最优先:(1)如果在发送消息时指定了分区,则消息分发到指定的分区。

(2)如果发送消息时没有指定分区,但消息的key不为空,则基于key的hashCode来选择一个分区。

此处暗示了:在一段时间内:同一个key的多条消息,通常会被分发到同一个分区

最次:(3)如果既没有指定分区,且消息的key也是空,则用轮询策略(round-robin)来选择一个分区。


★ 轮询策略(round-robin)

轮询策略(round-robin)就是按顺序来分发消息,

比如下面一个主题有P0、P1、P2三个分区,

那么第一条消息被分发到P0分区,
第二条消息被分发到P1分区,
第三条消息被分发到P2分区,

以此类推,第四条消息又被分发到P0分区。
在这里插入图片描述


★ 使用命令行工具发送消息

Kafka提供了kafka-console-producer.bat(.sh)工具来发送消息,例如执行如下命令:

下面命令发送带key的消息:

kafka-console-producer.bat ^
--bootstrap-server localhost:9092 ^
--topic test2 ^
--property parse.key=true

上面命令指定向test2这个主题发送消息,并通过“parse.key=true”指定发送消息时会解析消息的key,

默认解析规则为:制表符(Tab键)之前的是key,制表符(Tab键)之后的是value。

如果不指定“parse.key=true”属性,则默认不解析消息的key,也就是发送不带key的消息。

下面命令发送不带key的消息:

kafka-console-producer.bat ^
--bootstrap-server localhost:9092 ^
--topic test2


演示添加消息

因为演示过程中,我弄的3个节点老是只有一个能存活,所以把 kafka和zookeeper的存储数据的文件夹都删除了,重新启动,就好了。方便演示。

删除Kaka和zookeeper的存储数据的文件夹,重新启动

现在3个节点就能正常存活了。
在这里插入图片描述

接下来继续演示:
添加 不带key 和带key的消息,演示生产者发送消息:

解释:
1、演示生产者发送消息,消息发送到主题 test2 那里
2、发送没有带 key 的消息,kafka 采用的是轮询的策略,把消息存放到不同的分区里面;如图,test2 主题有4个分区(属于领导者分区),那么这8条消息就会轮询的发送到这4个分区里面
3、发送带key的消息,如图,key 是 ljh,kafka就会计算 ljh 这个key的hashcode 值,然后存放都某一个分区里面,因为key都是一样的,所以这几个key为ljh的消息都会被发到同一个分区里面。
但是具体发送到哪个分区,是无法指定的。
4、发送带key的消息,如图,key 和 value 之间要间隔一个 Tab 键,不要弄成空格键。

命令在上面

在这里插入图片描述

我自己再添加不同key的消息,可以看出新添加的两条消息,是存放在0分区的。

在这里插入图片描述



Kafka — 消息消费者

★ 消息消费者命令

消费者用于从消息主题读取消息,Kafka提供了kafka-console-consumer.bat工具命令从指定主题、甚至指定分区读取消息。
该工具支持如下常用选项:

 --bootstrap-server:指定要连接的Kafka主机和端口。--from-beginning:指定从开始读取消息。--group:指定组ID。--offset <String: consume offset>:指定从特定下标开始读取消息,比如将该选项设为1,表明从第2条消息开始读取;该选项还支持earliest和latest两个字符串值,其中earliest表示从最开始处读取(类似于--from-beginning选项的作用),latest表示从最新处开始读取、即不读取之前的消息,latest是默认值。--partition <Integer: partition>:指定哪个分区。--property:用于指定一些额外属性,比如 print.timestamp=true 指定要输出时间戳,print.key=true表示输出消息key,print.offset=true表示打印消息的下标,print.partition=true表示打印分区信息。--topic:指定哪个主题。 


▲ 监听 【指定主题】 的所有消息:

这个监听命令,运行后是一直存在的,会一直监听,有新消息会马上监听出来的。

 kafka-console-consumer --bootstrap-server localhost:9092 ^--topic test2 ^--from-beginning ^--property print.timestamp=true ^--property print.key=true ^--property print.offset=true ^--property print.partition=true

可以看到消息都成功存放在分区里面。
可以看到指定主题 test2 下的所有消息
但是目前还演示出轮询存放,先不理。

在这里插入图片描述



▲ 监听 【指定主题、指定分区】的所有消息:

看看分区2下的所有消息

 kafka-console-consumer --bootstrap-server localhost:9092 ^--topic test2 ^--from-beginning ^--partition 2^--property print.timestamp=true ^--property print.key=true ^--property print.offset=true ^--property print.partition=true

在这里插入图片描述

看看分区3下的所有消息

 kafka-console-consumer --bootstrap-server localhost:9092 ^--topic test2 ^--from-beginning ^--partition 3^--property print.timestamp=true ^--property print.key=true ^--property print.offset=true ^--property print.partition=true

分区3没有消息,所以没有任何显示:正确
在这里插入图片描述




查看分区0的消息:
有两条,正确
在这里插入图片描述


注意点:
–from-beginning:指定从开始读取消息。
添加这个命令,就是一直都会读取到从一开始就添加的消息,这样演示不够真实,因为我们消息消费之后,正常情况下我们不需要再查出来,所以可以用offset这个命令:
–offset <String: consume offset>:指定从特定下标开始读取消息。
下面就来演示这个offset命令:

 --offset <String: consume offset>:指定从特定下标开始读取消息,比如将该选项设为1,表明从第2条消息开始读取;该选项还支持earliest和latest两个字符串值,其中earliest表示从最开始处读取(类似于--from-beginning选项的作用),latest表示从最新处开始读取、即不读取之前的消息,latest是默认值。

▲ 监听【指定主题、指定分区】的【新】消息:

(模拟传统的ActiveMQ、RabbitMQ的消息模型):

 kafka-console-consumer --bootstrap-server localhost:9092 ^--topic test2 ^--offset latest ^--partition 0^--property print.timestamp=true ^--property print.key=true ^--property print.offset=true ^--property print.partition=true

演示 latest 从最新处开始读取、即不读取之前的消息,latest是默认值。

如图:现在从最新开始出监听消息,为了演示能监听最新消息,我们再打开一个命令行小黑窗,来往这个 0 分区发送消息。

因为上面key 为 l 的消息是发送到 0 分区,所以接下来发送的消息key也设置为l

在这里插入图片描述

如图:生产者刚发送消息,消费者这边马上就监听到主题为test2,分区为0 的最新的消息。

在这里插入图片描述



▲ 监听【指定主题、指定分区的、指定下标及之后】的所有消息:

这个 --offset 2 ^ 指定下标,查出来的消息是包括索引下标2 这条消息的。


先查出主题test2,分区0的所有消息,用来对比:

 kafka-console-consumer --bootstrap-server localhost:9092 ^--topic test2 ^--from-beginning ^--partition 0^--property print.timestamp=true ^--property print.key=true ^--property print.offset=true ^--property print.partition=true

在这里插入图片描述


再查出 【主题test2,分区0,指定索引下标为2及之后】 的所有消息,用来对比:

 kafka-console-consumer --bootstrap-server localhost:9092 ^--topic test2 ^--offset 2 ^--partition 0^--property print.timestamp=true ^--property print.key=true ^--property print.offset=true ^--property print.partition=true

上面有4条消息,所以2及之后的消息,应该有2条,为索引2 nnnnnnnnnn,索引3 mmmmmmmm

在这里插入图片描述



kafka灵活之处:

灵活之处, --offset latest ^ 这个设置,默认就是latest ,就是从最新处开始读取、不读取之前的消息,始终读取最新的消息,以前用过的消息就不会管了。
因为 kafka 内部有一个偏移主题来存储每一个分区里面的消息及消息曾经被读取到哪一条(哪个位置)。

更灵活之处:
我们可以通过 --offset 加上指定的索引下标,非常灵活的读取我们想要读取的哪个位置的消息。
从上面的消息监听可以看出,消息是一直保存在分区当中的,意味着消息被消费之后,并没有立即从分区中被删除,还可以被重复的使用,这就是kafka非常灵活的地方。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/607254.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LED电平显示驱动电路图

LB1409九位LED电平显示驱动电路 如图所示为LBl409九位LED电平显示驱动电路。图&#xff08;a&#xff09;是用LB1409做电平显示驱动电路&#xff0c;图&#xff08;b&#xff09;是应用基准电压电平显示驱动电路。LB1409是日本东京互洋电机株式会社生产的产品&#xff0c;与其…

开启Android学习之旅-5-Activity全屏

Android 两种方式设置全屏&#xff1a; 1. 第一行代码中的方法 通过 getWindow().getDecorView()方法拿到当前Activity的DecorView,再调用 setSystemUiVisibility() 方法来改变系统UI的显示&#xff0c;这里传入了 View.SYSTEM_UI_FLAG_LAYOUT_FULLSCREEN 和 View.SYSTEM_UI_…

上海雏鸟科技无人机灯光秀跨年表演点亮三国五地夜空

2023年12月31日晚&#xff0c;五场别开生面的无人机灯光秀跨年表演在新加坡圣淘沙、印尼雅加达、中国江苏无锡、浙江衢州、陕西西安等五地同步举行。据悉&#xff0c;这5场表演背后均出自上海的一家无人机企业之手——上海雏鸟科技。 在新加坡圣淘沙西乐索海滩&#xff0c;500架…

设计模式的艺术P1基础—2.2 类与类的UML图示

设计模式的艺术P1基础—2.2 类与类的UML图示 在UML 2.0的13种图形中&#xff0c;类图是使用频率最高的两种UML图之一&#xff08;另一种是用于需求建模的用例图&#xff09;&#xff0c;它用于描述系统中所包含的类以及它们之间的相互关系&#xff0c;帮助人们简化对系统的理解…

Avalonia学习(二十一)-自定义界面演示

今天开始继续Avalonia练习。 本节&#xff1a;自定义界面 在网上看见一个博客&#xff0c;根据需要演示一下。 前台代码 <Window xmlns"https://github.com/avaloniaui"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml"xmlns:vm"using:…

系列三十五、获取Excel中的总记录数

一、获取Excel中的总记录数 1.1、概述 使用EasyExcel开发进行文件上传时&#xff0c;通常会碰到一个问题&#xff0c;那就是Excel中的记录数太多&#xff0c;使用传统的方案进行文件上传&#xff0c;很容易就超时了&#xff0c;这时可以通过对用户上传的Excel中的数量进行限制…

vue3+echarts应用——深度遍历html的dom结构并用树图进行可视化

文章目录 ⭐前言&#x1f496;vue3系列文章 ⭐html数据解析&#x1f496; html字符串转为html对象&#x1f496; 深度遍历html对象内容 ⭐echarts 树图的渲染&#x1f496; 处理html内容为树状结构&#x1f496; 渲染树状图&#x1f496; inscode代码块 ⭐总结⭐结束 ⭐前言 大…

CentOS 7 安装私有平台OpenNebula

目录 一、配置yum源 二、配置数据库MySQL 2.1 安装MySQL 2.2 修改MySQL密码 2.3 创建项目用户和库 三、安装配置前端包 四、设置oneadmin账号密码 五、验证安装 5.1 命令行验证安装 5.2 数据存放位置 5.3 端口介绍 5.4 命令介绍 六、访问 6.1 设置语言 6.2 创建主…

C语言中的预处理

欢迎关注博主 Mindtechnist 或加入【Linux C/C/Python社区】一起学习和分享Linux、C、C、Python、Matlab&#xff0c;机器人运动控制、多机器人协作&#xff0c;智能优化算法&#xff0c;滤波估计、多传感器信息融合&#xff0c;机器学习&#xff0c;人工智能等相关领域的知识和…

Git将本地项目上传到Gitee仓库

1.右键点击文件&#xff0c;点击Git Bash Here,进入git窗口 2.初始化本地仓库 git init3.将本地仓库与远程仓库建立连接 git remote add origin 远程仓库地址远程仓库地址在gitee仓库复制即可 4.将远程仓库的文件拉到本地仓库中 git pull origin master5.将本地文件全部上传…

《PCI Express体系结构导读》随记 —— 第I篇 第2章 PCI总线的桥与配置(11)

接前一篇文章&#xff1a;《PCI Express体系结构导读》随记 —— 第I篇 第2章 PCI总线的桥与配置&#xff08;10&#xff09; 2.3 PCI桥与PCI设备的配置空间 PCI设备都有独立的配置空间&#xff0c;HOST主桥通过配置读写总线事务访问这段空间。PCI总线规定了三种类型的PCI配置…

油烟机灯泡更换

油烟机自带两个小灯&#xff0c;开始两个都亮&#xff0c;后来只有一个亮&#xff0c;再后来都不亮了 这个感觉是旋转卡尺打开&#xff0c;用剪子卡主转不动&#xff0c;打不开&#xff0c;可能是油烟粘住了。使用螺丝刀直接撬开。 发现果真是旋转卡扣。灯泡已经烧黑。 换上新…

uniapp微信小程序投票系统实战 (SpringBoot2+vue3.2+element plus ) -投票创建页面实现

锋哥原创的uniapp微信小程序投票系统实战&#xff1a; uniapp微信小程序投票系统实战课程 (SpringBoot2vue3.2element plus ) ( 火爆连载更新中... )_哔哩哔哩_bilibiliuniapp微信小程序投票系统实战课程 (SpringBoot2vue3.2element plus ) ( 火爆连载更新中... )共计21条视频…

【Emgu.CV教程】4.2、无缝融合应用之IlluminationChange()函数去除高亮区域

上一篇讲的是ColorChange()函数&#xff0c;今天讲IlluminationChange()函数&#xff0c;它可以去除图片中的高亮区域。试想一下&#xff0c;下面是一张反光背心的夜间照片&#xff0c;反光条颜色特别亮&#xff0c;如果想只把反光的部分变暗一点&#xff0c;其余部分不变&…

嵌入式Linux-Qt环境搭建

本编介绍如何在嵌入式Linux开发板上配置Qt运行环境&#xff0c;并进行Qt程序运行测试。 1 tslib编译 tslib之前在测试触摸屏的时候使用过&#xff0c;这里再来记录一下编译过程。 下载tslib库的源码&#xff1a;https://github.com/libts/tslib/tags 将下载的源码拷贝到ubun…

Java学习笔记(六)——基本数据类型及其对应的包装类

文章目录 包装类基本数据类型及其对应的包装类获取Integer对象的方式(了解)获取Integer对象两种方式的区别(掌握) 包装类的计算&#xff1a;自动装箱和自动拆箱Integer成员方法综合练习练习1练习2练习3练习4练习5 包装类 包装类&#xff1a;基本数据类型对应的引用数据类型。 …

Python多线程同步锁

Python同步锁 多线程是共用一个进程空间的&#xff0c;当多个线程要用到相同的数据&#xff0c;那么久会存在资源竞争和锁的问题。 锁是用来实现共享资源的同步访问。为每一个共享资源创建一个Lock对象&#xff0c;当需要访问共享资源的时候&#xff0c;调用acquire方法来获取…

目标检测再升级!YOLOv8模型训练和部署

YOLOv8 是 Ultralytics 开发的 YOLO&#xff08;You Only Look Once&#xff09;物体检测和图像分割模型的最新版本。YOLOv8是一种尖端的、最先进的SOTA模型&#xff0c;它建立在先前YOLO成功基础上&#xff0c;并引入了新功能和改进&#xff0c;以进一步提升性能和灵活性。它可…

springboot集成cas客户端

Background 单点登录SSO(Single Sign ON)&#xff0c;指在多个应用系统中&#xff0c;只需登录一次&#xff0c;即可在多个应用系统之间共享登录。统一身份认证CAS&#xff08;Central Authentication Service&#xff09;是SSO的开源实现&#xff0c;利用CAS实现SSO可以很大程…

【Python学习】Python学习8-Number

目录 【Python学习】Python学习8-Number 前言在变量赋值时被创建Python支持四种不同的数据类型整型(Int)长整型(long integers&#xff09;浮点型(loating point real values)复数(complex numbers) Python Number 类型转换Python math 模块、cmath 模块Python数学函数Python随…