Kafka 快速实战及基本原理详解解析-01

一、Kafka 介绍

1. MQ 的作用

消息队列(Message Queue,简称 MQ)是一种用于跨进程通信的技术,核心功能是通过异步消息的方式实现系统之间的解耦。它在现代分布式系统中有着广泛的应用,主要作用体现在以下三个方面:

异步处理

在传统的同步调用中,生产者和消费者需要同时在线,并且生产者在完成任务后才能继续执行其他工作。这种模式限制了系统的性能。而引入消息队列后,生产者可以将任务提交到队列中,消费者按需消费任务,从而提升系统的吞吐量。

  • 示例:快递员送快递到客户家,效率低下。而菜鸟驿站的出现让快递员只需将包裹放置在驿站,客户可以根据自己的时间安排取件。这种方式大大提高了效率。
解耦

解耦是消息队列最重要的功能之一。服务之间通过消息队列传递数据,而不是直接调用对方的服务接口,这样可以有效降低系统的耦合度。

  • 示例:《Thinking in JAVA》原书是英文版,但通过翻译社将内容翻译成多种语言,满足不同读者的需求。翻译社起到了桥梁作用,不同语言之间的沟通不再直接依赖于作者和读者。
削峰填谷

在高并发场景下,系统往往会遇到流量高峰,导致系统负载过重。通过消息队列,可以将流量暂存并按固定速率处理,从而避免系统崩溃。

  • 示例:长江每年都会涨水,但通过三峡大坝的调节,下游的出水速度保持稳定,避免了洪水泛滥。

2. 为什么要用 Kafka

Kafka 是一种高吞吐量、低延迟、分布式的消息队列系统,适合在大规模数据处理场景中使用。以下是 Kafka 的典型使用场景和优势:

日志聚合场景

在大规模分布式系统中,各个服务都会产生大量的日志信息。传统的日志收集方式往往存在以下问题:

  • 数据量大:需要快速收集和处理来自各个渠道的海量日志。
  • 容错性要求高:集群中允许少量节点出现故障而不影响整体服务。
  • 功能专注:Kafka 专注于高吞吐量、低延迟的消息传递,不追求复杂的消息处理功能。
核心优势
  • 高吞吐量:Kafka 能够处理数百万 TPS(每秒事务处理量)。
  • 低延迟:通常在毫秒级别的延迟时间内完成消息传递。
  • 可扩展性:通过增加节点和分区数量,可以线性扩展处理能力。
  • 容错性:通过副本机制保证消息的高可用性。
  • 持久化:Kafka 使用磁盘存储消息,保证消息的持久性。

二、Kafka 快速上手

1. 实验环境准备

要快速上手 Kafka,首先需要搭建实验环境。以下是推荐的实验环境配置:

  • 虚拟机数量:3 台
  • 操作系统:CentOS 7
  • Java 版本:Java 8
环境配置步骤
  1. 下载 Kafka 和 Zookeeper。
  2. 将 Kafka 解压到 /app/kafka 目录,将 Zookeeper 解压到 /app/zookeeper 目录。
  3. 配置环境变量,确保系统能够识别 Kafka 和 Zookeeper 的命令。
  4. 关闭防火墙,以避免端口阻塞:
    systemctl stop firewalld.service
    

2. 单机服务体验

为了更直观地理解 Kafka 的工作原理,我们可以先体验单机版 Kafka 服务。

步骤 1:启动 Zookeeper

Kafka 依赖 Zookeeper 进行元数据管理和选举机制。在实际部署中,通常使用独立的 Zookeeper 集群。

启动 Zookeeper 服务:

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

检查 Zookeeper 是否正常启动:

jps

确认输出中有 QuorumPeerMain 进程。

步骤 2:启动 Kafka

启动 Kafka 服务前,需要确保 Zookeeper 服务正常运行。

启动 Kafka 服务:

nohup bin/kafka-server-start.sh config/server.properties &

确认 Kafka 是否正常启动:

jps

检查输出中是否包含 Kafka 进程。

步骤 3:创建和使用 Topic

Kafka 的基础工作机制是通过 Topic 进行消息的传递。

  1. 创建 Topic

    bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
    
  2. 发送消息 启动生产者端并发送消息:

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    > 这是一条测试消息
    
  3. 消费消息 启动消费者端并接收消息:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    

3. 理解 Kafka 的消息传递机制

Kafka 的消息传递机制可以通过以下核心组件来理解:

  • 生产者(Producer):将消息发送到指定的 Topic。
  • 消费者(Consumer):从指定的 Topic 消费消息。
  • Topic:逻辑概念,表示一类业务消息的集合。
  • Partition:物理概念,实际存储消息的分区。
  • Broker:Kafka 服务器实例,存储和管理 Partition。

Kafka 的设计目标是通过这些组件实现高效、可靠的消息传递,满足企业级数据管道的需求。


四、Kafka 集群服务

1. 为什么要使用集群

单机部署的 Kafka 在性能上虽然已经非常出色,但在实际生产环境中通常需要使用 Kafka 集群来进一步提升数据存储能力和系统的高可用性。集群可以解决以下问题:

1.1 解决海量数据存储问题

单个 Broker 服务器的存储能力有限,当数据量增长到一定程度时,单机难以承载。通过集群部署,可以将数据分散存储在多个 Broker 中,从而提升整体存储能力。

1.2 提高系统容错能力

单机环境中,如果 Broker 崩溃,所有数据都会丢失。而集群环境下,每个 Partition 都有多个副本,即使部分 Broker 节点宕机,系统依然可以正常运行,保证数据的高可用性。


五、理解服务端的 Topic、Partition 和 Broker

Kafka 的核心架构由 Topic、Partition 和 Broker 组成,这三者之间的关系至关重要:

  • Topic:一个逻辑的消息分类,每个 Topic 包含多条消息。
  • Partition:每个 Topic 可以分成多个 Partition,每个 Partition 是一个消息队列。
  • Broker:Kafka 的服务器实例,负责存储 Partition 数据,并处理客户端请求。

5.1 创建分布式 Topic 示例

bin/kafka-topics.sh --bootstrap-server worker1:9092 --create --replication-factor 2 --partitions 4 --topic distributedTopic

5.2 查看 Topic 信息

bin/kafka-topics.sh --bootstrap-server worker1:9092 --describe --topic distributedTopic

六、章节总结:Kafka 集群的整体结构

通过前面的学习,我们可以总结 Kafka 集群的整体结构:

  1. Topic 是逻辑概念,Producer 和 Consumer 通过 Topic 进行消息传递。
  2. Partition 是实际存储单元,保证数据分散存储和负载均衡。
  3. Broker 是 Kafka 的服务器实例,存储 Partition 数据并处理客户端请求。
  4. Zookeeper 管理 Kafka 集群的元数据和选举过程。
  5. Controller 是 Kafka 集群的核心管理节点,负责管理 Topic 和 Partition 的分配。

七、Spring Boot 实现 Kafka 消息有序性

为了保证 Kafka 的消息有序性,可以使用 Spring Boot 和 Kafka 的整合来实现。在 Java 的 Spring Boot 项目中,我们通过指定消息的 Key 和自定义分区器来确保消息发送到相同的 Partition,从而实现有序性。

7.1 依赖配置

在 Maven 项目中,引入 Kafka 的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-kafka</artifactId>
</dependency>

7.2 配置 KafkaProducer

创建 Kafka 的生产者配置类:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaProducerConfig {@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}

7.3 发送有序消息

创建一个消息发送服务,确保消息使用相同的 Key 发送到同一个 Partition:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducerService {private static final String TOPIC = "test_topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String key, String message) {kafkaTemplate.send(TOPIC, key, message);}
}

7.4 自定义分区器(可选)

如果有更复杂的分区逻辑,可以自定义分区器:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 自定义分区逻辑return Math.abs(key.hashCode()) % cluster.partitionCountForTopic(topic);}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

7.5 设置一个 Topic 对应一个 Partition 的方法

如果业务需求是保证某个 Topic 的消息全局有序,可以在创建 Topic 时将 Partition 数量设置为 1,从而保证所有消息存储在同一个 Partition 中,实现全局有序。

创建一个 Partition 的 Topic
bin/kafka-topics.sh --create --topic singlePartitionTopic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
在 Spring Boot 中发送消息到该 Topic
@Service
public class KafkaSinglePartitionProducerService {private static final String TOPIC = "singlePartitionTopic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send(TOPIC, message);}
}

通过这种方式,所有发送到 singlePartitionTopic 的消息都会进入同一个 Partition,确保消息顺序性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/892220.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

xtu oj 1614 数字(加强版)

输出格式# 每行输出一个样例的结果&#xff0c;为一个整数。 样例输入# 3 1 10 101 样例输出# 1 2 3 解题思路&#xff1a;这个题不要想复杂了&#xff0c;很容易超时。 首先需要注意的点&#xff0c;n<10的10000次方&#xff0c;用int或者long long都会爆&#xff0c;所…

了解RabbitMQ:强大的开源消息队列中间件

在现代分布式系统中&#xff0c;消息队列&#xff08;Message Queue&#xff0c;简称MQ&#xff09;作为一种重要的组件&#xff0c;承担着上下游消息传递和通信的重任。其中&#xff0c;RabbitMQ作为一款流行的开源消息队列中间件&#xff0c;凭借其高可用性、可扩展性和易用性…

这是什么操作?强制迁移?GitLab 停止中国区用户访问

大家好&#xff0c;我是鸭鸭&#xff01; 全球知名代码托管平台 GitLab 发布通告&#xff0c;宣布不再为位于中国大陆、香港及澳门地区的用户提供访问服务&#xff0c;并且“贴心”建议&#xff0c;可以访问极狐 GitLab。 极狐 GitLab 是一家中外合资公司&#xff0c;宣称获得…

第二届 Sui 游戏峰会将于 3 月 18 日在旧金山举行

3 月中旬&#xff0c;Sui 基金会和 Mysten Labs 将共同举办第二届 Sui 游戏峰会&#xff08;Sui Gaming Summit&#xff09;&#xff0c;这是一个专注于 Sui 游戏平台的 GDC 周边活动。此次峰会将与旧金山的年度游戏开发者大会&#xff08;GDC&#xff0c;Game Developers Conf…

易支付二次元网站源码及部署教程

易支付二次元网站源码及部署教程 引言 在当今数字化时代&#xff0c;二次元文化逐渐成为年轻人生活中不可或缺的一部分。为了满足这一庞大用户群体的需求&#xff0c;搭建一个二次元主题网站显得尤为重要。本文将为您详细介绍易支付二次元网站源码的特点及其部署教程&#xf…

计算机毕业设计hadoop+spark知网文献论文推荐系统 知识图谱 知网爬虫 知网数据分析 知网大数据 知网可视化 预测系统 大数据毕业设计 机器学习

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

LabVIEW四旋翼飞行器姿态监测系统

四旋翼飞行器姿态监测系统是一个集成了高度、速度、俯仰角与滚转角数据采集与分析的系统&#xff0c;提高飞行器在复杂环境中的操作精确度与安全性。系统利用LabVIEW平台与硬件传感器相结合&#xff0c;实现实时数据处理与显示&#xff0c;有效地提升了四旋翼飞行器的监测与控制…

3D机器视觉的类型、应用和未来趋势

3D相机正在推动机器视觉市场的增长。很多制造企业开始转向自动化3D料箱拣选&#xff0c;专注于使用3D视觉和人工智能等先进技术来简化操作并减少开支。 预计3D相机将在未来五年内推动全球机器视觉市场&#xff0c;这得益于移动机器人和机器人拣选的强劲增长。到 2028 年&#…

JavaFX基础之环境配置,架构,FXML

文章目录 1 JavaFX1.1 简介1.2 环境准备1.2.1 手动管理依赖1.2.2 maven或Gradle管理 1.3 JavaFX 架构1.3.1 JavaFX 架构图1.3.2 JavaFX组件1.3.2.1 舞台1.3.2.2 场景1.3.2.3 控件1.3.2.4 布局1.3.2.5 图表1.3.2.6 2D图形1.3.2.7 3D图形1.3.2.8 声音1.3.2.9 视频 1.4 简单使用1.…

php命名空间

什么是命名空间 从广义上来说&#xff0c;命名空间是一种封装事物的方法&#xff0c;在很多地方都可以见到这种抽象概念。 例如&#xff0c;在操作系统中目录用来将相关文件分组&#xff0c;对于目录中的文件来说&#xff0c;它就扮演了命名空间的角色。 具体举个例子&#xf…

【Unity3D】导出Android项目以及Java混淆

Android Studio 下载文件归档 | Android Developers Android--混淆配置&#xff08;比较详细的混淆规则&#xff09;_android 混淆规则-CSDN博客 Unity版本&#xff1a;2019.4.0f1 Gradle版本&#xff1a;5.6.4&#xff08;或5.1.1&#xff09; Gradle Plugin版本&#xff…

腾讯云AI代码助手编程挑战赛-每日一句

一、作品简介 “每日一句”是一个基于Python的图形用户界面&#xff08;GUI&#xff09;应用程序&#xff0c;旨在为用户提供随机的中英文名言警句。它利用腾讯云AI代码助手辅助开发&#xff0c;为用户带来便捷、高效的阅读体验。 二、技术架构 1. 编程语言&#xff1a;使用P…

【AI工具】PDFMathTranslate安装使用

用了一天时间&#xff0c;安装并使用了PDFMathTranslate这款PDF文档翻译工具。 PDFMathTranslate是能够完整保留排版的 PDF 文档全文双语翻译项目&#xff0c;之前使用文档翻译的时候&#xff0c;对于论文这种类型的文章&#xff0c;由于图表和公式太多&#xff0c;文档翻译经常…

conda 批量安装requirements.txt文件

conda 批量安装requirements.txt文件中包含的组件依赖 conda install --yes --file requirements.txt #这种执行方式&#xff0c;一遇到安装不上就整体停止不会继续下面的包安装。 下面这条命令能解决上面出现的不执行后续包的问题&#xff0c;需要在CMD窗口执行&#xff1a; 点…

网络安全图谱以及溯源算法

​ 本文提出了一种网络攻击溯源框架&#xff0c;以及一种网络安全知识图谱&#xff0c;该图由六个部分组成&#xff0c;G <H&#xff0c;V&#xff0c;A&#xff0c;E&#xff0c;L&#xff0c;S&#xff0c;R>。 1|11.知识图 ​ 网络知识图由六个部分组成&#xff0c…

上汽乘用车研发流程

目的 最近刚入职主机厂&#xff0c;工作中所提到各个阶段名称与之前在供应商那边不一致&#xff0c;概念有点模糊&#xff0c;所以打算学习了解一番 概念 术语 EP: enginerring prototype car 工程样车 Mule Car: 骡子车 Simulator Car&#xff1a;模拟样车 PPV&#xff1a;…

封装/前线修饰符/Idea项目结构/package/impore

目录 1. 封装的情景引入 2. 封装的体现 3. 权限修饰符 4. Idea 项目结构 5. package 关键字 6. import 关键字 7. 练习 程序设计&#xff1a;高内聚&#xff0c;低耦合&#xff1b; 高内聚&#xff1a;将类的内部操作“隐藏”起来&#xff0c;不需要外界干涉&#xff1b…

计算机网络 (23)IP层转发分组的过程

一、IP层的基本功能 IP层&#xff08;Internet Protocol Layer&#xff09;是网络通信模型中的关键层&#xff0c;属于OSI模型的第三层&#xff0c;即网络层。它负责在不同网络之间传输数据包&#xff0c;实现网络间的互联。IP层的主要功能包括寻址、路由、分段和重组、错误检测…

【W800】UART 的使用与问题

1.开发环境 OS: Windows 11开发板&#xff1a;海凌科 HLK-W800-KIT-PROSDK: W80X_SDK_v1.00.10IDE: CSKY Development Kit 2.UART 使用 在 SDK 中创建文件 uart_test.h 和 uart_test.c&#xff0c;然后在 CDK 项目中添加这两个文件&#xff0c;CDK 会自动 include 头文件。 …

万界星空科技质量管理QMS系统具体功能介绍

一、什么是QMS系统&#xff0c;有什么价值&#xff1f; 1、QMS 系统即质量管理系统&#xff08;Quality Management System&#xff09;。 它是一套用于管理和控制企业产品或服务质量的集成化体系。 2、QMS 系统的价值主要体现在以下几个方面&#xff1a; 确保产品质量一致性…