「Kafka」Kafka基础知识入门介绍(三)

「Kafka」Kafka基础知识入门介绍(三)

  • 一、消息主题
    • 1. 创建主题
  • 二、生产数据
    • 1. 命令行模式
    • 2. Java代码模式
  • 三、消费数据
    • 1. 命令行模式
    • 2. Java代码模式

「Kafka」Kafka理论知识解读(一)
「Kafka」Kafka安装和启动(二)

一、消息主题

消息主题(Message Topic)是Kafka中用于组织和存储消息的基本单元。它类似于一个命名的消息队列,生产者可以向主题发布消息,而消费者可以从主题订阅并接收消息。

每个主题可以分为多个分区(Partitions)每个分区可以在不同的服务器上进行复制以提供容错性。消息被附加到主题的分区中,并根据消息键(Key)进行分配和存储。这种设计允许Kafka在分布式环境中实现高吞吐量和水平扩展。

主题的名称在集群中必须是唯一的,并且可以根据需求创建任意数量的主题。通常,主题的名称会反映其中包含的消息类型或者业务逻辑。例如,一个电子商务应用可能会创建名为 “orders”、“payments” 和 “shipments” 的主题来存储订单、支付和发货相关的消息。

将不同的消息进行分类,分成不同的主题(Topic),然后消息生产者在生成消息时,就会向指定的主题(Topic)中发送。而消息消费者也可以订阅自己感兴趣的主题(Topic)并从中获取消息。
有很多种方式都可以操作Kafka消息中的主题(Topic):命令行、第三方工具、Java API、自动创建。下面将介绍命令行和Java API模式

1. 创建主题

  • 创建主题

在这里插入图片描述

# Kafka是通过kafka-topics.bat指令文件进行消息主题操作的。其中包含了对主题的查询,创建,删除等功能。
# 调用指令创建主题时,需要传递多个参数,而且参数的前缀为两个横线。因为参数比较多,为了演示方便,这里我们只说明必须传递的参数,其他参数后面课程中会进行讲解
# --bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开
# --create : 表示对主题的创建操作,是个操作参数,后面无需增加参数值
# --topic : 主题的名称,后面接的参数值一般就是见名知意的字符串名称,类似于java中的字符串类型标识符名称,当然也可以使用数字,只不过最后还是当成数字字符串使用。
# 指令
kafka-topics.bat --bootstrap-server localhost:9092 --create --topic test
  • 查询主题
# 查看所有主题
# --list : 表示对所有主题的查询操作,是个操作参数,后面无需增加参数值
kafka-topics.bat --bootstrap-server localhost:9092 --list
# 查看某个主题的详细信息
# --describe : 查看主题的详细信息
# --topic : 查询的主题名称
kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic test123

在这里插入图片描述

  • 修改主题
# --alter : 表示对所有主题的查询操作,是个操作参数,后面无需增加参数值
# --topic : 修改的主题名称
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --alter --partitions 2

在这里插入图片描述

  • 删除主题
# --delete: 表示对主题的删除操作,是个操作参数,后面无需增加参数值。默认情况下,删除操作是逻辑删除,也就是说数据存储的文件依然存在,但是通过指令查询不出来。如果想要直接删除,需要在server.properties文件中设置参数delete.topic.enable=true
# --topic : 删除的主题名称
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --delete

注意:windows系统中由于权限或进程锁定的问题,删除topic会导致kafka服务节点异常关闭。

二、生产数据

1. 命令行模式

Kafka是通过kafka-console-producer.bat文件进行消息生产者操作的。
调用指令时,需要传递多个参数,而且参数的前缀为两个横线,因为参数比较多。为了演示方便,这里我们只说明必须传递的参数,其他参数后面课程中会进行讲解
--bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开。早期版本的Kafka也可以通过 --broker-list参数进行连接,当前版本已经不推荐使用了。
--topic : 主题的名称,后面接的参数值就是之前已经创建好的主题名称。

kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test123

注意:这里的数据需要回车后,才能真正将数据发送到Kafka服务器。
在这里插入图片描述

2. Java代码模式

public class Producer {public static void main(String[] args) {// TODO 配置属性集合Map<String, Object> configMap = new HashMap<>();// TODO 配置属性:Kafka服务器集群地址configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// TODO 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// TODO 创建Kafka生产者对象,建立Kafka连接//      构造对象时,需要传递配置参数KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);// TODO 准备数据,定义泛型//      构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数ProducerRecord<String, String> record = new ProducerRecord<String, String>("test123", "key11111111", "value1111111");// TODO 生产(发送)数据producer.send(record);// TODO 关闭生产者连接producer.close();}
}

三、消费数据

1. 命令行模式

Kafka是通过kafka-console-consumer.bat文件进行消息消费者操作的。
调用指令时,需要传递多个参数,而且参数的前缀为两个横线,因为参数比较多。为了演示方便,这里我们只说明必须传递的参数,其他参数后面课程中会进行讲解
--bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开。早期版本的Kafka也可以通过 --broker-list参数进行连接,当前版本已经不推荐使用了。
--topic : 主题的名称,后面接的参数值就是之前已经创建好的主题名称。其实这个参数并不是必须传递的参数,因为如果不传递这个参数的话,那么消费者会消费所有主题的消息。如果传递这个参数,那么消费者只能消费到指定主题的消息数据。
--from-beginning :从第一条数据开始消费,无参数值,是一个标记参数。默认情况下,消费者客户端连接上服务器后,是不会消费到连接之前所生产的数据的。也就意味着如果生产者客户端在消费者客户端连接前已经生产了数据,那么这部分数据消费者是无法正常消费到的。所以在实际环境中,应该是先启动消费者客户端,再启动生产者客户端,保证消费数据的完整性。增加参数后,Kafka就会从第一条数据开始消费,保证消息数据的完整性。

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

在这里插入图片描述
注意:控制台消费端或许会有乱码,自行解决即可,因为一般不会在控制台获取消息

2. Java代码模式

public class Consumer{public static void main(String[] args) {// TODO 配置属性集合Map<String, Object> configMap = new HashMap<String, Object>();// TODO 配置属性:Kafka集群地址configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// TODO 配置属性: Kafka传输的数据为KV对,所以需要对获取的数据分别进行反序列化configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");// TODO 配置属性: 读取数据的位置 ,取值为earliest(最早),latest(最晚)configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// TODO 配置属性: 消费者组configMap.put("group.id", "atguigu");// TODO 配置属性: 自动提交偏移量configMap.put("enable.auto.commit", "true");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configMap);// TODO 消费者订阅指定主题的数据consumer.subscribe(Collections.singletonList("test123"));while (true) {// TODO 每隔100毫秒,抓取一次数据ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(100));// TODO 打印抓取的数据for (ConsumerRecord<String, String> record : records) {System.out.println("K = " + record.key() + ", V = " + record.value());}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/3107.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【kotlin】利用by关键字更加方便地实现装饰器模式

关于kotlin中的by关键字的用法&#xff0c;kotlin官方文档属性委托这一节讲得很清楚。 简单来说就是这样的&#xff0c;假设存在一个接口Component如下&#xff1a; interface Component {fun method1(): IntArrayfun method2(a: Int)fun method3(a: Int, str: String) }那么对…

React-性能优化的手段

​&#x1f308;个人主页&#xff1a;前端青山 &#x1f525;系列专栏&#xff1a;React篇 &#x1f516;人终将被年少不可得之物困其一生 依旧青山,本期给大家带来React篇专栏内容:React-性能优化的手段 目录 React 性能优化的手段有哪些&#xff1f; 一、是什么 二、如何做…

【汇编语言】流程转移和子程序

【汇编语言】流程转移和子程序 文章目录 【汇编语言】流程转移和子程序前言一、“转移”综述二、操作符offset三、jmp指令jmp指令——无条件转移jmp指令&#xff1a;依据位移进行转移两种段内转移远转移&#xff1a;jmp far ptr 标号转移地址在寄存器中的jmp指令转移地址在内存…

Vue3种常用插槽的使用

插槽总结 &#xff1a; 插槽的作用&#xff1a;让父组件可以向子组件指定位置插入html结构&#xff0c;也是一种组件间通信的方式&#xff0c;适用于 父组件 > 子组件 。分类&#xff1a;默认插槽、具名插槽、作用域插槽 1、默认插槽 父组件中&#xff1a; <Category>…

鸿蒙HarmonyOS应用 - ArkUI组件

ArkUI组件 基础组件 Image 声明Image组件并设置图片源 网络权限&#xff1a;ohos.permission.INTERNET Image(scr: string | PixelMap | Resource)// 1. string&#xff1a;用于加载网络图片&#xff0c;需要申请网络权限 Image("https://xxx.png")// 2. PixelMap…

一线实战,一次底层超融合故障导致的Oracle异常恢复

背景概述 某客户数据由于底层超融合故障导致数据库产生有大量的坏块&#xff0c;最终导致数据库宕机&#xff0c;通过数据抢救&#xff0c;恢复了全部的数据。下面是详细的故障分析诊断过程&#xff0c;以及详细的解决方案描述&#xff1a; 故障现象 数据库宕机之后&#xff0c…

粤嵌—2024/4/24—删除有序数组中的重复项 ||

代码实现&#xff1a; 方法一&#xff1a;双指针 int removeDuplicates(int *nums, int numsSize) {int l 0, r 0;while (r < numsSize) {if (r > 1 && nums[r] nums[l - 1] && nums[r] nums[l - 2]) {r;} else {nums[l] nums[r];l;r;}}return l; }…

ONES 功能上新|ONES Wiki 新功能一览

支持在 ONES Wiki 页面中使用分栏进行横向排版&#xff0c;丰富排版方式&#xff0c;帮助用户以更丰富的版式展示内容。 应用场景&#xff1a; 页面的布局对内容的阅读有很大的影响。当页面中有图文混排的需求时&#xff0c;可以通过分栏来组织页面结构&#xff0c;以更清晰、更…

Docker容器概念介绍与基本管理

前言 在软件开发和部署环境中&#xff0c;使用 Docker 等容器技术可以帮助团队实现快速、一致、可靠的应用程序部署&#xff0c;提高开发效率和应用程序的可移植性。 目录 一、虚拟化产品介绍 1. 云服务模型 1.1 IaaS 1.2 PaaS 1.3 SaaS 1.4 DaaS 2. 产品介绍 2.1 虚…

14 JavaScript学习:条件语句

JavaScript条件语句 JavaScript中的条件语句主要用于根据条件执行不同的代码块。以下是对JavaScript条件语句概念的详细解释和分类&#xff1a; if语句&#xff1a; 单个if语句&#xff1a;最简单的条件语句&#xff0c;根据条件判断是否执行特定的代码块。if…else语句&#x…

【机器学习与实现】机器学习概述

目录 一、机器学习的基本概念和方法&#xff08;一&#xff09;基本概念&#xff08;二&#xff09;机器学习的一般过程举例&#xff08;三&#xff09;样本和参数估计 二、机器学习的步骤总结&#xff08;一&#xff09;机器学习的主要步骤&#xff08;二&#xff09;样本及样…

C++并发编程

基本介绍 线程 C98标准没有直接提供原生的多线程支持 在C98中&#xff0c;并没有像后来的C11标准中那样的<thread>库或其他直接的多线程工具 然而&#xff0c;这并不意味着在C98中无法实现多线程。开发者通常会使用平台特定的API&#xff08;如Windows的线程API或POSI…

vue3中的ref、isRef、shallowRef、triggerRef和customRef

1.ref 接受一个参数值并返回一个响应式且可改变的 ref 对象。 ref 对象拥有一个指向内部值的单一属性 .value property &#xff0c;指向内部值。 例&#xff1a;此时&#xff0c;页面上的 str1 也跟着变化 <template><div><button click"handleClick&quo…

严厉打击侵犯知识产权行为!法院公开审理假冒半岛超声炮知产刑事案件

随着医美行业的蓬勃发展&#xff0c;一些不法分子利用消费者对变美的渴望&#xff0c;制售假冒半岛超声炮&#xff0c;严重侵犯了消费者的合法权益&#xff0c;也破坏了医美市场的健康发展。为了维护市场秩序&#xff0c;保障消费者权益&#xff0c;各地相关监管部门持续加大监…

QT从入门到实战x篇_22_番外1_Qt事件系统

文章目录 1. Qt事件系统简介1.1 事件的来源和传递1.2 事件循环和事件分发1.2.1 QT消息/事件循环机制1.2.1.1 机制解释1.2.1.2 两个问题 1.2.2 事件分发 2. 事件过滤基础2.1 什么是事件过滤器&#xff08;Event Filter&#xff09;&#xff1f;2.2 如何安装事件过滤器 3. 事件过…

<计算机网络自顶向下> 路由器组成

路由器结构概况 路由&#xff1a;运行路由选择算法/协议&#xff08;RIP, OSPF, BGP&#xff09;生成路由表转发&#xff1a;从输入到输出链路交换数据包-根据路由表进行分组的转发中间的fabric是用来接收输入的分组交给输出端口的&#xff0c;完成局部的转发&#xff08;根据…

在广东珠海,持有软考等证书最高可获6位数补贴,快来申报!

近日&#xff0c;横琴粤澳深度合作区执行委员会印发《横琴粤澳深度合作区支持人才发展若干措施》&#xff08;以下简称《若干措施》&#xff09;及三项配套实施办法&#xff0c;鼓励企业“招贤纳士”&#xff0c;加强琴澳人才协同培养。目前&#xff0c;2024年第一批博士后专项…

星汉未来AI应用市场:一站式AI解决方案平台

星汉未来AI应用市场&#xff1a;一站式AI解决方案平台 在人工智能技术日益渗透到各行各业的今天&#xff0c;星汉未来AI应用市场为我们提供了一个集创新与实用于一体的平台。下面&#xff0c;我将为您详细介绍这个平台的各个方面。 平台特色 星汉未来AI应用市场是一个面向未…

Keil出现警告:warning: #223-D: function “XXX“ declared implicitly

这个警告表明编译器在函数使用之前没有找到函数的显式声明或定义。这通常发生在函数被使用之前没有在当前文件中进行声明或定义&#xff0c;或者头文件未正确包含。 解决方式&#xff1a; 在当前文件中添加函数声明&#xff1a;在使用函数之前&#xff0c;在当前文件中添加函…

maixcam如何无脑运行运行别人的模型(以安全帽模型为例)

maixcam如何无脑运行运行别人的模型&#xff08;以安全帽模型为例&#xff09; 本文章主要讲如何部署上传的模型文件&#xff0c;以及如果你要把你模型按照该流程应该怎么修改&#xff0c;你可以通过该文章得到你想要的应该&#xff0c;该应用也包含的退出按钮&#xff0c;是屏…