kafka详解(三)

2.2 Kafka命令行操作

在这里插入图片描述

2.2.1 主题命令行操作

1)查看操作主题命令参数

[aa kafka]$ bin/kafka-topics.sh

在这里插入图片描述
2)查看当前服务器中的所有topic (配置了环境变量不需要写bin/)

[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
3)创建first topic
[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --create --partitions 3 --replication-factor 3
选项说明:
--topic 定义topic名
--replication-factor  定义副本数
--partitions  定义分区数

4)查看first主题的详情

[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list
first
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
Topic: first	TopicId: 3pIfoppvRmq84FjACWzAgw	PartitionCount: 3	ReplicationFactor: 3	Configs: segment.bytes=1073741824Topic: first	Partition: 0	Leader: 104	Replicas: 104,103,102	Isr: 104,103,102Topic: first	Partition: 1	Leader: 103	Replicas: 103,102,104	Isr: 103,102,104Topic: first	Partition: 2	Leader: 102	Replicas: 102,104,103	Isr: 102,104,103
[aa ~]$

5)修改分区数( 注意:分区数只能增加,不能减少,如果减少会报错!

[a kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --alter --partitions 4
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
Topic: first	TopicId: 3pIfoppvRmq84FjACWzAgw	PartitionCount: 4	ReplicationFactor: 3	Configs: segment.bytes=1073741824Topic: first	Partition: 0	Leader: 104	Replicas: 104,103,102	Isr: 104,103,102Topic: first	Partition: 1	Leader: 103	Replicas: 103,102,104	Isr: 103,102,104Topic: first	Partition: 2	Leader: 102	Replicas: 102,104,103	Isr: 102,104,103Topic: first	Partition: 3	Leader: 104	Replicas: 104,103,102	Isr: 104,103,102 
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --alter --partitions 2
Error while executing topic command : Topic currently has 4 partitions, which is higher than the requested 2.
[2023-09-13 19:22:16,891] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 4 partitions, which is higher than the requested 2.(kafka.admin.TopicCommand$)
[aa ~]$

6)再次查看first主题的详情

[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first

7)删除topic

[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --delete
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list[aa ~]$

2.2.2 生产者命令行操作

1)查看操作生产者命令参数
[aa kafka]$ bin/kafka-console-producer.sh

在这里插入图片描述
2)发送消息

[aa kafka]$ kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
>111
>222
>333
>

2.2.3 消费者命令行操作

[aa kafka]$ bin/kafka-console-consumer.sh

在这里插入图片描述

2)消费消息
[aa kafka]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --group test --from-beginning
111
222
333
还可以动态的生产和消费,比如102机器上输入
>444
103机器就会自动在结尾弹出
111
222
333
444

Kafka生产者

生产者消息发送流程

3.1.1 发送原理

Kafka的producer发送消息采用的是异步发送的方式
在消息发送的过程中,涉及到了两个线程——main线程和Sender线程(两个线程是异步!),以及一个线程共享变量:RecordAccumulator。

  1. 在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator。
  2. Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。
    在这里插入图片描述
    main线程将外部数据包装成kafka要求的格式ProducerRecord,类似于Flume中的Event.网络中进行数据传输都会序列化(kryo框架)。
    分区策略:涉及到生产者和消费者。生产者分区针对的是数据,消费者分区针对是分区怎么消费数据
    。RecordAccumulator(一种堆内存缓冲区)达到两种标准之后就唤醒Sender进行发送!bachsize(同一个队列中,两个时间非常紧密的数据可以形成一个bachsize)一般就是数据洪峰的时候;linger.ms就是在数据量非常小的时候;默认值0代表来一条发一条;
    sender发送也是异步发送,sender将RecordAccumulator中的数据包装成Request(一个批次包装成一个Request),sender发送Request1之后不等待响应就发送Request2,然后不等待响应就发送Request2,…Request5,Request6必须排队了。
    sender发送过去的数据在Leader中应该是先存在线程对应的内存中,还没等到磁盘中存储数据落盘的一个时间点决定是不是回复ack为0,此时就是不安全,时延最低!。为1的时候就是数据落盘之后再发送ack,此时数据安全性有所提高,稍慢!注意此时的fllower还没有数据!完全保证数据安全,Leader和follwer都罗盘,回复-1
    发送成功:清理网络客户端请求Request
    线程共享变量中RecordAccumulator清理数据,因为只有32M。
    发送失败:重试次数----int的最大值
    Selector是负责决定将数据发送到集群的哪个分区!
    注意:
    中间涉及到数据的发送和拉取都是异步的!main线程放数据和sender拉取数据并发送两个过程异步!
    一个队列只能发送到最右边的集群中的一个分区,假如有两个toptic,5个分区,就需要创建5个双端队列,队列内部才能形成批次(bachsize),所以只能发到一个分区!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/104911.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot-黑马程序员-学习笔记(五)

74.自定义bean属性绑定以及第三方bean属性绑定 自定义bean属性绑定 1.自定义一个bean Data Component public class ServerConfig {private String ipAddress;private int port;private long timeout; } 2.在yml配置文件中中定义一组值 3.在bean中进行属性绑定 加上这个注…

mysql中的几种排名函数

mysql中的排名函数 mysql里面的排名函数&#xff0c;涉及有以下几个&#xff1a; rank()、dense_rank()、row_number() 1、rank() 函数 RANK() OVER (PARTITION BY <expression>[{,<expression>...}]ORDER BY <expression> [ASC|DESC], [{,<expression…

【Nginx32】Nginx学习:随机索引、真实IP处理与来源处理模块

Nginx学习&#xff1a;随机索引、真实IP处理与来源处理模块 完成了代理这个大模块的学习&#xff0c;我们继续其它 Nginx 中 HTTP 相关的模块学习。今天的内容都比较简单&#xff0c;不过最后的来源处理非常有用&#xff0c;可以帮我们解决外链问题。另外两个其实大家了解一下就…

C#开发的OpenRA游戏之金钱系统(1)

C#开发的OpenRA游戏之金钱系统(1) 设计一个游戏,肯定要有一个唯一的资源,用这个资源来管理整个游戏的进度,以及相互争夺的焦点。在OpenRA里,就是使用矿产资源。所以在地图上分布几个矿场,玩家就需要相互争夺矿场,谁开采多谁就更有钱,谁有钱了就可以升级更好的科技,以…

Linux Kernel 4.13 RC6发布:正式版9月3日发布

美国当地时间上周末&#xff0c;大神Linus Torvalds发布了Linux Kernel 4.13内核的又一候选版本。上周发布的RC5版本更新幅度也要比上上周的RC4要小&#xff0c;Linus Torvalds表示本周发布的RC6版本属于常规更新&#xff0c;在过去一周的开发过程中并没有出现任何意外。RC6版本…

Spring MVC 十一:@EnableWebMvc

我们从两个角度研究EnableWebMvc&#xff1a; EnableWebMvc的使用EnableWebMvc的底层原理 EnableWebMvc的使用 EnableWebMvc需要和java配置类结合起来才能生效&#xff0c;其实Spring有好多Enablexxxx的注解&#xff0c;其生效方式都一样&#xff0c;通过和Configuration结合…

Linux 64位 C++协程池原理分析及代码实现

导语 本文介绍了协程的作用、结构、原理&#xff0c;并使用C和汇编实现了64位系统下的协程池。文章内容避免了协程晦涩难懂的部分&#xff0c;用大量图文来分析原理&#xff0c;适合新手阅读学习。 GitHub源码 1. Web服务器问题 现代分布式Web后台服务逻辑通常由一系列RPC请…

【java学习—七】单继承和多层继承(30)

文章目录 1. 相关概念2. 从代码中理解 1. 相关概念 Java 只支持单继承&#xff0c;不允许多重继承&#xff1a; &#xff08;1&#xff09;一个子类只能有一个父类 &#xff08;2&#xff09;一个父类可以派生出多个子类      举例区分&#xff1a; class SubDemo extend…

Hermes - 指尖上的智慧:自定义问答系统的崭新世界

在希腊神话中&#xff0c;有一位智慧与消息的传递者神祇&#xff0c;他就是赫尔墨斯&#xff08;Hermes&#xff09;。赫尔墨斯是奥林匹斯众神中的一员&#xff0c;传说他是乌尔阿努斯&#xff08;Uranus&#xff09;和莫伊拉&#xff08;Maia&#xff09;的儿子&#xff0c;同…

Git纯操作版 项目添加和提交、SSH keys添加、远程仓库控制、冲突解决、IDEA连接使用

Git 文章目录 Git项目简单克隆通用操作添加和提交回滚分支变基分支优选 远程项目推送认证抓取、拉取和冲突解决 IEDA类软件连接 最近学原理学的快头秃了&#xff0c;特此想出点不讲原理的纯操作版&#xff0c;不过还是放个图吧 项目简单克隆 git在本人日常中最重要的功能还是…

Linux中怎么启动Zookeeper

首先进入Zookeeper安装目录下的bin目录 比如&#xff1a; cd /root/zookeeper-3.4.9/bin 然后在此目录下执行命令。 1. 启动Zookeeper Server端 ./zkServer.sh start 2.启动Zookeeper Client端 ./zkCli.sh 启动Zookeeper Client端后如下&#xff1a;

Electron基础学习笔记

Electron基础学习笔记 官网&#xff1a; https://www.electronjs.org/ 文档&#xff1a;https://www.electronjs.org/zh/docs/latest/ Electon概述 Electron 是由 Github开发的开源框架它允许开发者使用Web技术构建跨平台的桌面应用 Electron Chromium Node.js Native AP…

面部检测与特征分析:视频实时美颜SDK的核心组件

随着视频直播、社交媒体和在线会议的流行&#xff0c;人们对于美颜工具的需求不断增加。无论是自拍照片还是视频聊天&#xff0c;美颜技术已经成为现代应用程序的不可或缺的一部分。本文将深入探讨视频实时美颜SDK的一个核心组件——面部检测与特征分析。 一、面部检测技术 …

C++内存管理(new和delete)

目录 1.C的内存分布 2.C内存管理方式 1.C的内存分布 在内存里面是分好几个区的 1. 栈又叫堆栈--非静态局部变量/函数参数/返回值等等&#xff0c;栈是向下增长的。 2. 内存映射段是高效的I/O映射方式&#xff0c;用于装载一个共享的动态内存库。用户可使用系统接口 创建共享…

AI换脸之Faceswap技术原理与实践

目录 1.方法介绍 2.相关资料 3.实践记录 ​4.实验结果 1.方法介绍 Faceswap利用深度学习算法和人脸识别技术&#xff0c;可以将一个人的面部表情、眼睛、嘴巴等特征从一张照片或视频中提取出来&#xff0c;并将其与另一个人的面部特征进行匹配。主要应用在图像/视频换脸&am…

数字图像处理实验记录一(图像基本灰度变换)

文章目录 基础知识图像是什么样的&#xff1f;1&#xff0c;空间分辨率&#xff0c;灰度分辨率2&#xff0c;灰度图和彩色图的区别3&#xff0c;什么是灰度直方图&#xff1f; 实验要求1&#xff0c;按照灰度变换曲线对图像进行灰度变换2&#xff0c;读入一幅图像&#xff0c;分…

树莓派玩转openwrt软路由:5.OpenWrt防火墙配置及SSH连接

1、SSH配置 打开System -> Administration&#xff0c;打开SSH Access将Interface配置成unspecified。 如果选中其他的接口表示仅在给定接口上侦听&#xff0c;如果未指定&#xff0c;则在所有接口上侦听。在未指定下&#xff0c;所有的接口均可通过SSH访问认证。 2、防火…

给ChuanhuChatGPT 配上讯飞星火spark大模型V2.0(一)

ChuanhuChatGPT 拥有多端、比较好看的Gradio界面&#xff0c;开发比较完整&#xff1b; 刚好讯飞星火非常大气&#xff0c;免费可以领取大概20w&#xff08;&#xff01;&#xff01;&#xff01;&#xff09;的token&#xff0c;这波必须不亏&#xff0c;整上。 重要参考&am…

MySQL——源码安装教程

MySQL 一、MySQL的安装1、RPM2、二进制3、源码 二、源码安装方式三、安装过程1、上传源码包2、解压当前文件并安装更新依赖3、对MySQL进行编译安装 四、其他步骤 一、MySQL的安装 首先这里我来介绍下MySQL的几种安装方式&#xff1a; 一共三种&#xff0c;RPM安装包、二进制包…

将Excel表中数据导入MySQL数据库

1、准备好Excel表&#xff1a; 2、数据库建表case2 字段信息与表格对应建表&#xff1a; 3、实现代码 import pymysql import pandas as pd import openpyxl 从excel表里读取数据后&#xff0c;再存入到mysql数据库。 需要安装openpyxl pip install openpyxl# 读入数据&#x…