kafka的成神秘籍(java)

kafka的成神秘籍

kafka的简介

​ Kafka 最初是由Linkedin 即领英公司基于Scala和 Java语言开发的分布式消息发布-订阅系统,现已捐献给Apache软件基金会。Kafka 最被广为人知的是作为一个 消息队列(mq)系统存在,而事实上kafka已然成为一个流行的分布式流处理平台。其具有高吞吐、低延迟的特性,许多大数据处理系统比如storm、spark、flink等都能很好地与之集成。按照Wikipedia上的说法,kafka的核心数据结构本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”。总的来讲,kafka通常具有3重角色:

消息系统

​ Kafka和传统的消息队列比如RabbitMQ、RocketMQ、ActiveMQ类似,支持流量削锋、服务解耦、异步通信等核心功能。

流处理平台

​ Kafka 不仅能够与大多数流式计算框架完美整合,并且自身也提供了一个完整的流式处理库,即kafka Streaming。kafka Streaming提供了类似Flink中的窗口、聚合、变换、连接等功能。

存储系统

​ 通常消息队列会把消息持久化到磁盘,防止消息丢失,保证消息可靠性。Kafka的消息持久化机制和多副本机制使其能够作为通用数据存储系统来使用

​ 一句话概括:Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),在业界主要应用于大数据实时处理领域。

kafka体系结构

Topic(主题)

  • 适用于存储消息的逻辑概念,1个Topic,可以看做是消息的集合。
  • 这个消息的集合(Topic),可以接收多个生产者(Producer)推送(Push)过来的消息,也可以让多个消费者(Consumer)从中消费(Pull)消息。

分区(Partition)

  • 分区的概念,可以理解为Topic的子集:1个Topic可能只有1个分区,也可能有多个分区。这里说的多个分区,一个分区就代表磁盘的一块连续的位置,不同的分区也就是磁盘上不同的区域块。

  • 分区存在的意义:

​ 通过不同的区域块,kafka存储在Topic里的消息就可以在多个地方存储,也就是对kafka进行了水平扩展,这样可以增加kafka的 并行处理能力。

  • 对于不同分区的理解

​ 同一个Topic下,多个区域块,这些不同的区域块,存储的消息是不同的,也就是说,A、B两个区域块,不会同时存储1类消 息。

  • 对于同一个分区中的消息:

    ​ 在区块A中接收到消息时,该消息会接收它在这块区域块A中的Offset(唯一编号),这个Offset使得kafka确定了消息在区域块中的顺序。(这样就保证了磁盘的顺序读写,减少磁盘IO)

    image-20241002131656296

Log

  • 分区在逻辑上,对应一个Log,当生产者将消息写入分区的时候,实际上就是写入一个log。

  • Log:一个逻辑概念,对应着磁盘上的一个文件夹。

  • Log的组成:由多个Segment组成,每1个Segment对应1个日志文件和一个索引文件。

Broker

​ 区域块A会给他接收的消息分配一个offset,并且x会保存到A所在的磁盘区域上。而这个功能,就是由Broker完成的。

  • Broker:1个Broker就是一个单独的kafka server。

  • Broker的主要工作:接收生产者发送来的消息,分配offset,然后将包装过的数据保存到磁盘上。

  • Broker的其他作用:接收消费者Consumer和其他Broker的请求,根据请求的类型进行相应的处理然后返回响应。

  • 这里引出集群(Cluster)的概念,1个Cluster是由多个Broker构成的,也就是说,1个Broker不会对外提供服务,而是通过Cluster的形式对外提供服务:

因为,一个Cluster里,需要1个Broker担任Controller,这个Broker就是这个集群的指挥中心,负责:

  • 管理各个分区(Partition)的状态;
  • 管理每个分区(Partition)的副本的状态;
  • 监听zookeeper的数据变化。

其他的Broker均是通过这个Controller进行指挥的,完成各自相应的功能。

关于Cluster的一主多从实现:

除了担任Controller的Broker会监听其他Broker的状态,其他Broker也会监听Controller的状态,当Controller出翔了故障,就会重新选取新的Broker担任Controller

消息

kafka中最基本的消息单元。有一串字节组成,主要由key和value构成(即key、value都是字节数组)

  • key:主要作用是 根据一定策略,将这个消息路由到制定分区中 ==> 这样就保障了,包含同一个key的消息全部写入一个分区A,不会写入另外一个分区。(即实现了Partition分区中提到的“不同分区存储的消息是不一样的”)

副本

kafka会对消息进行冗余备份,每一个分区Partition,都可以有多个副本(每一个副本包含的消息是相同的,但是不能保证同一时刻下完全相同)。

副本类型:Leader、Follower。

副本选举策略(即选举1个Leader,其余为Follower):

  • 当分区只有1个副本,这个副本就是Leader,没有Follower。
  • 在其他不同场景,会采取不同的选举策略。
  • Leader:处理kafka中所有的读写请求
  • Follower:仅仅把数据从Leader中拉取到本地,同步更新到自己的Log中。

生产者

产生消息的对象,产生消息之后,将消息按照一定的规则推送到Topic的分区中

消费者

从Topic中拉取消息,并对消息进行消费

  • Consumer:有一个作用,维护它消费到分区(Partition)上的什么位置(即offset的值)。
  • Consumer Group: 在kafka中,多个Consumer可以组成1个Consumer Group,1个Consumer只属于1个Consumer Group.

Consumer Group的作用:保证了这个Consumer Group订阅的Topic(Partition的集合)中的每一个分区(Partition),只被Consumer Group中的一个Consumer处理。

当然,如果要实现消息的广播消费,则将同1条消息放在多个不同的Consumer Group中即可。

就上述这个Consumer和Partition的关系可以理解下面的说法:

通过向Consumer Group中动态添加适量的Consumer, 可以触发kafka的Rebalance操作(重新分配Partition和Consumer的一一对应关系,结合Topic部分的理解,这样就实现了kafka的水平扩展能力)

kafka的安装与使用

安装

本文采用docker安装需要确定linux主机上是否有docker环境

docker -v

image-20241002221756035

拉取zookeeper镜像与kafka镜像

#拉取zookeeper镜像
docker pull zookeeper
#拉取kafka镜像
docker pull wurstmeister/kafka

运行zookeeper

docker run --restart=always -d -p 2181:2181 --name zookeeper zookeeper
#-restart=always 停机不断重启
#-p 2181:2181 docker容器端口映射到主机的端口

运行kafka

docker run --restart=always -d --name kafka -p 19092:19092 -e KAFKA_ZOOKEEPER_CONNECT=localhost:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:19092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:19092 -e KAFKA_PORT=19092 wurstmeister/kafka
#KAFKA_ZOOKEEPER_CONNECT=localhost:2181 需要注册的 zookeeper的ip和端口

创建topic

  • 进入容器目录
    docker exec -it kafka bash
  • 执行创建topic命令

sh /opt/kafka_2.13-2.8.1/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:19092 --replication-factor 1 --partitions 1 --topic data-transform

image-20241002222929452

使用(java)

添加项目依赖
  <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
编写配置文件
spring:kafka:bootstrap-servers: localhost:9092# 消费者consumer:#分组idgroup-id: my-group# 从一次消费过信息的下一条开始消费auto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer#        配置kafka的信息的生产者producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializertemplate:# 默认的消费主题default-topic: data-transform
生产者

@Component
@Slf4j
public class DataProductService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public final static String TOPIC = "data-transform";public void sendDataMessage(String message){log.info("生产者生产数据开始:", DateUtils.format(new Date()),"yyyyMMdd HH:mm:ss");kafkaTemplate.send(TOPIC, message).completable().whenComplete((result, ex) -> {if (ex == null) {RecordMetadata metadata = result.getRecordMetadata();log.info("生产者生产数据成功:,数据为:{}",metadata);} else {log.error("生产者生产数据失败:", ex);}});}
}
消费者

/*** @Description:* @author:<a href="2358853434@qq.com"></a> zh* @Create : 2024/10/2**/
@Component
@Slf4j
public class DataConsumerService {@KafkaListener(topics = DataProductService.TOPIC,groupId = "my-group")public void ConsumerData(String message){log.info("消费者消费数据,数据为:{}",message);}
}
测试用例
@SpringBootTest(classes = application.class)
public class demo {@ResourceDataProductService dataProductService;@ResourceDataConsumerService dataConsumerService;@Testpublic void sendDataMessage(){long l = System.currentTimeMillis();//模拟电商下单信息推送for (int i = 1000; i < 10001; i++) {String message = "{\"orderId\":\""+i+"\",\"userId\":\""+i+"\",\"productId\":\""+i+"\",\"productName\":\""+i+"\",\"productPrice\":\""+i+"\"}";dataProductService.sendDataMessage(message);dataConsumerService.ConsumerData(message);}System.out.println(System.currentTimeMillis()-l);}
}
结果

image-20241002235126191

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/55786.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【mmengine】配置器(config)(进阶)继承与导出,命令行修改配置

一、配置文件的继承 1.1 继承机制概述 新建optimizer_cfg.py: optimizer dict(typeSGD, lr0.02, momentum0.9, weight_decay0.0001)新建runtime_cfg.py: device "cuda" gpu_ids [0, 1] batch_size 64 epochs 100 num_workers 8新建resnet50.py: _base_ […

Vue 路由设置

为了防止遗忘&#xff0c;记录一下用Vue写前端配置路由时的过程&#xff0c;方便后续再需要用到时回忆。 一、举个例子 假如需要实现这样的界面逻辑&#xff1a; 在HomePage中有一组选项卡按钮用于导航到子页面&#xff0c;而子页面Page1中有一个按钮&#xff0c;其响应事件是…

笔记-stm32移植ucos

文章目录 一、UCOS的基础知识1.1 前后台系统:1.2 RTOS系统可剥夺型内核:前后台系统和RTOS系统 1.3 UCOS系统简介学习方法 二、ucossii移植Step1&#xff1a;在工程中建立存放UCOSS代码的文件夹UCOSIIStep2:向CORE文件夹添加文件Step3:向Config文件夹添加文件Step4:向port文件夹…

LLM4Rec最新工作: 字节发布用于序列推荐的分层大模型HLLM

前几个月 Meta HSTU 点燃各大厂商对 LLM4Rec 的热情&#xff0c;一时间&#xff0c;探索推荐领域的 Scaling Law、实现推荐的 ChatGPT 时刻、取代传统推荐模型等一系列话题让人兴奋&#xff0c;然而理想有多丰满&#xff0c;现实就有多骨感&#xff0c;尚未有业界公开真正复刻 …

vscode中配置python虚拟环境

python虚拟环境作用 Python虚拟环境允许你为每个独立的项目创建一个隔离的环境&#xff0c;这样每个项目都可以拥有自己的一套Python安装包和依赖&#xff0c;不会互相影响。实际使用中&#xff0c;可以在vscode或pycharm中使用虚拟环境。 1.创建虚拟环境的方法&#xff1a; …

【NLP自然语言处理】01-基础学习路径简介

目的&#xff1a;让大家能够在 AI-NLP 领域由基础到入门具体安排&#xff1a; NLP介绍 文本预处理RNN 及其变体&#xff08;涉及案例&#xff09;Transformer 原理详解迁移学习 和 Bert 模型详解 &#xff08;涉及案例&#xff09;特点&#xff1a; 原理 实践每个文章会有练习…

04-SpringBootWeb案例(中)

3. 员工管理 完成了部门管理的功能开发之后&#xff0c;我们进入到下一环节员工管理功能的开发。 基于以上原型&#xff0c;我们可以把员工管理功能分为&#xff1a; 分页查询&#xff08;今天完成&#xff09;带条件的分页查询&#xff08;今天完成&#xff09;删除员工&am…

算法题总结(十)——二叉树上

#二叉树的递归遍历 // 前序遍历递归LC144_二叉树的前序遍历 class Solution {public List<Integer> preorderTraversal(TreeNode root) {List<Integer> result new ArrayList<Integer>(); //也可以把result 作为全局变量&#xff0c;只需要一个函数即可。…

Linus Torvalds 要求内核开发人员编写更好的 Git 合并提交信息

昨天在宣布 Linux 6.12-rc2 内核时&#xff0c;Linus Torvalds 要求内核维护者在提交信息方面做得更好。Torvalds 尤其希望内核维护者在描述拉取请求中的变更时&#xff0c;能更好地使用积极、命令式的语气。 Linux创建者在6.12-rc2 公告中解释道&#xff1a; 总之&#xff0c…

论文阅读笔记-XLNet: Generalized Autoregressive Pretraining for Language Understanding

前言 Google发布的XLNet在问答、文本分类、自然语言理解等任务上都大幅超越BERT,XLNet提出一个框架来连接语言建模方法和预训练方法。我们所熟悉的BERT是denoising autoencoding模型,最大的亮点就是能够获取上下文相关的双向特征表示,所以相对于标准语言模型(自回归)的预…

【AIGC】ChatGPT提示词Prompt高效编写模式:结构化Prompt、提示词生成器与单样本/少样本提示

博客主页&#xff1a; [小ᶻZ࿆] 本文专栏: AIGC | ChatGPT 文章目录 &#x1f4af;前言&#x1f4af;结构化Prompt (Structured Prompt)组成元素应用实例优势结论 &#x1f4af;提示词生成器 (Prompt Creator)如何工作应用实例优势结论 &#x1f4af;单样本/少样本提示 (O…

什么是安全运营中心 SOC?

SOC 代表安全运营中心&#xff0c;它是任何企业中负责组织安全、保护企业免受网络风险的单一、集中的团队或职能。 安全运营中心将管理和控制业务运营的所有安全要素&#xff0c;从监控资产到雇用合适的人员和流程&#xff0c;再到检测和应对威胁。 在本文中&#xff0c;我们…

PHP变量(第④篇)

本栏目教学是php零基础到精通&#xff0c;如果你还没有安装php开发工具请查看下方链接&#xff1a; Vscode、小皮面板安装-CSDN博客 今天来讲一讲php中的变量&#xff0c;变量是用于存储信息的"容器"&#xff0c;这些数据可以在程序执行期间被修改&#xff08;即其…

ThinkBook 16+ 锐龙6800h 安装ubuntu键盘失灵

问题&#xff1a;在ThinkBook 16 锐龙6800h 安装ubuntu18.04 出现笔记本键盘按下延迟非常高&#xff0c;输出卡死的情况&#xff0c;但是外接键盘可以正常使用 解决&#xff1a;更新内核 1、进入 https://kernel.ubuntu.com/~kernel-ppa/mainline/ 下载所需内核版本&#x…

Node.js+Express毕设论文选题最新推荐题目和方向

目录 一、前言 二、毕设选题推荐 三、总结 四、附录&#xff08;手册、官网、资源教程等&#xff09; 1. Node.js 官方资源 2. Express 官方资源 3.安装方法 4 创建示例 一、前言 Node.js 是一个基于 Chrome V8 引擎的 JavaScript 运行环境&#xff0c;它允许开发者使用…

智能医疗:Spring Boot医院管理系统开发

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统&#xff0c;它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等&#xff0c;非常适…

x++、++x的一些问题

x、x在字面上无非就说一个先前置递增然后再运算&#xff0c;另一个是运算完再递增&#xff0c;是不是有些许模棱两可的感觉&#xff0c;接下来引用一个简单的for循环就能够大致理解&#xff1a; 先是x&#xff1a; int i0,x0;for(i0;(i)<5;){xi;printf("%d\n",x)…

ubuntu 安装baget

一、安装netcore3.1 环境 二、下载运行文件 下载&#xff1a;github.com/loic-sharma/BaGet/releases 修改&#xff1a;appsettings.json文件 mkdir -p /root/apps/baget mkdir -p /root/apps/datas touch /root/apps/baget.db cd /root/apps/baget dotnet BaGet.dll --urls&…

Android Framework(八)WMS-窗口动效概述

文章目录 动画简述本地、远端动画的定义什么是“leash”图层“leash”图层的命令与创建 Winscope流程小结 动画流程概览分析Activity启动app_transition 动画的主要事件触发动画执行的套路动画真正执行动画的结束回调触发远端动画的Target 动画简述 1、动画的原理也是利用了视觉…

思科dhcp的配置

以路由器为例 让pc3 自动获取ip地址并获取的网段为172.16.4.100-172.16.4.200 配置如下&#xff1a; R1(config)#interface GigabitEthernet0/2 R1(config)#ip address 172.16.4.254 255.255.255.0 R1(config)# no shutdown R1(config)#ip dhcp pool 4_pool //创建dhcp地址池…