【消息队列kafka_中间件】一、快速入门分布式消息队列

        在当今大数据和分布式系统盛行的时代,消息队列作为一种关键的中间件技术,发挥着举足轻重的作用。其中,Apache Kafka 以其卓越的性能、高可扩展性和强大的功能,成为众多企业构建分布式应用的首选消息队列解决方案。本篇文章将带你深入了解 Kafka 的基础概念、架构原理、核心组件,并通过实际代码示例,让你快速上手 Kafka,揭开分布式消息队列的神秘面纱。

一、Kafka 简介与背景​

Kafka 最初是由 LinkedIn 公司开发,用于处理公司内部大规模的实时数据流。随着其开源并在社区的不断发展壮大,Kafka 已成为一款广泛应用于大数据处理、实时流计算、日志收集与处理、系统解耦等众多领域的分布式消息队列系统。​

与传统消息队列相比,Kafka 具有显著的优势。它能够支持超高的吞吐量,每秒可以处理数十万甚至数百万条消息,这使得它在应对海量数据传输时表现出色。同时,Kafka 具备低延迟的特性,消息的生产和消费延迟可以控制在毫秒级,满足了许多对实时性要求极高的应用场景。此外,Kafka 的分布式架构设计使其具有强大的可扩展性,能够轻松应对不断增长的数据处理需求。

二、关键概念剖析​

2.1 生产者(Producer)​

生产者是 Kafka 系统中负责发送消息的组件。在实际应用中,生产者通常是由业务系统中的某个模块或服务担当,它将业务数据封装成 Kafka 能够识别的消息格式,并发送到指定的主题(Topic)中。​

以 Java 语言为例,以下是一个简单的 Kafka 生产者代码示例:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {// 设置生产者属性Properties props = new Properties();// Kafka集群地址,格式为host1:port1,host2:port2,...props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 键的序列化方式,这里使用字符串序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 值的序列化方式,同样使用字符串序列化props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 创建生产者实例KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 要发送的消息内容String messageKey = "key1";String messageValue = "Hello, Kafka!";// 创建消息对象,指定主题和键值对ProducerRecord<String, String> record = new ProducerRecord<>("test - topic", messageKey, messageValue);try {// 发送消息并获取响应RecordMetadata metadata = producer.send(record).get();System.out.println("Message sent successfully to partition " + metadata.partition() +" with offset " + metadata.offset());} catch (Exception e) {e.printStackTrace();} finally {// 关闭生产者,释放资源producer.close();}}
}

2.2 消费者(Consumer)​

消费者负责从 Kafka 主题中读取消息并进行处理。Kafka 的消费者是以消费者组(Consumer Group)的形式存在的,同一消费者组内的消费者共同消费主题中的消息,通过负载均衡的方式提高消息处理的效率。不同消费者组之间相互独立,每个消费者组都可以完整地消费主题中的所有消息。​

以下是一个 Java 语言的 Kafka 消费者代码示例:

import org.apache.kafka.clients.consumer.*;
import java.util.Arrays;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {// 设置消费者属性Properties props = new Properties();// Kafka集群地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 消费者组ID,同一组内的消费者共享消费偏移量props.put(ConsumerConfig.GROUP_ID_CONFIG, "test - group");// 键的反序列化方式,这里使用字符串反序列化props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 值的反序列化方式,同样使用字符串反序列化props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 自动提交消费偏移量,默认true,建议设置为false,手动管理偏移量以确保数据不丢失props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Arrays.asList("test - topic"));try {while (true) {// 拉取消息,设置拉取超时时间为100毫秒ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.println("Received message: " +"topic = " + record.topic() +", partition = " + record.partition() +", offset = " + record.offset() +", key = " + record.key() +", value = " + record.value());}// 手动提交消费偏移量consumer.commitSync();}} catch (Exception e) {e.printStackTrace();} finally {// 关闭消费者,释放资源consumer.close();}}
}

2.3 主题(Topic)​

主题是 Kafka 中消息分类存储的逻辑概念,类似于数据库中的表。每个主题可以包含多个分区(Partition),生产者发送的消息会被存储到指定的主题中,消费者则通过订阅主题来获取消息。在实际应用中,通常会根据不同的业务类型或数据类型创建不同的主题。例如,在一个电商系统中,可以创建 “order - topic” 用于存储订单相关的消息,“user - behavior - topic” 用于存储用户行为数据相关的消息等。​

2.4 分区(Partition)​

分区是 Kafka 实现高吞吐量和可扩展性的关键机制。每个主题可以被划分为多个分区,这些分区分布在 Kafka 集群的不同 Broker 节点上。当生产者发送消息时,Kafka 会根据一定的策略将消息分配到主题的某个分区中。常见的分区策略有按消息键的 Hash 值分配(如果消息带有键)和轮询分配(如果消息没有键)。​

分区的好处主要有以下几点:首先,通过将数据分散存储在多个分区上,可以提高数据存储和读取的并行度,从而提升整体的吞吐量。例如,在一个拥有多个 Broker 节点的集群中,每个 Broker 可以同时处理不同分区的读写请求,大大加快了数据处理速度。其次,分区还可以实现数据的冗余备份。Kafka 会为每个分区创建多个副本,其中一个副本作为领导者(Leader)副本,负责处理读写请求,其他副本作为跟随者(Follower)副本,从领导者副本同步数据。当领导者副本所在的 Broker 节点发生故障时,Kafka 会自动从跟随者副本中选举出一个新的领导者副本,确保数据的可用性和一致性。

三、Kafka 集群架构​

Kafka 集群由多个 Broker 节点组成,每个 Broker 节点实际上就是一个 Kafka 服务器进程。这些 Broker 节点共同协作,实现了 Kafka 的分布式存储和消息处理功能。​

在 Kafka 集群中,Zookeeper 扮演着至关重要的角色。Zookeeper 是一个分布式协调服务,它负责管理 Kafka 集群的元数据信息,包括 Broker 节点的注册与发现、主题与分区的元数据管理、分区领导者副本的选举等。具体来说,当一个新的 Broker 节点加入集群时,它会向 Zookeeper 注册自己的信息,Zookeeper 会将这些信息同步给其他 Broker 节点,使得整个集群能够感知到新节点的加入。在主题与分区管理方面,Zookeeper 存储了每个主题的分区信息,包括分区的数量、每个分区的领导者副本和跟随者副本所在的 Broker 节点等。当某个分区的领导者副本出现故障时,Zookeeper 会触发领导者选举过程,从跟随者副本中选举出一个新的领导者副本,确保分区的正常工作。

如上图所示,Kafka 集群中的多个 Broker 节点通过 Zookeeper 进行协调和管理。生产者和消费者通过与 Broker 节点进行通信来发送和接收消息,而 Zookeeper 则在幕后负责维护集群的一致性和稳定性。

四、安装与环境搭建​

4.1 下载 Kafka​

首先,从 Apache Kafka 官方网站(Apache Kafka)下载 Kafka 的安装包。目前 Kafka 的最新版本可以在官网上找到,选择适合自己操作系统的安装包进行下载。例如,对于 Linux 系统,可以下载.tgz格式的压缩包。​

4.2 解压安装包​

下载完成后,使用解压命令将安装包解压到指定目录。假设将安装包下载到了/downloads目录下,解压命令如下:

tar -xzf kafka_2.13 - 3.3.1.tgz -C /usr/local/

上述命令将 Kafka 安装包解压到了/usr/local/目录下,解压后的目录名称为kafka_2.13 - 3.3.1,其中2.13是 Scala 的版本号,3.3.1是 Kafka 的版本号。​

4.3 配置环境变量​

为了方便在命令行中使用 Kafka 的命令工具,需要将 Kafka 的bin目录添加到系统的环境变量中。在 Linux 系统中,可以编辑~/.bashrc文件,在文件末尾添加以下行:

export PATH=$PATH:/usr/local/kafka_2.13 - 3.3.1/bin

然后执行以下命令使环境变量生效:

source ~/.bashrc

4.4 配置 Kafka​

Kafka 的主要配置文件位于其安装目录下的config文件夹中,其中最重要的配置文件是server.properties。在这个文件中,可以配置 Kafka 的各种参数,如 Kafka 监听的端口、日志存储路径、连接 Zookeeper 的地址等。​

以下是一些常见的配置参数及说明:

# Kafka监听的端口,默认9092
listeners=PLAINTEXT://localhost:9092
# 日志存储路径,可以配置多个路径,用逗号分隔
log.dirs=/tmp/kafka - logs
# 连接Zookeeper的地址,格式为host1:port1,host2:port2,...
zookeeper.connect=localhost:2181
# 每个分区的副本因子,即每个分区有多少个副本,建议设置为大于1的奇数,以确保容错
num.partitions=1
replica.fetch.max.bytes=1048576

4.5 启动 Zookeeper 与 Kafka​

在完成 Kafka 配置后,需要先启动 Zookeeper,因为 Kafka 依赖 Zookeeper 进行集群管理。在 Kafka 安装目录下,执行以下命令启动 Zookeeper:

bin/zookeeper - server - start.sh config/zookeeper.properties

上述命令会使用config/zookeeper.properties配置文件启动 Zookeeper 服务。启动成功后,终端会输出一些启动日志信息,显示 Zookeeper 已正常运行并监听在指定端口(默认为 2181)。​

接着,启动 Kafka 服务,执行命令:

bin/kafka - server - start.sh config/server.properties

此命令通过config/server.properties配置文件启动 Kafka 服务器进程。启动过程中,日志会显示 Kafka 加载配置、注册到 Zookeeper 等信息。当看到类似 “[KafkaServer id=0] started” 的日志时,表明 Kafka 已成功启动。

4.6 使用 Kafka 命令行工具验证安装​

Kafka 提供了丰富的命令行工具,方便我们进行各种操作与验证。例如,使用以下命令创建一个新的主题:

bin/kafka - topics.sh --create --bootstrap - servers localhost:9092 --replication - factor 1 --partitions 1 --topic new - topic

参数说明:​

  • --create:表示执行创建主题操作。​
  • --bootstrap - servers localhost:9092:指定 Kafka 集群地址,这里是本地的 9092 端口。​
  • --replication - factor 1:设置主题的副本因子为 1,即每个分区只有一个副本。在生产环境中,为了数据冗余与容错,通常设置为大于 1 的奇数,如 3 或 5。​
  • --partitions 1:指定主题的分区数量为 1。根据业务需求可调整,若业务数据量较大且对并行处理要求高,可设置多个分区。​
  • --topic new - topic:指定要创建的主题名称为new - topic。

创建成功后,可使用以下命令查看当前 Kafka 集群中的所有主题:

bin/kafka - topics.sh --list --bootstrap - servers localhost:9092

该命令会列出所有已创建的主题名称,若能看到刚刚创建的new - topic,则说明主题创建成功。​

还可以使用生产者和消费者命令行工具进行消息的发送与接收测试。首先,启动一个生产者终端,执行命令:

bin/kafka - console - producer.sh --bootstrap - servers localhost:9092 --topic new - topic

启动后,终端会等待输入消息。此时,输入任意消息内容并回车,消息就会被发送到new - topic主题中。​

然后,在另一个终端启动消费者,执行命令:

bin/kafka - console - consumer.sh --bootstrap - servers localhost:9092 --topic new - topic --from - beginning

--from - beginning参数表示从主题的起始位置开始消费消息。

启动消费者后,就能看到之前生产者发送的消息,这表明 Kafka 的基本功能正常,安装与环境搭建成功。

通过这些命令行工具的操作,不仅验证了安装,也进一步熟悉了 Kafka 的基本使用方式。你将发现其在分布式消息处理领域的强大功能,无论是构建大规模数据处理系统,还是实现复杂业务系统的解耦与异步通信,Kafka 都能成为有力的技术支撑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/75628.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在线地图支持天地图和腾讯地图,仪表板和数据大屏支持发布功能,DataEase开源BI工具v2.10.7 LTS版本发布

2025年4月11日&#xff0c;人人可用的开源BI工具DataEase正式发布v2.10.7 LTS版本。 这一版本的功能变动包括&#xff1a;数据源方面&#xff0c;Oracle数据源支持获取和查询物化视图&#xff1b;图表方面&#xff0c;在线地图支持天地图、腾讯地图&#xff1b;新增子弹图&…

【Linux实践系列】:匿名管道收尾+完善shell外壳程序

&#x1f525; 本文专栏&#xff1a;Linux Linux实践项目 &#x1f338;作者主页&#xff1a;努力努力再努力wz &#x1f4aa; 今日博客励志语录&#xff1a; 人生总会有自己能力所不及的范围&#xff0c;但是如果你在你能力所及的范围尽了全部的努力&#xff0c;那你还有什么遗…

【C++初学】课后作业汇总复习(七) 指针-深浅copy

1、 HugeInt类:构造、、cout Description: 32位整数的计算机可以表示整数的范围近似为&#xff0d;20亿到&#xff0b;20亿。在这个范围内操作一般不会出现问题&#xff0c;但是有的应用程序可能需要使用超出上述范围的整数。C可以满足这个需求&#xff0c;创建功能强大的新的…

【C++】 —— 笔试刷题day_16

刷题_day16&#xff0c;继续加油啊 一、字符串替换 题目解析 这道题是一道简单的字符题目&#xff0c;题目给我们一个字符串A&#xff0c;和n表示A字符串的长度&#xff0c;再给出一个字符数组arg&#xff0c;m表示arg中是数据个数。 然我们在字符串A中找到%s然后替换成arg中的…

n8n 本地部署及实践应用,实现零成本自动化运营 Telegram 频道(保证好使)

n8n 本地部署及实践应用&#xff0c;实现零成本自动化运营 Telegram 频道&#xff08;保证好使&#xff09; 简介 n8n 介绍 一、高度可定制性 二、丰富的连接器生态 三、自托管部署&#xff08;本地部署&#xff09; 四、社区驱动 n8n 的部署 一、前期准备 二、部署步…

flutter 桌面应用之系统托盘

系统托盘(Tray) 系统托盘就是状态栏里面对应的图标点击菜单 主要有两款框架 框架一句话评价tray_manager轻量、简单、易用&#xff0c;适合常规托盘功能system_tray更底层、更强大、支持图标/菜单/消息弹窗等更多功能&#xff0c;但复杂度更高 &#x1f9f1; 基础能力对比 …

修改idea/android studio等编辑器快捷注释从当前行开头的反人类行为

不知道什么时候开始&#xff0c;idea编辑的快捷注释开始从当前行开头出现了&#xff0c;显得实在是难受&#xff0c;我只想让在当前行代码的部份开始缩进两个字符开始&#xff0c;这样才会显得更舒服。不知道有没有强迫症的猴子和我一样&#xff0c;就像下面的效果&#xff1a;…

MySQL慢查询全攻略:定位、分析与优化实战

&#x1f680; MySQL慢查询全攻略&#xff1a;定位、分析与优化实战 #数据库优化 #性能调优 #SQL优化 #MySQL实战 一、慢查询定位&#xff1a;找到性能瓶颈 1.1 开启慢查询日志 -- 查看当前配置 SHOW VARIABLES LIKE %slow_query%; -- 动态开启&#xff08;重启失效&…

当原型图与文字说明完全不同时,测试要怎么做?

当测试遇上左右手互搏的需求&#xff0c;怎么办&#xff1f; "这个弹窗样式怎么和文档写的不一样&#xff1f;"、"按钮位置怎么跑到左边去了&#xff1f;"——根据Deloitte的调查&#xff0c;62%的项目存在原型图与需求文档不一致的情况。这种"精神分…

关于量化交易在拉盘砸盘方面应用的部分思考

关于“砸盘”的深层解析与操盘逻辑 ​​一、砸盘的本质与市场含义​​ ​​砸盘​​指通过集中抛售大量筹码导致价格快速下跌的行为&#xff0c;其核心目标是​​制造恐慌、清洗浮筹或实现利益再分配​​。不同场景下的砸盘含义不同&#xff1a; ​​主动砸盘&#xff08;操控…

【项目管理】第12章 项目质量管理-- 知识点整理

项目管理-相关文档,希望互相学习,共同进步 风123456789~-CSDN博客 (一)知识总览 项目管理知识域 知识点: (项目管理概论、立项管理、十大知识域、配置与变更管理、绩效域) 对应:第6章-第19章 第6章 项目管理概论 4分第13章 项目资源管理 3-4分第7章 项目…

一个好看的图集展示html页面源码

源码介绍 一个好看的图集展示html页面源码&#xff0c;适合展示自己的作品&#xff0c;页面美观大气&#xff0c;也可以作为产品展示或者个人引导页等等 源码由HTMLCSSJS组成&#xff0c;记事本打开源码文件可以进行内容文字之类的修改&#xff0c; 双击html文件可以本地运行…

2021第十二届蓝桥杯大赛软件赛省赛C/C++ 大学 B 组

记录刷题的过程、感悟、题解。 希望能帮到&#xff0c;那些与我一同前行的&#xff0c;来自远方的朋友&#x1f609; 大纲&#xff1a; 1、空间-&#xff08;题解&#xff09;-字节单位转换 2、卡片-&#xff08;题解&#xff09;-可以不用当组合来写&#xff0c;思维题 3、直…

LabVIEW 中 JSON 数据与簇的转换

在 LabVIEW 编程中&#xff0c;数据格式的处理与转换是极为关键的环节。其中&#xff0c;将数据在 JSON 格式与 LabVIEW 的簇结构之间进行转换是一项常见且重要的操作。这里展示的程序片段就涉及到这一关键功能&#xff0c;以下将详细介绍。 一、JSON 数据与簇的转换功能 &am…

蓝桥杯大模板

init.c void System_Init() {P0 0x00; //关闭蜂鸣器和继电器P2 P2 & 0x1f | 0xa0;P2 & 0x1f;P0 0x00; //关闭LEDP2 P2 & 0x1f | 0x80;P2 & 0x1f; } led.c #include <LED.H>idata unsigned char temp_1 0x00; idata unsigned char temp_old…

通过HTTP协议实现Git免密操作的解决方案

工作中会遇到这样的问题的。 通过HTTP协议实现Git免密操作的解决方案 方法一&#xff1a;启用全局凭据存储&#xff08;推荐&#xff09; 配置凭证存储‌ 执行以下命令&#xff0c;让Git永久保存账号密码&#xff08;首次操作后生效&#xff09;&#xff1a; git config --g…

Java常见面试问题

一.Liunx 二.Java基础 1.final 2.static 3.与equals 三.Collection 1.LIst 2.Map 3.Stream 四、多线程 1.实现方法 2.线程池核心参数 3.应用场景 五、JVM 1.堆 2.栈 六、Spring 1.面向对象 2.IOC 3.AOP 七、Springboot 1.自动装配 八、SpringCloud 1.Nacos 2.seata 3.ga…

【蓝桥杯】第十六届蓝桥杯 JAVA B组记录

试题 A: 逃离高塔 很简单&#xff0c;签到题&#xff0c;但是需要注意精度&#xff0c;用int会有溢出风险 答案&#xff1a;202 package lanqiao.t1;import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWrit…

PyTorch Tensor维度变换实战:view/squeeze/expand/repeat全解析

本文从图像数据处理、模型输入适配等实际场景出发&#xff0c;系统讲解PyTorch中view、squeeze、expand和repeat四大维度变换方法。通过代码演示对比不同方法的适用性&#xff0c;助您掌握数据维度调整的核心技巧。 一、基础维度操作方法 1. view&#xff1a;内存连续的形状重…

Kubernetes nodeName Manual Scheduling practice (K8S节点名称绑定以及手工调度)

Manual Scheduling 在 Kubernetes 中&#xff0c;手动调度框架允许您将 Pod 分配到特定节点&#xff0c;而无需依赖默认调度器。这对于测试、调试或处理特定工作负载非常有用。您可以通过在 Pod 的规范中设置 nodeName 字段来实现手动调度。以下是一个示例&#xff1a; apiVe…