Kafka学习-Java使用Kafka

文章目录

  • 前言
  • 一、Kafka
    • 1、什么是消息队列
      • offset
    • 2、高性能
      • topic
      • partition
    • 3、高扩展
      • broker
    • 4、高可用
      • replicas、leader、follower
    • 5、持久化和过期策略
    • 6、消费者组
    • 7、Zookeeper
    • 8、架构图
  • 二、安装Zookeeper
  • 三、安装Kafka
  • 四、Java中使用Kafka
    • 1、引入依赖
    • 2、生产者
    • 3、消费者
    • 4、运行效果


前言

Kafka消息中间件

一、Kafka

1、什么是消息队列

假设我们有两个服务:生产者A每秒能生产200个消息,消费者B每秒能消费100个消息。

在这里插入图片描述

那么B服务是处理不了A这么多消息的,那么怎么使B不被压垮的同时还能处理A的消息呢,我们引入一个中间件,即Kafka。(当然着并不能使消费者的处理速度上升)

在这里插入图片描述

offset

那么我们可以在B服务中加入一个队列,也就是一个链表,链表的每个节点相当于一条消息,每个节点有一个序号即offset,记录消息的位置。

在这里插入图片描述

在这里插入图片描述

但是这样也会有个问题,还没有处理的消息是存储在内存中的,如果B服务挂掉,那么消息也就丢失了。
所以我们可以把队列移出,变成一个单独的进程,即使B服务挂掉,消息也不会丢失。

在这里插入图片描述

2、高性能

B服务由于性能差,队列中未处理的消息会越来越多,我们可以增加更多的消费者来处理消息,相对的也可以增加更多的生产者来生成消息。

在这里插入图片描述

topic

但是,生产者与消费者会争抢同一个队列,没有抢到就要等待,那么怎么解决呢?
我们可以将消息进行分类,每一类消息是一个topic,生产者按消息的类型投递到不同的topic中,消费者也按照不同的topic进行消费。

在这里插入图片描述

partition

但是单个topic的消息还是有可能过多,我们可以将单个队列拆分,每段是一个partition分区,每个消费者负责一个partition

在这里插入图片描述

3、高扩展

broker

随着partition过多,所有的partition都在同一个机器上,就可能会导致单机的cpu和内存过高,影响性能,那么我们可以使用多台机器,将partition分散部署在不同的机器上。每台机器就代表一个broker
我们可以增加broker来缓解服务器的cpu过高的性能问题。

在这里插入图片描述

4、高可用

replicas、leader、follower

假如某个broker挂了, 那么其中partition中的消息也就都丢失了,那么这个问题怎么解决呢?
我们可以给partition多加几个副本,统称replicas,并将它们分为leaderfollower
leader负责生产者和消费者的读写,follower只负责同步leader的数据。假如leader挂了,也不会影响follower,随后在follower中选出一个leader,保证消息队列的高可用。

在这里插入图片描述

5、持久化和过期策略

在上面讲述了leader挂掉的情况,如果所有的broker都挂了,消息不就都丢失了?
为了解决这个问题,就不能只把数据存在内存中,也要存在磁盘中。
但是如果所有消息一直保存在磁盘中,那磁盘也会被占满,所以引入保留策略。

6、消费者组

如果我想在原有的基础上增加一个消费者,那么它只能跟着最新的offset接着消费,如果我想从某个offset开始消费呢?
我们引入消费者组,实现不同消费者维护自己的消费进度。

在这里插入图片描述

7、Zookeeper

上面介绍了很多的组件,每个组件都有自己的状态信息,那么就需要有一个组件去统一维护这些组件的状态信息,于是引入了Zookeeper组件,它会定期与broker通信,获取Kafka集群的状态,判断哪些broker挂了,消费者组消费到哪了等等。

8、架构图

在这里插入图片描述

二、安装Zookeeper

1、官网地址

https://zookeeper.apache.org/

2、下载

在这里插入图片描述

选择稳定版本下载

在这里插入图片描述

3、解压,修改配置文件

解压后,复制 zoo_sample.cfg,重命名为 zoo.cfg

在这里插入图片描述

修改数据文件目录位置

在这里插入图片描述

4、启动

我们是在windows系统下安装的,运行 bin 目录下的 zkServer.cmd

在这里插入图片描述

三、安装Kafka

1、官网地址

https://kafka.apache.org/

2、下载

在这里插入图片描述

3、解压,修改配置文件

修改 config 目录下 server.properties 文件
修改日志文件位置,其他参数(如zookeeper端口,根据需要修改)

在这里插入图片描述

4、启动

bin\windows\kafka-server-start.bat config\server.properties

在这里插入图片描述

四、Java中使用Kafka

1、引入依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

2、生产者

public static void main(String[] args) throws InterruptedException {Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.ACKS_CONFIG, "all");prop.put(ProducerConfig.RETRIES_CONFIG, 0);prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);prop.put(ProducerConfig.LINGER_MS_CONFIG, 1);prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);String topic = "hello";KafkaProducer<String, String> producer = new KafkaProducer<>(prop);for (int i = 0; i < 100; i++) {producer.send(new ProducerRecord<String, String>(topic, Integer.toString(2), "hello kafka" + i));System.out.println("生产消息:" + i);Thread.sleep(1000);}producer.close();
}

3、消费者

public static void main(String[] args) {Properties prop = new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.GROUP_ID_CONFIG, "con-1");    // 消费者组prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);    //自动提交偏移量prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);     //自动提交时间KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);ArrayList<String> topics = new ArrayList<>();//可以订阅多个消息topics.add("hello");consumer.subscribe(topics);try {while(true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(10));for (TopicPartition topicPartition : poll.partitions()) {//	通过TopicPartition获取指定的消息集合,获取到的就是当前topicPartition下面所有的消息List<ConsumerRecord<String, String>> partitionRecords = poll.records(topicPartition);//	获取TopicPartition对应的主题名称String topic = topicPartition.topic();//	获取TopicPartition对应的分区位置int partition = topicPartition.partition();//	获取当前TopicPartition下的消息条数int size = partitionRecords.size();System.out.printf("--- 获取topic: %s, 分区位置:%s, 消息总数: %s%n",topic,partition,size);for(int i = 0; i < size; i++) {ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);//	实际的数据内容String key = consumerRecord.key();//	实际的数据内容String value = consumerRecord.value();//	当前获取的消息偏移量long offset = consumerRecord.offset();//	表示下一次从什么位置(offset)拉取消息long commitOffser = offset + 1;System.out.printf("消费消息 key:%s, value:%s, 消息offset: %s, 提交offset: %s%n",key, value, offset, commitOffser);Thread.sleep(1500);}}}} catch (Exception e) {e.printStackTrace();} finally {consumer.close();}
}

4、运行效果

生产消息

在这里插入图片描述

消费消息

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/837867.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaScript异步编程——09-Promise类的方法【万字长文,感谢支持】

Promise 类的方法简介 Promise 的 API 分为两种&#xff1a; Promise 实例的方法&#xff08;也称为&#xff1a;Promis的实例方法&#xff09; Promise 类的方法&#xff08;也称为&#xff1a;Promise的静态方法&#xff09; 前面几篇文章&#xff0c;讲的都是 Promise 实…

USB3.0接口——(3)协议层(包格式)

1.协议层 1.1.超高速传输事务 超高速事务&#xff08;SuperSpeed transactions&#xff09;由主机对设备端点请求或发送数据开始&#xff0c;并在端点发送数据或确认收到数据时完成。 超高速总线上的数据传输&#xff08;transfer&#xff09;是主机请求设备应用程序生成的数…

记element-ui树形控件懒加载实现

概要 如何通过vue2element-ui实现树形控件的懒加载 整体架构流程 1.树形控件的组件代码 <el-tree:data"treeData":props"defaultProps":load"load"lazycurrent-change"handleNodeClick":expand-on-click-node"true":h…

【LAMMPS学习】九、LAMMPS脚本 示例

9. 示例脚本 LAMMPS 发行版包含一个包含许多示例问题的示例子目录。许多是二维模型&#xff0c;运行速度快且易于可视化&#xff0c;在台式机上运行最多需要几分钟。每个问题都有一个输入脚本 (in.*)&#xff0c;并在运行时生成一个日志文件 (log.*)。有些使用初始坐标的数据文…

课时125:awk实践_进阶知识_匹配运算

1.2.4 匹配运算 学习目标 这一节&#xff0c;我们从 基础知识、简单实践、小结 三个方面来学习 基础知识 简介 所谓的匹配运算&#xff0c;主要指的是关键字无法精确性的匹配相关信息了&#xff0c;但是我们可以结合一些关键字信息进行模糊的匹配。对于匹配运算来说&#x…

streamlit报错:AxiosError: Request failed with status code 403

解决办法&#xff1a; 步骤一&#xff1a;创建config.toml vi ~/.streamlit/config.toml 步骤二&#xff1a;加入以下内容 [server] enableXsrfProtection false enableCORS false步骤三&#xff1a;重新启动你的streamlit网页

排程过程中任务锁定的外延与内涵

在生产排程过程中&#xff0c;除了可以借助强大的算法&#xff0c;与优质的规划模型对待排任务进行排产优化外&#xff0c;还会遇到一些需要人为锁定部分任务的情况。无论是APS系统开发人员&#xff0c;还是排产作业人员&#xff0c;在常见的认识中&#xff0c;对于“锁定”概念…

windows C++:进程间通信高实时性、安全、数据量大的通信方式(一)文件映射 (File Mapping)

windows进程间通信是写多进程程序的必修课&#xff0c;高实时性、安全、数据量大的通信方式是很必要的&#xff0c;今天我们来看看文件映射 一、文件映射 (File Mapping) 1. 简单的介绍 文件映射通过将文件的部分或全部内容映射到一个或多个进程的虚拟地址空间&#xff0c;使…

Linux-基础IO

&#x1f30e;Linux基础IO 文章目录&#xff1a; Linux基础IO C语言中IO交互       常用C接口         fopen         fputs         fwrite         fgets 当前路径       三个文件流 系统文件IO       open函数     …

什么是Wi-Fi保护设置(WPS),以及如何使用它?这里有详细解释

生活在现代世界的双刃剑是,一切都可以无线连接,但这往往会让我们更容易受到攻击。WPS可以帮助你减轻这种风险,而不需要你精通技术,只需简单地按下路由器上的按钮。 什么是Wi-Fi保护设置(WPS) 当你不想手动连接时,路由器上的WPS按钮是一种连接无线设备的简单方法。它使…

特征模态分解(FMD):一种小众而又新颖的分解方法

​ 声明&#xff1a;文章是从本人公众号中复制而来&#xff0c;因此&#xff0c;想最新最快了解各类智能优化算法及其改进的朋友&#xff0c;可关注我的公众号&#xff1a;强盛机器学习&#xff0c;不定期会有很多免费代码分享~ 今天为大家介绍一个小众而又新颖的信号分…

行业大模型:推动数字化转型的新引擎

引言 随着人工智能技术的飞速发展,大模型技术正成为推动社会进步和产业革新的关键力量。腾讯研究院的《行业大模型调研报告》为我们揭示了这一技术如何催生新一轮的技术创新与产业变革,特别是在工业、金融、广电等领域的数字化转型和高质量发展中发挥着重要作用。 大模型技…

tensorflow实现二分类

# 导入所需库和模块 from tensorflow.keras.layers import Dense, Input, Activation # 导入神经网络层和激活函数模块 from tensorflow.keras.models import Sequential # 导入Keras的Sequential模型 import pandas as pd # 导入Pandas库用于数据处理 import numpy as np …

SQL小练

创建事件 #创建事件&#xff0c;x秒后&#xff0c;用库存更新昨日库存 DELIMITER $$ CREATE EVENT xxx.xxx ON SCHEDULEAT CURRENT_TIMESTAMP INTERVAL 10 SECOND DOBEGINUPDATE stock SET yesterday_quantityquantity;END $$ DELIMITER ;DELIMITER $$ CREATE DE…

接口文档不显示新写的接口

新写的接口&#xff0c;但是不显示&#xff1a; 仔细对比源码才发现没有写tag&#xff1a; 然后就有了&#xff1a;

ES6之正则扩展

正则表达式扩展 u修饰符&#xff08;Unicode模式&#xff09;y修饰符&#xff08;Sticky或粘连模式&#xff09;s修饰符&#xff08;dotAll模式&#xff09;Unicode属性转义正则实例的flags属性字符串方法与正则表达式的整合 javascript的常用的正则表达式 验证数字邮箱验证手机…

C语言中的循环队列与栈、队列之间的转换实现

引言 在数据结构的学习中&#xff0c;栈&#xff08;Stack&#xff09;和队列&#xff08;Queue&#xff09;是两个非常重要的概念。它们分别遵循着后进先出&#xff08;LIFO&#xff09;和先进先出&#xff08;FIFO&#xff09;的原则。在某些情况下&#xff0c;我们可能需要…

C++——超简单登录项目

程序入口文件 #include <QtWidgets/QApplication> // 包含登录页面头文件 #include "DlgLogin.h"int main(int argc, char *argv[]) {QApplication a(argc, argv);// 程序入口// 调页面起来//DlgMain w;//w.show();// 换成登录页面DlgLogin w;w.show();return…

QT状态机6-无目标切换

一个切换也可以没有目标状态,一个没有目标状态的切换也可以像其他切换那样被触发。 其不同之处在于,当一个没有目标的切换被触发时,它不会引起任何的状态变化, 这样便可以让状态机在一个特定的状态时响应信号或者事件而不用离开这个状态。 回顾之前的学习,如下所示:当我…

开源禅道zentao的使用

很不幸禅道因为漏洞被人进攻了&#xff0c;被迫研究。 1.安装 直接使用docker进行部署&#xff0c;这里有非常多门道。官网的镜像easysoft-zentao是属于docker安装&#xff0c;而idoop的镜像虽然也是docker安装&#xff0c;但是实际是使用官网linux一键安装的版本&#xff0c…